Kafka 生产者与消费者分区策略全解析:从原理到实践

一、生产者分区策略

1.1 分区好处

(1)便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果。

(2)提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

1.2 生产者发送消息的分区策略

1)默认的分区器 DefaultPartitioner

在 IDEA 中 ctrl +n,全局查找 DefaultPartitioner。

/**
* The default partitioning strategy:
* <ul>
* <li>If a partition is specified in the record, use it假如发送消息的时候指定分区,就使用这个分区
* <li>If no partition is specified but a key is present choose a 
partition based on a hash of the key假如发送消息没有指定分区,指定了Key值,对Key进行hash,然后对分区数取模,得到哪个分区就使用哪个分区
* <li>If no partition or key is present choose the sticky 
partition(粘性分区) that changes when the batch is full.假如分区和key值都没有指定,使用粘性分区(黏住它,使用它,发送完毕为止)
* 
* See KIP-480 for details about sticky partitioning.
*/
public class DefaultPartitioner implements Partitioner {… …
}

例如:第一次随机选择0号分区,等0号分区当前批次满了(默认16k)或者linger.ms设置的时间到, Kafka再随机一个分区进行使用(如果还是0会继续随机)。

2)案例一

将数据发往指定 partition 的情况下,例如,将所有数据发往分区 1 中。

测试:

①在 node01 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server node01:9092 --topic first

②在 IDEA 中执行代码,观察 bigdata01 控制台中是否接收到消息。

bin/kafka-console-consumer.sh --bootstrap-server node01:9092 --topic first

③在 IDEA 控制台观察回调信息。

主题:first->分区:0
主题:first->分区:0 
主题:first->分区:0 
主题:first->分区:0 
主题:first->分区:0

3)案例二

没有指明 partition 值但有 key 的情况下,将 key 的 hash 值与 topic 的 partition 数进行取余得到 partition 值。

测试:

①key="a"时,在控制台查看结果。

主题:first->分区:1

主题:first->分区:1

主题:first->分区:1

主题:first->分区:1

主题:first->分区:1

②key="b"时,在控制台查看结果。

主题:first->分区:2

主题:first->分区:2

主题:first->分区:2

主题:first->分区:2

主题:first->分区:2

③key="f"时,在控制台查看结果。

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

1.3 自定义分区器

如果研发人员可以根据企业需求,自己重新实现分区器。

1)需求

例如我们实现一个分区器实现,发送过来的数据中如果包含 bigdata,就发往 0 号分区, 不包含bigdata,就发往 1 号分区。

2)实现步骤

(1)定义类实现 Partitioner 接口。

(2)重写 partition()方法

