kafka--基础知识点--9.1--consumer 至多一次、至少一次、精确一次

1 自动提交

1.1 原理:

Kafka 消费者后台线程每隔 auto.commit.interval.ms 自动提交最近一次 poll() 的 offset
无需开发者干预

1.2 示例:

enable.auto.commit=true
auto.commit.interval.ms=5000 # 每 5 秒自动提交一次

from confluent_kafka import Consumer, KafkaError
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': True,   # 自动提交'auto.commit.interval.ms': 5000  # 每个5s自动提交一次
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# consumer.commit(msg)  不需要这行,这是手动提交时需要用的# 业务处理逻辑print(f"处理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中断信号已接收")finally:consumer.close()

2 手动提交

2.1 至多一次

2.1.1 原理

消息处理后立即提交偏移量;
即使处理失败也不会重试;
适合对消息丢失容忍度高的场景(如日志采集)。

2.1.2 示例:

enable.auto.commit=False:禁用自动提交偏移量
手动调用consumer.commit(msg)提交偏移量
auto.offset.reset=‘earliest’:从最早消息开始消费

from confluent_kafka import Consumer, KafkaError
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'mygroup','auto.offset.reset': 'earliest','enable.auto.commit': False
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])while True:msg = consumer.poll(1.0)if msg is None:continueif msg.error():if msg.error().code() == KafkaError._PARTITION_EOF:continueprint(f"Error: {msg.error()}", file=sys.stderr)breaktry:# 手动提交偏移量(最多一次核心)consumer.commit(msg)print(f"已提交偏移量: {msg.offset()}")# 业务处理逻辑print(f"处理消息: {msg.value().decode('utf-8')}")except KeyboardInterrupt:print("中断信号已接收")finally:consumer.close()

2.2 最少一次

2.2.1 原理

通过重试机制+手动提交偏移量实现:

  • 消息处理失败时自动重试(最多3次)
  • 成功处理后批量提交偏移量
  • 延长轮询间隔避免重平衡

2.2.2 示例

该示例是批量处理消息,批量提交;当然也可以一次处理一条消息,并一次提交一条消息偏移量

from confluent_kafka import Consumer, KafkaError, TopicPartition
import time
import sys# 配置消费者
conf = {'bootstrap.servers': 'localhost:9092','group.id': 'my_group','auto.offset.reset': 'earliest','enable.auto.commit': False,  # 手动提交控制'max.poll.interval.ms': 300000,  # 延长轮询间隔'session.timeout.ms': 10000,'heartbeat.interval.ms': 3000
}consumer = Consumer(conf)
consumer.subscribe(['my_topic'])def process_with_retry(msg):"""带重试的消息处理"""for attempt in range(3):try:# 替换为实际业务逻辑print(f"处理消息: {msg.value().decode('utf-8')}")return Trueexcept Exception:time.sleep(1)  # 指数退避可在此实现return Falsetry:while True:# 批量拉取10条消息msgs = consumer.consume(num_messages=10, timeout=1.0)processed = []for msg in msgs:if msg.error():continue# 处理消息(带重试)if process_with_retry(msg):processed.append(TopicPartition(msg.topic(), msg.partition(), msg.offset()))# 批量提交偏移量if processed:consumer.commit(offsets=processed)print(f"已提交偏移量: {[p.offset for p in processed]}")except KeyboardInterrupt:pass
finally:consumer.close()

补充:延长轮询间隔避免重平衡

核心概念解析:

  • 轮询间隔:指Kafka消费者两次调用poll()方法拉取消息的时间间隔,由max.poll.interval.ms参数控制(默认5分钟)。
  • 重平衡(Rebalance):当消费者组成员变动、主题/分区变化或心跳超时时,Kafka会触发分区重新分配,导致消费者暂停消费。

为什么延长轮询间隔能避免重平衡?

  • 防止误判消费者宕机
    • Kafka通过session.timeout.ms(默认10秒)和heartbeat.interval.ms(默认3秒)检测消费者存活。
    • 若消息处理耗时超过max.poll.interval.ms(默认5分钟),Kafka会认为消费者已宕机,触发重平衡。
    • 延长轮询间隔(如设为10分钟)可允许更长的消息处理时间,避免因业务逻辑耗时过长导致的误判重平衡。
  • 避免频繁重平衡的连锁反应
    • 重平衡期间消费者暂停消费,导致消息堆积和延迟。
    • 频繁重平衡(如每5分钟触发一次)会显著降低吞吐量,延长端到端延迟。

关键参数配置建议:

参数默认值推荐值作用
max.poll.interval.ms5分钟根据业务处理时间调整(如10-30分钟)控制两次poll的最大间隔,防止处理超时触发重平衡
session.timeout.ms10秒30秒-1分钟心跳超时时间,需大于heartbeat.interval.ms
heartbeat.interval.ms3秒2-5秒心跳发送频率,建议设为session.timeout.ms的1/3

