Kafka如何保证「消息不丢失」,「顺序传输」,「不重复消费」,以及为什么会发生重平衡(reblanace)

前言

上一篇文章总结了kafka为什么快,下面来总结一下,kafka高频的常见的问题。内容有点多,全部看完需要有一定的耐心。

kafka如何保证消息不丢失

Producer端

要保证消息不丢失,第一点要做的就是要保证消息从producer端发送到了kafka的broker中,并且broker把消息保存了下来。
由于在发送消息的过程中有可能会发生网络故障,broker故障等原因导致消息发送失败,因此在producer端有两种方式来避免消息丢失。

接收发送消息回执

我们在使用kafka发送消息的时候,通常是使用producer.send(msg)方法,但是这个方法其实是一种异步发送,调用此方法发送消息的时候,虽然会立即返回,但是并不代表消息真的发送成功了。
1、所以可以使用同步发送消息,producer.send(msg).get()此方法会执行同步发生消息,并等待结果返回
2、也可以使用带回调函数的异步方法,producer.send(msg,callback),用回调函数来监听消息的发送结果,如果发送失败了,可以在回调函数里面进行重试。

producer参数配置

producer也提供了一些配置参数来避免消息丢失。

// 此配置表示,Leader和Follower全部成功接收消息后才确认收到消息,
// 可以最大限度保证消息不丢失,但是吞吐量会下降
acks = -1 
// producer 发送消息失败后,自动重试次数
retries = 3
// 发送消息失败后的重试时间间隔
retry.backoff.ms = 300
Broker端

当消息发送到broker后,broker需要保证此消息不会丢失,我们都知道,kafka是会将消息持久化到磁盘中的。
但是kafka为了保持性能采用了,页缓存+异步刷盘的形式将消息持久化到磁盘的。也就是批量定时将消息持久化到磁盘。
但是页缓存如果还没来的及将消息刷到磁盘,broker就挂了,还是会有消息丢失的风险,因此kafka又提供了partition的ISR(同步副本机制),即每一个patrtition都会有一个唯一的Leader和一到多个Follower,Leader专门处理一些事务类型的请求,Follower负责同步Leader的数据。当leader挂了后,会重新从Follower中选举出新的Leader,保证消息能够最终持久化。
另外,在producer中的配置参数acks,配置不同的值,broker也是会做不同的处理的。

acks=0:表示Producer请求立即返回,不需要等待Leader的任何确认。这种方案有最高的吞吐率,但是不保证消息是否真的发送成功。
acks =-1: 表示分区Leader必须等待消息被成功写入到所有的ISR副本(同步副本)中才认为Producer请求成功。这种方案提供最高的消息持久性保证,但是理论上吞吐率也是最差的。
acks=1: 表示Leader副本必须应答此Producer请求并写入消息到本地日志,之后Producer请求被认为成功。如果此时Leader副本应答请求之后挂掉了,消息会丢失。这个方案,提供了不错的持久性保证和吞吐。

producer中还有一些参数的配置也是会起到避免消息丢失的作用

//表示分区副本的个数,replication.factor>1,当Leader挂了,follower会被选举为leader继续提供服务
replication.factor=2//表示 ISR 最少的副本数量,通常设置 replication.factormin.insync.replicas>1,这样才有可用的follower副本执行替换,保证消息不丢失
replication.factormin.insync.replicas=2//是否可以把非ISR集合中的副本选举为leader
min.inunclean.leader.election.enable= false
Consumer端

Consumer端,只要保证消息接收到不胡乱的提交offset就行,kafka本身也是会记录每个pratition的偏移量,但是为了业务的可靠性,也可以自己存储一份offset,防止由于业务代码的问题导致消息没有处理就提交的offset,有自己存储才这一份offset就可以对偏移量进行一个回拨。

为了避免消息丢失,建议使用手动提交偏移量的方式,防止消息的业务逻辑未处理完,提交偏移量后消费者挂了的问题。

enable.auto.commit=false

kafka如何保证消息的顺序传输

我们知道,kafka的消息实际是存在某个topic的partition中的,一个topic有多个partition分区,同一个partition中的消息是有序的,跨partition的消息是无序的。
这个是怎么实现的呢?
因为我们在【Kafka为什么吞吐量大,速度快?】这篇文章里面总结了,kafka写入磁盘时是顺序写的,并且会被分配一个唯一的offset,所以同一个partition保存的数据都是有序的。而在读取消息时,消费者会从该分区最早的offset开始,依次读取消息,保证了消息顺序消费。

具体实现顺序发送消息有两种方式:
1、在使用kafka时,对需要保证顺序消费的topic,只创建一个partition,这样消息就都会顺序的存储到这一个partition中,也就能保证顺序消费了。
2、当一个topic有多个partition时,对需要保证顺序的消息,都发到指定的partition即可,这样也能保证顺序消费。