package com.bigdata.partitioner;import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;import java.util.Map;public class MyPartitioner implements Partitioner {/*** 返回信息对应的分区* @param topic 主题* @param key 消息的 key* @param keyBytes 消息的 key 序列化后的字节数组* @param value 消息的 value* @param valueBytes 消息的 value 序列化后的字节数组* @param cluster 集群元数据可以查看分区信息* @return*/@Overridepublic int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {// 假如消息中含有bigdata 发送0分区,否则发送1分区String msg = new String(valueBytes);if(msg.contains("bigdata")){return 0;}return 1;}@Overridepublic void close() {}@Overridepublic void configure(Map<String, ?> configs) {}
}

(3)使用分区器的方法,在生产者的配置中添加分区器参数

package com.bigdata.producer;import org.apache.kafka.clients.producer.*;import java.util.Properties;
import java.util.concurrent.ExecutionException;public class CustomProducer06 {public static void main(String[] args) throws InterruptedException, ExecutionException {Properties properties = new Properties();// 设置连接kafka集群的ip和端口properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");properties.put("bootstrap.servers", "bigdata01:9092");properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.bigdata.partitioner.MyPartitioner");KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);for (int i = 0; i < 10; i++) {ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>("second","abc","加油!:"+i);kafkaProducer.send(producerRecord, new Callback() {@Overridepublic void onCompletion(RecordMetadata metadata, Exception exception) {System.out.printf("消息发送给了%d分区\n",(metadata.partition()));}});}kafkaProducer.close();}
}

4)测试

测试按照以下几种情况:

第一种:使用了自定义分区器,并且指定分区发送

第二种:使用了自定义分区器,并且发送的时候带有 key 值

第三种:使用了自定义分区器,没有指定分区和 key

每一种测试时消息发送带有 bigdata 的,再 换成不含 bigdata 的。

①在 bigdata01 上开启 Kafka 消费者。

kafka-console-consumer.sh --bootstrap-server bigdata01:9092 --topic first 

②在 IDEA 控制台观察回调信息。

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

主题:first->分区:0

注意:假如我自定义了一个分区规则,如果代码中指定了消息发送到某个分区,自定义的分区规则无效。

比如:我自定义了一个分区器,包含 bigdata 发送 0 分区,不包含发送 1 分区,但假如发送消息的时候指定消息发送到 2 分区,那么消息就必然发送 2 分区。不走咱们自定义的分区器规则了。

如果没有指定分区规则,指定了 key 值,那么依然走我们的自定义分区器,不走默认。

二、消费者分区策略

1、一个consumer group中有多个consumer组成,一个 topic有多个partition组成,现在的问题是,到底由哪个consumer来消费哪个partition的数据。

2、Kafka有四种主流的分区分配策略: Range、RoundRobin(轮询)、Sticky(粘性)、CooperativeSticky(配合的粘性)。

可以通过配置参数partition.assignment.strategy,修改分区的分配策略。默认策略是Range + CooperativeSticky

参数名称

描述

heartbeat.interval.ms

Kafka 消费者和 coordinator 之间的心跳时间,默认 3s。 该条目的值必须小于session.timeout.ms,也不应该高于 session.timeout.ms 的 1/3。

session.timeout.ms

Kafka 消费者和 coordinator 之间连接超时时间,默认 45s。超 过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是 5 分钟。超过该值,该 消费者被移除,消费者组执行再平衡

partition.assignment.strategy

消 费 者 分 区 分 配 策 略 , 默 认 策 略 是 Range +CooperativeSticky。Kafka 可以同时使用多个分区分配策略。

可 以 选 择 的 策 略 包 括 : Range 、 RoundRobin 、 Sticky 、CooperativeSticky

2.1 Range 以及再平衡

1)Range 分区策略原理

Range 是对每个 topic 而言的。

首先对同一个topic里面的分区按照序号进行排序,并对消费者按照字母顺序进行排序。

假如现在有7个分区,3个消费者,排序后的分区将会是0,1,2,3,4,5,6;消费者排序完之后将会是C0,C1,C2。

通过partitions数/consumer数来决定每个消费者应该消费几个分区。如果除不尽,那么前面几个消费者将会多

消费1个分区,

例如,7/3=2余1,除不尽,那么消费者C0便会多消费1个分区。8/3=2余2,除不尽,那么C0和C1分别多

消费一个。

注意:如果只是针对1个topic而言,C0消费者多消费1个分区影响不是很大。但是如果有N多个topic,那么针对每个 topic,消费者C0都将多消费1个分区,topic越多,C0消费的分区会比其他消费者明显多消费N个分区。

容易产生数据倾斜!

2)Range 分区分配策略案例

(1)修改主题 first 为 7 个分区。

bin/kafka-topics.sh --bootstrap-server bigdata01:9092 --alter --topic first --partitions 7

2)这样可以由三个消费者

CustomConsumer、CustomConsumer1、CustomConsumer2 组成消费者组,组名都为“test”, 同时启动 3 个消费者。

(3)启动生产者,发送 500 条消息,随机发送到不同的分区。

注意:分区数可以增加,但是不能减少。

一个主题,假如副本数想修改,是否可以直接修改?答案是不可以。

如果想修改,如何修改?制定计划,执行计划

Kafka 默认的分区分配策略就是 Range + CooperativeSticky,所以不需要修改策略。

默认是Range,但是在经过一次升级之后,会自动变为CooperativeSticky。这个是官方给出的解释。

默认的分配器是[RangeAssignor, CooperativeStickyAssignor],默认情况下将使用RangeAssignor,但允许通过一次滚动反弹升级到CooperativeStickyAssignor,该滚动反弹会将RangeAssignor从列表中删除。

(4)观看 3 个消费者分别消费哪些分区的数据。

假如消费情况和预想的不一样:

1、集群是否健康,比如某些kafka进程没启动

2、发送数据的时候7个分区没有使用完,因为它使用了粘性分区。如何让它发送给7个分区呢,代码中添加:

// 延迟一会会看到数据发往不同分区

Thread.sleep(20);

发现一个消费者消费了,5,6分区,一个消费了0,1,2分区,一个消费了3,4分区。

此时并没有修改分区策略,原因是默认是Range.

3Range 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 3、4 号分区数据。

2 号消费者:消费到 5、6 号分区数据。

0号的数据,没人消费。

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、1、2、3 号分区数据。

2 号消费者:消费到 4、5、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照 range 方式分配。

2.2 RoundRobin(轮询) 以及再平衡

1)RoundRobin 分区策略原理

RoundRobin针对集群中所有Topic而言。

RoundRobin轮询分区策略,是把所有的partition和所有的consumer都列出来,然后按照hashcode进行排序,最后

通过轮询算法来分配partition给到各个消费者。

2RoundRobin 分区分配策略案例

(1)依次在 CustomConsumer、CustomConsumer1、CustomConsumer2 三个消费者代 码中修改分区分配策略为 RoundRobin。

轮询的类的全路径是:
org.apache.kafka.clients.consumer.RoundRobinAssignorA list of class names or class types, ordered by preference, of supported partition assignment strategies that the client will use to distribute partition ownership amongst consumer instances when group management is used. Available options are:org.apache.kafka.clients.consumer.RangeAssignor: Assigns partitions on a per-topic basis.
org.apache.kafka.clients.consumer.RoundRobinAssignor: Assigns partitions to consumers in a round-robin fashion.
org.apache.kafka.clients.consumer.StickyAssignor: Guarantees an assignment that is maximally balanced while preserving as many existing partition assignments as possible.
org.apache.kafka.clients.consumer.CooperativeStickyAssignor: Follows the same StickyAssignor logic, but allows for cooperative rebalancing.

3RoundRobin 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5 号分区数据

2 号消费者:消费到 4、1 号分区数据

0 号消费者 以前对应的数据没有人消费

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 0、2、4、6 号分区数据

2 号消费者:消费到 1、3、5 号分区数据

说明:消费者 0 已经被踢出消费者组,所以重新按照 RoundRobin 方式分配。

2.3 Sticky 以及再平衡

粘性分区定义:可以理解为分配的结果带有“粘性的”。即在执行一次新的分配之前, 考虑上一次分配的结果,尽量少的调整分配的变动,可以节省大量的开销。 粘性分区是 Kafka 从 0.11.x 版本开始引入这种分配策略,首先会尽量均衡的放置分区 到消费者上面,在出现同一消费者组内消费者出现问题的时候,会尽量保持原有分配的分区不变化。

比如分区有 0 1 2 3 4 5 6

消费者有 c1 c2 c3

c1 消费 3个 c2 消费2个 c3 消费2个分区

跟以前不一样的是,c1 消费的3个分区是随机的,不是按照 0 1 2 这样的顺序来的。

1)需求

设置主题为 first,7 个分区;准备 3 个消费者,采用粘性分区策略,并进行消费,观察

消费分配情况。然后再停止其中一个消费者,再次观察消费分配情况。

2)步骤

(1)修改分区分配策略为粘性。

注意:3 个消费者都应该注释掉,之后重启 3 个消费者,如果出现报错,全部停止等

会再重启,或者修改为全新的消费者组。

// 修改分区分配策略
ArrayList<String> startegys = new ArrayList<>();
startegys.add("org.apache.kafka.clients.consumer.StickyAssignor");
properties.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, startegys);

(2)使用同样的生产者发送 500 条消息。

可以看到会尽量保持分区的个数近似划分分区。

3Sticky 分区分配再平衡案例

(1)停止掉 0 号消费者,快速重新发送消息观看结果(45s 以内,越快越好)。

1 号消费者:消费到 2、5、3 号分区数据。

2 号消费者:消费到 4、6 号分区数据。

0 号消费者的任务没人顶替它消费

说明:0 号消费者挂掉后,消费者组需要按照超时时间 45s 来判断它是否退出,所以需

要等待,时间到了 45s 后,判断它真的退出就会把任务分配给其他 broker 执行。

(2)再次重新发送消息观看结果(45s 以后)。

1 号消费者:消费到 2、3、5 号分区数据。

2 号消费者:消费到 0、1、4、6 号分区数据。

