Kafka入门-消费者

消费者

Kafka消费方式:采用pull(拉)的方式,消费者从broker中主动拉去数据。使用pull的好处就是消费者可以根据自身需求,进行拉取数据,但是坏处就是如果Kafka没有数据,那么消费者可能会陷入循环中,一直返回空数据。

消费者与消费者之间是独立的,一个消费者可以消费多个分区数据。但是消费组不同,每个分区的数据只能由消费者组中的一个消费者消费,避免重复消费导致数据重复。

消费者组:

  • 消费者组由多个consumer组成,形成一个消费者组的条件,是所有消费者的groupid相同。消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费。
  • 消费者组之间互不影响,所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者。

消费者组初始化流程:

在这里插入图片描述

消费者组详细消费流程

在这里插入图片描述

Java创建消费者

注意:在消费者API代码中必须配置消费者组id。命令行启动消费者不需要填写是因为id被自动填写为随机的消费者组id。

通过API消费一个主题的数据
//配置Properties properties = new Properties();//连接集群properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.27.101:9092,192.168.27.102:9092");//反序列化properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());//消费者组idproperties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");//1.创建一个消费者KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);//2.定义主题ArrayList<String> topics = new ArrayList<>();topics.add("first");kafkaConsumer.subscribe(topics);//3.消费数据while (true){//拉取的间隔时间ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> record : records) {System.out.println(record);}}
消费者消费指定分区

需要指定分区只需要在定义主题时,使用定义主题以及分区方法

//2.定义主题以及分区ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);
消费者组案例

创建三个消费者,进行消费不同分区

直接复制上面消费主题的代码,因为设置的groupid都是test,因此会自动成为一个消费者组。运行消费者组test内的三个消费者,然后运行生产者对每个分区进行发送消息,可以看到每个消费者都只消费了一个分区的消息。

注意:消费者组内的消费者在底层进行了编号,跟java类取名无关。

分区的分配以及再平衡

消费者组有多个消费者,而一个topic又有多个分区,那么应该由哪个消费者消费哪个分区呢?

Kafka有三种主流的分区分配策略,可以通过配置参数partiton.assignment.strategy修改分配策略,默认的策略是Range+CooperativeSticky。Kafka可以同时使用多个分区分配策略。

//设置分区分配策略
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,"org.apache.kafka.clients.consumer.RoundRobinAssignor");

再平衡:相当于原有分区的消费者突然发送意外,不能再进行消费,重新分配该分区给其他消费者;或者消费者组中新增了消费组,需要重新分配分区。

在这里插入图片描述

  • Range

    在这里插入图片描述

    Range的再平衡,会将原消费者负责的分区一次性全部交给剩下的某一个消费者

  • RoundRobin

    在这里插入图片描述

    当触发再分配时,会将原消费者负责的分区按照RoundRobin一样进行重新分发

  • Sticky

    Sticky也是针对所有topic的策略,黏性分区是一种均匀随机的分配策略,会在执行一次新的分配之前,考虑上一次的分配结果,尽量少的调整分配的变动,可以节省开销。首先会尽量均衡的分配分区给消费者,在同一组内的消费者出现问题,也会尽量保持原有分配的分区不发送变化。但是在发生再平衡时,所有的消费者需要先放弃当前持有的分区资源,等待重新分配。

  • CooperativeSticky

    CooperativeSticky是2.4版本新增的策略,在原有Sticky策略上,将原本大规模的再平衡操作,拆分成了多次小规模的再平衡,直到平衡完成。

Offset位移

offset的默认维护位置

Offset,消息位移,它表示分区中每条消息的位置信息,是一个单调递增且不变的值。换句话说,offset可以用来唯一的标识分区中每一条记录。消费者消费完一条消息记录之后,需要提交offset来告诉Kafka Broker自己消费到哪里了。

在这里插入图片描述