2.3 精确一次

2.3.1 幂等性消费 (Idempotent Consumption) - 推荐且最常用

思路:承认消息可能会被重复传递,但从业务逻辑上保证重复执行不会产生负面效果。

做法:在消费者的处理逻辑中,设计幂等性。例如:

为每条消息生成一个唯一 ID(可以是消息key,或自定义UUID)。

在处理前,先检查这个 ID 是否已经被处理过(比如在数据库里查一下)。

如果已处理,就直接跳过并提交位移(视为成功);如果未处理,则执行业务逻辑。

这是最有效、最通用的方法,因为它不依赖于任何特定技术,而是从业务设计上根本性地解决问题。

例如:
a) 对于流程中的消息,每条消息中包含唯一id,比如业务id,在数据库中将业务id作为Unique key,插入重复时会报duplicate key异常,不会导致数据库中出现脏数据
b) Redis中使用set存储业务id,天然幂等性
c) 如果不是上面两个场景,需要让生产者发送每条数据的时候,里面加一个全局唯一的 id,然后你这里消费到了之后,先根据这个 id 去比如 Redis 里查一下消费过吗?如果没有消费过,就执行相应业务进行处理,然后这个 id 写 Redis,最后提交偏移。如果消费过了,那如果消费过了,那就别处理了,保证不重复处理相同的消息即可

2.3.2 事务性输出 (Transactional Output) / 两阶段提交 (2PC) - 复杂且受限

思路:将消费者的“业务处理”和“位移提交”绑定为一个分布式事务。

做法:例如,使用 Kafka 的事务性生产者,将处理结果和位移提交到外部系统(如另一个Kafka主题)的操作放在一个事务里。但这通常需要外部系统(如数据库)也支持参与 Kafka 事务(通过 Kafka Connect),实现复杂度非常高,性能和可用性也会受影响。不推荐普通应用使用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/98244.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/98244.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Python中的类:从入门到实战,掌握面向对象编程的核心