注:需要注意一点,虽然发送时保证了顺序,也都写到了同一个partition中,但在消费端,也要保证顺序消费,即单线程处理消息。

目前kafka4.0,可以允许一个consumer group下的多个消费者同时消费同一个partition了。
其借助新推出的共享组(Shared Group)特性来达成这一功能,且支持逐条消息确认,从而让消费模式更具灵活性,还能助力提升吞吐量。以往版本默认仅允许一个消费者组内单个消费者消费一个特定分区,当消费者多于分区时,多余消费者会闲置,共享组则可避免出现该类资源浪费情况。

将消息发到指定partition中也有几种方式。
1、发送消息,组装producerRecord时,指定partition

// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要发送消息的topic
String topic ="jimer_topic";
// 发送的消息内容
String message =Hello World!";
// 指定发送消息的分区
int partition =0;// 创建包含分区信息的ProducerRecord
ProducerRecordProducerRecord<String,String> record = new ProducerRecord<>(topic, partition, message);
// 发送消息
producer.send(record);
//关闭Kafka生产者
producer.close();

2、指定消息的key,保证相同key的消息发送到同一个partition

// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(getProperties());
// 指定要发送消息的topic
String topic ="jimer_topic";
// 发送的消息内容
String message =Hello World";
// 指定发送消息的key
String msg_key = "order_msg";// 创建包含消息key的producerRecord
ProducerRecordProducerRecord<String,String> record = 
new ProducerRecord<>(topic, null,msg_key, message);
// 发送消息
producer.send(record);
//关闭Kafka生产者
producer.close();

3、自定义Partitioner
除了上面两种方式外,还可以自定义指定分区的方式。通过实现Partitioner这个接口,具体实现partition方法,就可以了。具体使用的时候,在初始化Producer时,指定具体的partition实现类即可。
例如:

public class MyPartitioner implatents Partitioner{@Override
public void configure(Map<String,?> configs){// 可以在这里处理和获取分区器的配置参数
}
@0verride
public int partition(String topic, Object key, byte[] keyBytes, 
Object value,byte[] valueBytes,Cluster cluster){int partition =  int ss = new Random().nextInt(2);// 返回分区编号return partition;
}
@0verride
public void close(){// 可以在这里进行一些清理操作
}

使用时

Properties propsProducer = new Properties();propsProducer.put("acks", "all"); // 全部ISR列表中的副本接收成功后返回propsProducer.put("retries", 3);//失败时重试次数propsProducer.put("partitioner.class", "com.jimoer.MyPartitioner"); // 指定自定义分区器类
// 创建Kafka生产者
Producer<String, String> producer = new KafkaProducer<>(propsProducer);

kafka如何保证消息不重复消费

什么情况下会导致消息被重复消费呢?

1、生产者,生产者可能重复推送了一条消息到kafka,例如:某接口未做幂等处理,接口中会发送kafka消息。
2、kafka,在消费者消费完消息后,提交offset时,kafka突然挂了,导致kafka认为此消息还未消费,又重新推送了该条消息,导致了重复消费消息。
3、消费者,在消费者消费完消息后,提交offset时,Consumer突然宕机挂掉,这个时候,kafka未接收到已处理的offset值,当Consumer恢复后,会重新消费此部分消息。
4、还有一种情况,Kafka 存在 Partition Balance 机制,会将多个 Partition 均衡分配给多个消费者。若 Consumer 在默认 5 分钟内未处理完一批消息,会触发 Rebalance 机制,导致 offset 自动提交失败,重新 Rebalance 后,消费者会从之前未提交的 offset 位置开始消费,从而造成消息重复消费。

那么我们该如何防止消息被重复消费呢

其实上面的1、2、3、4这些情况都可以用幂等机制来防止消息被重复消费。为消息生成 一个唯一标识,并保存到 mysql 或 redis 中,处理消息前先到 mysql 或 redis 中判断该消息是否已被消费过。

但是第4种情况,前提是要先优化消费端处理性能,避免触发 Rebalance,例如:采用异步方式处理消息、缩短单个消息消费时长、调整消息处理超时时间、减少一次性从 Broker 拉取的数据条数等。

kafka什么情况下会发生reblanace(重平衡)

Kafka 的重平衡(Rebalance)是指消费者组(Consumer Group)内的消费者与分区(Partition)之间的分配关系发生重新调整的过程
主要有以下几种情况会触发:
1、消费者组成员数量发生变化。((新消费者的加入或者退出)
2、订阅主题(Topic)数量发生变化。
3、订阅主题的分区(Partition)数发生变化。

还有某些异常情况也会触发Rebalance:
1、消费端处理消息超时,上面我们说到过,若消费者未在设定时间内处理完消息,消费者组会认为当前消费者有问题了,会触发Rebalance,重新分配消息。又或者当前消费者挂了,也是一样会触发Rebalance。
2、组协调器(Group Coordinator)是 Kafka 负责管理消费者组的 Broker 节点。如果它崩溃或者发生故障。Kafka 需要重新选举新的 Group Coordinator ,并进行重平衡。

当Kafka 集群要触发重平衡机制时,大致的步骤如下:
1.暂停消费:在重平衡开始之前,Kafka 会暂停所有消费者的拉取操作,以确保不会出现重平衡期间的消息丢失或重复消费。
2.计算分区分配方案:Kafka 集群会根据当前消费者组的消费者数量和主题分区数量,计算出每个消费者应该分配的分区列表,以实现分区的负载均衡。
3.通知消费者:一旦分区分配方案确定,Kafka 集群会将分配方案发送给每个消费者,告诉它们需要消费的分区列表,并请求它们重新加入消费者组。
4.重新分配分区:在消费者重新加入消费者组后,Kafka 集群会将分区分配方案应用到实际的分区分配中,重新分配主题分区给各个消费者。
5.恢复消费:最后,Kafka 会恢复所有消费者的拉取操作,允许它们消费分配给自己的分区。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/94466.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/94466.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

原子操作汇编实现:原理、流程与代码解析

&#x1f52c; 原子操作汇编实现&#xff1a;原理、流程与代码解析 引用&#xff1a;VC/C Intel x86 内联汇编实现 “Interlocked” 原子变量各种操作 &#x1f31f; 引言&#xff1a;原子操作的重要性 在多线程编程中&#xff0c;原子操作是确保数据一致性的关键机制。本文…

【WRF理论第十九期】内陆湖泊、水体的处理方式

目录 WRF 模型中湖泊模拟概述 湖泊模型(Lake Model)集成 新增湖泊数据支持(如 WUDAPT + MODIS) LAKE_DEPTH Noah-MP + 湖泊模型联合使用 namelist.input 配置说明 WRF 代码更新 参考 论坛-WRF 湖泊模型(WRF-Lake model)与 SST 更新 WRF 模型中湖泊模拟概述 湖泊模型(La…

【渗透测试】SQLmap实战:一键获取MySQL数据库权限

注&#xff1a;所有技术仅用于合法安全测试与防御研究&#xff0c;未经授权的攻击行为属违法犯罪&#xff0c;将承担法律责任。一、SQLmap常规用法注意存放路径&#xff1a;C:\Users\neo\AppData\Local\sqlmap\output1、列出详细过程和数据库列表sqlmap -u http://192.168.61.2…

LeetCode 第464场周赛 第三天

1. 3658 奇数和与偶数和的最大公约数&#xff08;欧几里得&#xff09; 链接&#xff1a;题目链接 题解&#xff1a; 题解时间复杂度O(logmin(a, b))&#xff1a; 获得前n个奇、偶数的总和&#xff0c;由于数列为等差数列&#xff0c;等差数列和公式&#xff1a;(a1 an) * n …

IntelliJ IDEA 集成 ApiFox 操作与注解规范指南

一、IDEA装入Apifox 1.安装Apifox Helper 说明:在 IntelliJ IDEA 中安装 ApiFox Helper 插件。 2.打开Apifox 说明:点击 设置,在菜单中选择 API访问令牌。在弹出的窗口中输入任意名称,并选择令牌的有效期(为了方便,我这里选择了 无期限)。生成令牌后,由于 令牌只能复…

C++---双指针

在C编程中&#xff0c;双指针算法是一种高效的解题思路&#xff0c;其核心是通过设置两个指针&#xff08;或索引&#xff09;遍历数据结构&#xff08;如数组、链表、字符串等&#xff09;&#xff0c;利用指针的移动规则减少无效操作&#xff0c;从而将时间复杂度从暴力解法的…

【LLM】GLM-4.5模型架构和原理

note 文章目录note一、GLM-4.5模型二、Slime RL强化学习训练架构Reference一、GLM-4.5模型 大模型进展&#xff0c;GLM-4.5技术报告,https://arxiv.org/pdf/2508.06471&#xff0c;https://github.com/zai-org/GLM-4.5&#xff0c;包括GLM-4.5&#xff08;355B总参数&#xff…

LLM 中增量解码与模型推理解读

在【LLM】LLM 中 token 简介与 bert 实操解读一文中对 LLM 基础定义进行了介绍&#xff0c;本文会对 LLM 中增量解码与模型推理进行解读。 一、LLM 中增量解码定义 增量解码&#xff08;Incremental Decoding&#xff09;是指在自回归文本生成过程中&#xff0c;模型每次只计…

1.Spring Boot:超越配置地狱,重塑Java开发体验

目录 一、Spring框架&#xff1a;伟大的基石 历史背景与挑战 Spring的革命性贡献 新的挑战&#xff1a;配置地狱 二、Spring Boot&#xff1a;约定大于配置的革命 四大核心特性 1. 快速创建独立应用 2. 自动配置&#xff1a;智能化的魔法 3. 起步依赖&#xff1a;依赖管…

assert使用方法

assert 是 Python 中用来进行 调试 和 验证 的一个关键字&#xff0c;它用于测试一个 条件表达式 是否为真。如果条件为假&#xff0c;assert 会抛出一个 AssertionError 异常&#xff0c;通常带有错误信息。语法&#xff1a;assert condition, "Error message"condi…

【实习总结】快速上手Git:关键命令整理

目录 git的四大工作区域 git首次配置 克隆远程仓库 提交代码到远程仓库 查看文件状态&#xff08;可选&#xff09; 添加文件到暂存区 将暂存区的内容提交到本地仓库 将本地的提交上传到远程仓库 拉取并合并代码 第一种方式 第二种方式 分支管理 查看与创建分支 …

02-开发环境搭建与工具链

第2课&#xff1a;开发环境搭建与工具链 &#x1f4da; 课程目标 掌握DevEco Studio的下载、安装和配置熟悉HMS Core&#xff08;华为移动服务&#xff09;的使用了解鸿蒙模拟器与真机调试环境掌握必备开发工具的使用 &#x1f6e0;️ DevEco Studio环境搭建 2.1 下载与安装…

删掉一个元素以后全为1的最长子数组-滑动窗口

1493. 删掉一个元素以后全为 1 的最长子数组 - 力扣&#xff08;LeetCode&#xff09; Solution #include<iostream> #include<vector> using namespace std;class Solution { public://滑动窗口//动态维护一个窗口&#xff0c;窗口内只能有1个0&#xff0c;记录窗…

【计算机网络 | 第8篇】编码与调制

文章目录通信系统中的编码与调制&#xff1a;从信道基础到信号传输技术一、信道与通信电路&#x1f342;二、三种基本通信方式&#x1f4d6;1. 单向通信&#xff08;单工通信&#xff09;2. 双向交替通信&#xff08;半双工通信&#xff09;3. 双向同时通信&#xff08;全双工通…

当AI遇上终端:Gemini CLI的技术魔法与架构奥秘

"代码不仅仅是指令的集合&#xff0c;更是思想的载体。当AI与终端相遇&#xff0c;会碰撞出怎样的火花&#xff1f;" 在这个AI技术日新月异的时代&#xff0c;Google推出的Gemini CLI无疑是一颗璀璨的明星。它不仅仅是一个命令行工具&#xff0c;更是一个将人工智能无…

ViLU: Learning Vision-Language Uncertainties for Failure Prediction

研究方向&#xff1a;Image Captioning1. 论文介绍本文提出ViLU&#xff08;Vision-Language Uncertainties&#xff09;&#xff0c;一个用于学习视觉语言不确定性量化&#xff08;UQ&#xff09;和检测视觉语言模型故障的事后框架。使用VLMs进行量化&#xff08;UQ&#xff0…

数据集笔记:百度地图高德地图坐标互转

1 为什么会有高德坐标系和百度坐标系&#xff1f;根据《测绘法》和国家保密法规&#xff0c;在中国大陆范围内的地理坐标数据必须做加密处理&#xff0c;不允许直接使用 WGS84&#xff08;openstreetmap&#xff09;所以出现了GCJ-02 和 BD-09高德、腾讯、谷歌中国都遵循 GCJ-0…

SkyWalking高效线程上下文管理机制:确保调用链中traceId来自同一个请求

SkyWalking Agent 能确保获取到“正确”的 traceId,其核心在于它建立并维护了一套高效的线程上下文管理机制。这套机制确保了即使在复杂的多线程、异步环境下,也能将正确的上下文(包含 traceId)与当前正在执行的代码逻辑关联起来。 其工作原理可以概括为下图所示的流程: …

Kafka-Eagle安装

目录Eagle环境安装Mysql环境准备Kafka环境准备Eagle安装Kafka-Eagle框架可以监控Kafka集群的整体运行情况&#xff0c;在生产环境中经常使用 Eagle环境安装 Mysql环境准备 Eagle的安装依赖于Mysql&#xff0c;Mysql主要用来存储可视化展示的数据 将mysql文件夹及里面所有内…

Matlab系列(005) 一 归一化

目录1、前言2、什么是归一化&#xff1f;3、为什么要进行归一化4、归一化方法详解与Matlab实现5、总结1、前言 ​   归一化技术是数据预处理的核心环节&#xff0c;本文将深度解析主流归一化方法&#xff0c;提供可复现Matlab代码&#xff0c;并探讨其在各领域中的应用场景。…