_consumer_offsets主题采用key和value的方式存储数据,key是group.id+topic+分区号,value就是当前offset的值,每隔一段时间,kafka内部会对这个topic进行compact,也就是每个key都保留最新数据。

默认情况下,是不允许查看消费系统主题数据的,如果需要查看该系统主题数据,要设置config/consumer.properties中添加配置exclude.internal.topics=false。默认是true,表示不能看系统相关信息。

自动提交offset

为了让用户更专注于自己的业务逻辑,Kafka提供了自动提交offset的功能,一段时间后自动提交offset。相关参数:

enable.auto.commit 是否开启自动提交offset功能,默认为true

auto.commmit.intervalms 自动提交offset分时间间隔,默认是5s。

//自动提交properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);//提交时间间隔properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,1000);
手动提交offset

自动提交是基于时间提交,开发人员很难把握提交的时机,因此Kafka还提供了手动提交offset的API。

//设置手动提交(关闭自动)properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,false);

手动提交分为同步提交和异步提交。两者的共同点是都会将本次提交的一批数据最高的偏移量提交,不同的是同步提交阻塞当前线程,一直到提交成功,并且会自动失败重试,而异步提交则没有失败重试机制,所有有可能提交失败。

同步提交:必须等待offset提交完毕,再去消费下一批数据。

//手动提交(同步)kafkaConsumer.commitSync();

异步提交:发送完offset请求后,就开始消费下一批数据了。

//手动提交(异步)kafkaConsumer.commitAsync();
指定Offset消费
//指定位置进行消费,先获取分区信息Set<TopicPartition> assignment = kafkaConsumer.assignment();//保证分区分配方案已经指定完毕,刚订阅主题时不能立即获取到分区信息while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment = kafkaConsumer.assignment();}for (TopicPartition topicPartition : assignment) {//                   分区           指定offsetkafkaConsumer.seek(topicPartition,100);}
指定时间消费
//指定位置进行消费,先获取分区信息Set<TopicPartition> assignment = kafkaConsumer.assignment();//保证分区分配方案已经指定完毕,刚订阅主题时不能立即获取到分区信息while (assignment.size()==0){kafkaConsumer.poll(Duration.ofSeconds(1));assignment = kafkaConsumer.assignment();}HashMap<TopicPartition, Long> topicPartitionLongHashMap = new HashMap<>();for (TopicPartition topicPartition : assignment) {//如果希望是一天前的当前时刻,那么就用当前时间减去一天间隔,单位为mstopicPartitionLongHashMap.put(topicPartition,System.currentTimeMillis()- 1 * 24 * 3600 * 1000);}//将时间转换为对应的offsetMap<TopicPartition, OffsetAndTimestamp> topicPartitionOffsetAndTimestampMap = kafkaConsumer.offsetsForTimes(topicPartitionLongHashMap);for (TopicPartition topicPartition : assignment) {OffsetAndTimestamp offsetAndTimestamp = topicPartitionOffsetAndTimestampMap.get(topicPartition);kafkaConsumer.seek(topicPartition,offsetAndTimestamp.offset());}
消费者事务

在消费者消费的过程中会遇到重复消费和漏消费的情况发生。

漏消费:先提交offset后进行消费,有可能造成数据的漏消费

重复消费:已经消费数据,但是offset没有提交

在这里插入图片描述

如果想精准的进行一次性消费,那么需要Kafka消费端将消费过程和提交offset过程做原子绑定。可以将Kafka的offset保存到支持事务的工具中。(比如MySQL)

数据积压

默认日志存储时间为7天,如果当消费速度低于消息的发送速度,那么就很可能造成数据积压。

如果Kafka消费能力不足,那么可以增加Topic的分区数,并且同时提升消费者组的消费者数量,消费者数=分区数。

如果下游数据处理不及时,那么提高每批次拉取的数据量。批次拉取数据过少,使得处理的数据小于生产的数据,也会造成数据积压。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/diannao/85917.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

SpringBoot自动化部署实战技术文章大纲