说明:消费者 0 已经被踢出消费者组,所以重新按照粘性方式分配。

2.4 CooperativeSticky 的解释【新的kafka中刚添加的策略】

在消费过程中,会根据消费的偏移量情况进行重新再平衡,也就是粘性分区,运行过程中还会根据消费的实际情况重新分配消费者,直到平衡为止。

好处是:负载均衡,不好的地方是:多次平衡浪费性能。

动态平衡,在消费过程中,实施再平衡,而不是定下来,等某个消费者退出再平衡。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/92616.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/92616.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

高频面试点:深入理解 TCP 三次握手与四次挥手

在网络通信的世界里,TCP(Transmission Control Protocol,传输控制协议)是确保数据可靠传输的基石。其中,三次握手建立连接、四次挥手断开连接的过程,更是 Java 秋招面试中的高频考点。今天,我们就深入剖析这两个关键过程,结合原理、代码示例与面试真题,帮你吃透知识点…

k8s-nfs实现创建sc的两种方式

法一&#xff1a;基于 官方 NFS CSI 插件 法二&#xff1a;基于 nfs-subdir-external-provisioner 法一 官方 NFS CSI 插件 大致步骤# 安装 NFS sudo apt update sudo apt install -y nfs-kernel-server # 创建共享目录 sudo mkdir -p /data/nfs sudo chmod 777 /data/nfs # 配…

n8n 入门指南:更适合跨境出海搞钱的AI智能体

如果你最近刷到 AI 圈的分享应该会发现——n8n 又火起来了。其实 n8n 早在 2020 年左右就被程序员玩过一波&#xff0c;当时很多人拿它做网站自动发邮件、消息转发之类的“流程自动化”。但那时候 AI 还没这么卷&#xff0c;大家也没觉得多有用。n8n为什么最近又翻红&#xff1…

【数据分享】各省农业土地流转率(2010-2023)

数据介绍土地流转是推动农业规模化、现代化发展的关键机制。为助力相关研究&#xff0c;现分享一份覆盖全国30个省级行政区、时间跨度为2010-2023年的农业土地流转率面板数据集。本数据直接提取自权威统计年报&#xff0c;具有较高的参考价值。一、数据概览覆盖范围&#xff1a…

音视频时间戳获取与同步原理详解

引言&#xff1a;为什么音视频同步如此重要&#xff1f; 在音视频技术领域&#xff0c;"同步"是决定用户体验的核心要素。想象一下观看电影时画面与声音错位0.5秒的场景&#xff1a;角色说话时嘴唇动作与声音不匹配&#xff0c;爆炸场景的视觉冲击先于音效到达——这…

Day38--动态规划--322. 零钱兑换,279. 完全平方数,139. 单词拆分,56. 携带矿石资源(卡码网),背包问题总结

Day38–动态规划–322. 零钱兑换&#xff0c;279. 完全平方数&#xff0c;139. 单词拆分&#xff0c;56. 携带矿石资源&#xff08;卡码网&#xff09;&#xff0c;背包问题总结 今天的是几道经典的“完全背包”题目。前两道题目&#xff0c;要区分求的是“价值”&#xff0c;还…

应用层Http协议(1)

应用层Http协议&#xff08;1&#xff09; 在互联网世界中&#xff0c;HTTP&#xff08;HyperText Transfer Protocol&#xff0c;超文本传输协议&#xff09;是一个至关重要的协议。它定义了客户端&#xff08;如浏览器&#xff09;与服务器之间如何通信&#xff0c;以交换或传…

elementui input无法输入问题

背景。开发小程序。自定义表单在pc段设置好input输入框属性后。 在小程序端无法输入原因&#xff1a;长度受限制&#xff0c;导致input组件的maxlength属性认为长度是0导致无法输入任何值。看解释是应为遇到空字符串等情况会设置为0解决。因为未找到设置maxlength为0处&#xf…

算法_python_学习记录_02

算法学习和视频学习过程中&#xff0c;有许多前几天还不知道的知识点&#xff0c;现在一点一点归纳整理出来&#xff0c;稳步前进&#xff0c;前进~ 20_贪心算法系列题 00_参考文档 详解贪心算法&#xff08;Python实现贪心算法典型例题&#xff09;_顺序贪婪算法-CSDN博客P…

Meta AI水印计划的致命缺陷——IEEE Spectrum深度文献精读