目录 一、类的概念:从“模板”到“个体” 1.1 什么是类? 1.2 类与对象的关系:模板与实例 1.3 类的核心价值:封装与抽象 二、类的形式:Python中的类定义语法 2.1 类的基本定义 2.2 关键组成解析 (1&a…

用户争夺与智能管理:定制开发开源AI智能名片S2B2C商城小程序的战略价值与实践路径

摘要 在零售行业数字化转型的浪潮中,用户争夺已从传统流量竞争转向对用户24小时时间分配权的深度渗透。本文以定制开发开源AI智能名片S2B2C商城小程序为核心研究对象,系统探讨其通过技术赋能重构用户接触场景、提升转化效率、增强会员黏性的作用机制。结…

数学_向量投影相关

Part 1 你的问题是:设相机光心的朝向 w (0, 0, 1)(即朝向正前方,Z 轴正方向), 在 相机坐标系下有一个平面,其法向量为 n_cam, 问:w 在该平面上的投影的单位向量 w_p,是不…

从RTSP到HLS:构建一个简单的流媒体转换服务(java spring)

从RTSP到HLS:构建一个简单的流媒体转换服务(java spring) 在当今的网络环境中,实时视频流媒体应用越来越广泛,从在线直播到安防监控,都离不开流媒体技术的支持。然而,不同的流媒体协议有着各自的特点和适用场景。本文…

【代码随想录算法训练营——Day15】二叉树——110.平衡二叉树、257.二叉树的所有路径、404.左叶子之和、222.完全二叉树的节点个数

LeetCode题目链接 https://leetcode.cn/problems/balanced-binary-tree/ https://leetcode.cn/problems/binary-tree-paths/ https://leetcode.cn/problems/sum-of-left-leaves/ https://leetcode.cn/problems/count-complete-tree-nodes/ 题解 110.平衡二叉树想到用左子树的高…

JVM新生代/老年代垃圾回收器、内存分配与回收策略

新生代垃圾收集器 1. Serial收集器 serial收集器即串行收集器,是一个单线程收集器。 串行收集器在进行垃圾回收时只使用一个CPU或一条收集线程去完成垃圾回收工作,并且会暂停其他的工作线程(stop the world),直至回收完…

Unity Mirror 多人同步 基础教程

Unity Mirror 多人同步 基础教程MirrorNetworkManager(网络管理器)Configuration:配置Auto-Start Options:自动启动Scene Management:场景管理Network Info:网络信息Authentication:身份验证Pla…

基于红尾鹰优化的LSTM深度学习网络模型(RTH-LSTM)的一维时间序列预测算法matlab仿真

目录 1.程序功能描述 2.测试软件版本以及运行结果展示 3.部分程序 4.算法理论概述 5.完整程序 1.程序功能描述 红尾鹰优化的LSTM(RTH-LSTM)算法,是将红尾鹰优化算法(Red-Tailed Hawk Optimization, RTHO)与长短期…

深度学习“调参”黑话手册:学习率、Batch Size、Epoch都是啥?

点击 “AladdinEdu,同学们用得起的【H卡】算力平台”,注册即送-H卡级别算力,80G大显存,按量计费,灵活弹性,顶级配置,学生更享专属优惠。 引言:从"炼丹"到科学,…

【网络实验】-MUX-VLAN

实验拓扑实验要求: 在企业网络中,企业员工和企业客户可以访问企业的服务器,对于企业来说,希望员工之间可以互相交流,但是企业用户之间相互隔离,不能够访问。为了实现所有用户都可以访问企业服务器&#xff…

Java泛型:类型安全的艺术与实践指南

Java泛型&#xff1a;类型安全的艺术与实践指南 前言&#xff1a;一个常见的编译错误 最近在开发中遇到了这样一个编译错误&#xff1a; Required type: Callable<Object> Provided: SalesPitchTask这个看似简单的错误背后&#xff0c;隐藏着Java泛型设计的深层哲学。今天…

UMI企业智脑 2.1.0:智能营销新引擎,图文矩阵引领内容创作新潮流

在数字营销日益激烈的今天&#xff0c;企业如何在信息洪流中脱颖而出&#xff1f;UMI企业智脑 2.1.0 的发布为企业提供了全新的解决方案。这款智能营销工具结合了先进的AI技术与数据驱动策略&#xff0c;帮助企业优化营销流程、提升效率&#xff0c;并通过图文矩阵实现内容创作…

Lustre Ceph GlusterFS NAS 需要挂载在k8s容器上,数据量少,选择哪一个存储较好

在 K8s 容器环境中&#xff0c;数据量 不大的 规模下&#xff0c;Lustre、Ceph、GlusterFS 和 NAS 的选择需结合性能需求、运维成本、扩展性和K8s 适配性综合判断。以下是针对性分析及推荐&#xff1a;一、核心对比与适用场景二、关键决策因素1. 性能需求高并发 / 高吞吐&#…

深入解析 Apache Doris 写入原理:一条数据的“落地之旅”

在日常的数据分析场景中&#xff0c;我们经常会向 Apache Doris 写入大量数据&#xff0c;无论是实时导入、批量导入&#xff0c;还是通过流式写入。但你是否想过&#xff1a;一条数据从客户端发出&#xff0c;到最终稳定落盘&#xff0c;中间到底经历了哪些步骤&#xff1f; …

基于MATLAB的视频动态目标跟踪检测实现方案

一、系统架构设计 视频动态目标跟踪系统包含以下核心模块&#xff1a; 视频输入模块&#xff1a;支持摄像头实时采集或视频文件读取预处理模块&#xff1a;灰度转换、降噪、光照补偿目标检测模块&#xff1a;背景建模、运动区域提取跟踪算法模块&#xff1a;卡尔曼滤波、粒子滤…

【Python】Python文件操作

Python文件操作 文章目录Python文件操作[toc]1.文件的编码2.文件打开、读取&#xff08;r模式&#xff09;、关闭3.文件的写入&#xff08;w模式&#xff09;4.文件的追加写入&#xff08;a模式&#xff09;5.综合案例1.文件的编码 意义&#xff1a;计算机只能识别0和1&#x…

CES Asia的“五年计划”:打造与北美展比肩的科技影响力

在全球科技产业版图中&#xff0c;展会一直是前沿技术展示、行业趋势探讨以及商业合作达成的关键平台。CES Asia&#xff08;亚洲消费电子技术展&#xff09;作为亚洲科技领域的重要展会&#xff0c;近日明确提出其“五年计划”&#xff0c;目标是打造与北美展会比肩的科技影响…

【计算机网络 | 第16篇】DNS域名工作原理

文章目录3.5 域名系统工作原理主机的标识方式&#xff1a;域名 vs IP 地址标识转换机制&#xff1a;DNS系统因特网的域名系统&#xff1a;层次域名空间&#x1f426;‍&#x1f525;顶级域名分类低级域名与管理域名与IP的区别因特网的域名系统&#xff1a;域名服务器&#x1f9…

YASKAWA安川机器人铝材焊接节气之道

在铝材焊接领域&#xff0c;保护气体的合理使用对焊接质量与成本控制至关重要。安川焊接机器人凭借高精度与稳定性成为行业常用设备&#xff0c;而WGFACS节气装置的应用&#xff0c;则为其在铝材焊接过程中实现高效节气提供了创新路径。掌握二者结合的节气之道&#xff0c;对提…

GooseDB,一款实现服务器客户端模式的DuckDB

在网上看到韩国公司开发的一款GooseDB&#xff0c; 官方网站对它的介绍是DuckDB™ 的功能扩展分支&#xff0c;具有服务器/客户端、多会话和并发写入支持&#xff0c;使用 PostgreSQL 有线协议&#xff08;DuckDB™是 DuckDB 基金会的商标&#xff09; 使用也很简单&#xff…