技术背景与目标 介绍SpringBoot在现代开发中的重要性自动化部署的价值&#xff1a;提升效率、减少人为错误、实现CI/CD适用场景&#xff1a;中小型Web应用、微服务架构 自动化部署核心方案 基于Docker的容器化部署 SpringBoot应用打包为Docker镜像使用Docker Compose编排多容…

TDengine 集群运行监控

简介 为了确保集群稳定运行&#xff0c;TDengine 集成了多种监控指标收集机制&#xff0c;并通过 taosKeeper 进行汇总。taosKeeper 负责接收这些数据&#xff0c;并将其写入一个独立的 TDengine 实例中&#xff0c;该实例可以与被监控的 TDengine 集群保持独立。TDengine 中的…

C# 委托UI控件更新例子,何时需要使用委托

1. 例子1 private void UdpRxCallBackFunc(UdpDataStruct info) {// 1. 前置检查防止无效调用if (textBoxOutput2.IsDisposed || !textBoxOutput2.IsHandleCreated)return;// 2. 使用正确的委托类型Invoke(new Action(() >{// 3. 双重检查确保安全if (textBoxOutput2.IsDis…

[10-2]MPU6050简介 江协科技学习笔记(22个知识点)

1 2 3 欧拉角是描述三维空间中刚体或坐标系之间相对旋转的一种方法。它们由三个角度组成&#xff0c;通常表示为&#xff1a; • 偏航角&#xff08;Yaw&#xff09;&#xff1a;绕垂直轴&#xff08;通常是z轴&#xff09;的旋转&#xff0c;表示偏航方向的变化。 • 俯仰角&a…

虚拟环境共享系统包

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 虚拟环境共享系统包 python basic_pipelines/detection.py如果报错显示如下&#xff1a; Traceback (most recent call last):File "/home/ai/hailort/hailo-rpi5-exam…

Java求职者面试题解析:基础概念、计算机基础与源码原理

Java求职者面试题解析&#xff1a;基础概念、计算机基础与源码原理 第一轮&#xff1a;基础概念问题 1. 什么是Java的跨平台特性&#xff1f; Java的跨平台特性是指Java程序可以在任何支持Java虚拟机&#xff08;JVM&#xff09;的设备上运行&#xff0c;而无需重新编译。这…

网页前端开发(基础进阶4--axios)

Ajax Ajax(异步的JavaScript和XML) 。 XML是可扩展标记语言&#xff0c;本质上是一种数据格式&#xff0c;可以用来存储复杂的数据结构。 可以通过Ajax给服务器发送请求&#xff0c;并获取服务器响应的数据。 Ajax采用异步交互&#xff1a;可以在不重新加载整个页面的情况下&am…

设计模式-迪米特法则

迪米特法则 迪米特法则 (Law of Demeter, LoD)&#xff0c;也被称为“最少知识原则 (Principle of Least Knowledge)”&#xff0c;是面向对象设计中的一个重要原则。 核心思想&#xff1a;一个对象应该对其他对象有尽可能少的了解。 更具体地说&#xff0c;它规定了一个对象…

结构性-代理模式

动态代理主要是为了处理重复创建模板代码的场景。 使用示例 public interface MyInterface {String doSomething(); }public class MyInterfaceImpl implements MyInterface{Overridepublic String doSomething() {return "接口方法dosomething";} }public class M…

Unity大型项目资源框架

🎯 Unity大型项目资源管理:低端机检测后自动切换资源框架(大厂风格) 🧩 框架目标 ✅ 启动时检测机型性能,判定设备等级 ✅ 同一资源有高配/中配/低配不同压缩格式 ✅ 根据设备等级,加载对应资源包(AB) ✅ 支持动态切换(可用来切换特效/贴图分辨率/模型LOD) ✅ 保证…

MATLAB仿真:偏振光在光纤通信中的应用研究_可复现,有问题请联系博主

MATLAB仿真:偏振光在光纤通信中的应用研究 1. 研究概述 本文通过MATLAB仿真研究偏振光在光纤通信中的关键技术,包括偏振态生成、传输特性和检测方法,重点分析偏振模色散(PMD)的影响机制,并设计偏振控制优化方案。 %% 主程序框架 clc; clear; close all; addpath(Polar…

CTA-861-G-2017中文pdf版

CTA-861-G标准&#xff08;2016年11月发布&#xff09;规范未压缩高速数字接口的DTV配置&#xff0c;涵盖视频格式、色彩编码、辅助信息传输等&#xff0c;适用于DVI、HDMI等接口&#xff0c;还涉及EDID数据结构及HDR元数据等内容。

C++核心编程_继承方式

继承的语法&#xff1a;class 子类 : 继承方式 父类 继承降属性权限&#xff0c;不可升属性权限 继承方式一共有三种&#xff1a; 公共继承 保护继承 私有继承 #include <iostream> #include <string> using namespace std;class Base1 { public:int m_A; p…

Dockerfile常用指令介绍

Dockerfile常用指令介绍 Dockerfile是一个文本文件&#xff0c;用于定义Docker镜像的构建过程。下面介绍一些最常用的Dockerfile指令及其用法&#xff1a; 基础指令 FROM - 指定基础镜像 FROM python:3.9-slim这是Dockerfile的第一个指令&#xff0c;用于指定构建镜像的基础镜…

Spring中@Primary注解的作用与使用

在 Spring 框架中&#xff0c;Primary 注解用于解决依赖注入时的歧义性&#xff08;Ambiguity&#xff09;问题。当 Spring 容器中存在多个相同类型的 Bean 时&#xff0c;通过 Primary 标记其中一个 Bean 作为默认的首选注入对象。 核心作用&#xff1a; 解决多个同类型 Bean …

本地优先的状态管理与工具选型策略

本地优先&#xff1a;合理把控状态共享边界 在 React 应用开发过程中&#xff0c;开发者容易陷入一个认知误区——过度追求状态的全局化。许多新手开发者在项目初期就急于引入 Redux、Zustand 或 Jotai 等状态管理工具&#xff0c;将一些本应属于组件内部的琐碎状态&#xff0…

OpenCV CUDA模块图像处理-----对图像执行 均值漂移过程(Mean Shift Procedure)函数meanShiftProc()

操作系统&#xff1a;ubuntu22.04 OpenCV版本&#xff1a;OpenCV4.9 IDE:Visual Studio Code 编程语言&#xff1a;C11 算法描述 执行一个均值漂移过程&#xff08;mean-shift procedure&#xff09;&#xff0c;并将处理后的点的信息&#xff08;它们的颜色和位置&#xff0…

硬件I2C和软件I2C的区别

硬件I2C和软件I2C的区别 一、硬件I2C 1、硬件IC的局限性及学习意义 尽管硬件IC外设在STM32等微控制器中提供了标准化的通信支持&#xff0c;但在实际应用中&#xff0c;其稳定性可能存在问题。例如&#xff0c;某些情况下外设会因事件检测异常而进入死锁状态&#xff0c;仅能…

推荐12个wordpress企业网站模板

WordPress企业网站模板是一种专为企业网站设计的WordPress主题&#xff0c;旨在帮助企业创建专业、美观且易于管理的网站。这些模板通常具备响应式设计、SEO优化、多语言支持等功能&#xff0c;能够满足不同行业和企业的需求。 WordPress企业网站模板的适用场景 企业官网&…

68道Hbase高频题整理(附答案背诵版)

简述什么是Hbase数据库&#xff1f; Hbase是一个高可靠性、高性能、面向列、可伸缩的分布式存储系统&#xff0c;它利用HBase技术在HDFS上提供了类似于Bigtable的能力。换句话说&#xff0c;Hbase是Apache Hadoop生态系统中的一部分&#xff0c;可以为大数据应用提供快速的随机…