一、原文信息 标题: Metas AI Watermarking Plan Is Flimsy, at Best 中文译名: Meta的AI水印计划脆弱不堪 作者: David Evan Harris(加州大学伯克利分校)、Lawrence Norden(纽约大学法学院) 发表日期: 2024年3月5日 发表期刊: IEEE Spectrum 二、原文全文翻译 Met…

gpt-oss 全量技术解读

一、概述 gpt-oss 是 OpenAI 发布的开放权重&#xff08;open-weight&#xff09;模型系列&#xff0c;面向强推理、Agent 能力与多样化应用场景。 提供两种规格&#xff1a; gpt-oss-120b&#xff1a;面向生产与高推理需求&#xff0c;单卡 80GB GPU&#xff08;如 NVIDIA …

实现EtherNet/IP网络与Modbus TCP网络之间数据互通

硬件连接与配置使用工业以太网网关&#xff08;如ENE-350&#xff09;作为桥接设备&#xff0c;通过以太网交换机实现硬件互联。 网关需根据应用场景配置为EtherNet/IP从站或Modbus TCP主/从站模式。案例1&#xff1a;EtherNet IP主站PLC和Modbus TCP主站PLC的互联网关配置&…

zookeeper因jute.maxbuffer启动异常问题排查处理

#作者&#xff1a;程宏斌 文章目录一、前言二、问题描述三、定位过程四、问题根因五、解决方案根本解决方案应急处理方案调大参数可能出现的问题六、总结为什么超出会报错官方对于jute.maxbuffer的解释注意事项官方建议一、前言 在分布式系统中&#xff0c;ZooKeeper作为关键的…

Java基础十三: List

目录 1.Java LinkedList 的高级应用与示例 1.1 LinkedList的基本使用 基本操作示例 1.2 LinkedList独有的方法 特定方法示例 1.3 队列模式&#xff08;先进先出&#xff09; 队列模式示例 1.4 栈模式&#xff08;先进后出&#xff09; 栈模式示例 总结 2.Java Vecto…

[机器学习]03-基于核密度估计(KDE)的鸢尾花数据集分类

关键点&#xff1a;使用核密度估计&#xff08;KDE&#xff09; 估计类别条件概率密度&#xff08;高斯核&#xff0c;带宽0.2&#xff09;采用最大后验概率&#xff08;MAP&#xff09; 决策准则进行分类程序代码&#xff1a;import random import matplotlib from sklearn.ne…

jmeter怎么实现多个请求真正的同时发送

1.首先在插件管理器Plugins Manager中搜索插件Parallel Controller&Sampler&#xff0c;勾选上对应的插件后&#xff0c;在右下角点击Apply Changes and Restart JMeter&#xff0c;安装插件2.插件安装完毕后&#xff0c;然后在线程组上面右击&#xff0c;点击添加--逻辑控…

复杂环境下车牌识别准确率↑29%:陌讯动态特征融合算法实战解析

原创声明本文为原创技术解析&#xff0c;核心技术参数与架构设计引用自《陌讯技术白皮书》&#xff0c;转载需注明来源。一、行业痛点&#xff1a;车牌识别的现实挑战在智慧交通、停车场管理等场景中&#xff0c;车牌识别作为关键技术环节&#xff0c;长期面临多重环境干扰。据…

Express中间件和路由及响应方法

1.中间件分类 应用程序级别中间件 通过 app.use() 或 app.METHOD()&#xff08;如 app.get&#xff09;绑定的中间件&#xff0c;作用于整个应用程序。例如 记录请求日志、解析请求体等全局功能。例如&#xff1a; app.use((req, res, next) > {console.log(Request URL:…

Dokcer创建中间件环境

简而言之&#xff0c;用docker来搞中间件环境比价好使&#xff0c;不用关心各种环境了 rabbitmqsudo docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \rabbitmq:3.8-managementredis 5.0.3 docker start my-redisdocker run --name my-redis -d -p 6379:6379 \…

Linux高级编程-文件操作

1.Linux下的文件类型7种文件类型&#xff1a;b 块设备文件 -------> 存储类设备&#xff08;硬盘&#xff09; c 字符设备文件 ------->如输入输出设备&#xff08;鼠标键盘显示器...&#xff09; d 目录文件 ------->文件夹 - 普通文件 -------&g…