Kafka消费者组位移重设指南

#作者:张桐瑞

文章目录

  • 一、Kafka 与传统消息引擎的核心差异
  • 二、重设消费者组位移的核心原因
  • 三、重设位移的两大维度与七种策略
  • 四、重设位移的实现方式
    • (一)Java API 方式
    • (二)命令行脚本方式(Kafka 0.11+)
  • 五、注意事项

一、Kafka 与传统消息引擎的核心差异

特性Kafka传统消息引擎(如 RabbitMQ、ActiveMQ)
消息处理方式基于日志结构,只读不删除,支持消息重演破坏性处理,成功消费后删除消息
位移控制消费者自主控制位移,可灵活修改实现重复消费由中间件自动管理,通常无法回溯
适用场景高吞吐量、低单消息处理耗时、强顺序性要求复杂消息处理逻辑、弱顺序性要求

二、重设消费者组位移的核心原因

  1. 重复消费历史数据
    1)修正消费逻辑错误后,需要重新处理历史消息。
    2)业务需求变更(如数据重新计算、补写下游存储)。
  2. 跳过异常消息
    1)处理 corrupted 消息或消费逻辑抛出异常时,通过指定位移跳过无效消息。
  3. 动态调整消费进度
    2)基于时间维度(如消费近 30 分钟数据)或位移维度(如从最新 / 最早位置开始)灵活调整消费起点。
  4. 回滚消费进度
    1)代码变更失败后,需回滚到历史位移继续消费。

三、重设位移的两大维度与七种策略

(一)位移维度策略

策略说明典型场景
Earliest重置到主题当前最早位移(可能大于 0,受日志保留策略影响)重新消费主题所有可保留的历史消息
Latest重置到主题最新末端位移跳过所有历史消息,从最新消息开始消费
Current重置到消费者当前提交的最新位移回滚代码变更后,恢复到重启前的消费位置
Specified-Offset指定绝对位移值手动跳过某条异常消息(如位移 1234)
Shift-By-N指定相对位移偏移量(N 可正可负)向前跳过 100 条(N=-100)或向后跳过 50 条(N=50)

(二)时间维度策略

策略说明格式要求典型场景
DateTime重置到指定时间之后的最小位移YYYY-MM-DDTHH:mm:ss.SSS(如2023-10-01T12:00:00.000)重新消费昨天 0 点之后的数据
Duration重置到相对当前时间的间隔位移符合 ISO-8601 的PnDTnHnMnS(如PT15M表示 15 分钟前)消费 30 分钟前的所有消息

四、重设位移的实现方式

(一)Java API 方式

核心方法

方法作用
seek(TopicPartition partition, long offset)为单个分区设置绝对位移
seekToBeginning(Collection<TopicPartition> partitions)将多个分区重置到最早位移
seekToEnd(Collection<TopicPartition> partitions)将多个分区重置到最新位移
offsetsForTimes(Map<TopicPartition, Long> timestamps)根据时间戳查找对应的位移

示例代码

  1. Earliest 策略
Properties props = new Properties();
props.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {consumer.subscribe(Collections.singleton("test-topic"));consumer.poll(0); // 触发元数据更新List<TopicPartition> partitions = consumer.partitionsFor("test-topic").stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toList());consumer.seekToBeginning(partitions); // 重置所有分区到最早位移
}
  1. DateTime 策略(重设到 2023-10-01 12:00:00)
long timestamp = LocalDateTime.of(2023, 10, 1, 12, 0).toInstant(ZoneOffset.ofHours(8)).toEpochMilli();
Map<TopicPartition, Long> timeMap = consumer.partitionsFor("test-topic").stream().map(info -> new TopicPartition(info.topic(), info.partition())).collect(Collectors.toMap(tp -> tp, tp -> timestamp));
Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(timeMap);
offsets.forEach((tp, oa) -> consumer.seek(tp, oa.offset()));

(二)命令行脚本方式(Kafka 0.11+)

bin/kafka-consumer-groups.sh --bootstrap-server <broker地址> --group <消费组名> --reset-offsets [策略参数] --execute
策略	命令示例
Earliest	--to-earliest
Latest	--to-latest
Current	--to-current
Specified-Offset	--to-offset 1234
Shift-By-N	--shift-by -100(向前跳 100 条)
DateTime	--to-datetime "2023-10-01T12:00:00.000"
Duration	--by-duration PT30M(30 分钟前)

五、注意事项

  1. 消费组状态
    1)重设位移时,确保消费组未处于运行状态,避免位移冲突。
  2. 日志保留策略
    1)Earliest策略受log.retention.hours等配置限制,可能无法重置到 0 位移。
  3. 分区分配
    1)API 方式需显式处理所有分区(如通过partitionsFor获取分区列表),避免遗漏。
  4. 事务性消息
    1)若消费事务性主题,需结合isolation.level=read_committed确保一致性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/84573.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

分类模型:逻辑回归

1、针对设计&#xff1a;二分类 Logistic 回归最初是为二分类问题设计的&#xff0c; Logistic 回归基于概率&#xff0c;通过 Sigmoid 函数转换输入特征的线性组合&#xff0c;将任意实数映射到 [0, 1] 区间内。 通过引入一个决策规则&#xff08;通常是概率的阈值&#xff…

CppCon 2015 学习:C++ WAT

这段代码展示了 C 中的一些有趣和令人困惑的特性&#xff0c;尤其是涉及数组访问和某些语法的巧妙之处。让我们逐个分析&#xff1a; 1. assert(map[“Hello world!”] e;) 这一行看起来很不寻常&#xff0c;因为 map 在这里被用作数组下标访问器&#xff0c;但是在前面没有…

vscode自定义主题语法及流程

vscode c/c 主题 DIY 启用自己的主题(最后步骤) 重启生效 文件–>首选项–>主题–>颜色主题: 也可以在插件里找到哈 手把手教你制作 在C:\Users\jlh.vscode\extensions下自己创建一个文件夹 里面有两个文件和一个文件夹 具体内容: package.json: {"name&…

前端传递日期范围(开始时间和结束时间),后端解析及查询

前端技术&#xff1a;Vue3 TypeScript Element Plus 后端技术&#xff1a;Java Spring Boot MyBatis 应用效果&#xff1a; 原来方案 1、前端日期控件使用 el-date-picker&#xff0c;日期显示格式和日期值返回格式都为&#xff1a;YYYY-MM-DD <el-form :model"…

零基础设计模式——行为型模式 - 命令模式

第四部分&#xff1a;行为型模式 - 命令模式 (Command Pattern) 接下来&#xff0c;我们学习行为型模式中的命令模式。这个模式能将“请求”封装成一个对象&#xff0c;从而让你能够参数化客户端对象&#xff0c;将请求排队或记录请求日志&#xff0c;以及支持可撤销的操作。 …

禁止 Windows 更新后自动重启

Windows 默认会在安装重要更新后自动重启&#xff0c;但你可以调整设置来避免这种情况&#xff1a; ​​方法 1&#xff1a;通过组策略&#xff08;适用于 Windows 专业版/企业版&#xff09;​​ 按 Win R&#xff0c;输入 gpedit.msc 打开 ​​本地组策略编辑器​​。导航…

GoldenDB简述

GoldenDB是国产的分布式数据库。它彻底解决了事务一致性&#xff0c;数据实时一致性的问题。采用的是Shared Nothing&#xff08;分片式存储&#xff09;的分布式架构。就是不共享数据&#xff0c;各自节点持有各自的数据。对比不共享的&#xff0c;还有其他两种分布式架构&…

训练过程中的 Loss ?

文章目录 在我们训练的过程中&#xff0c;设置好这个epochs也就是训练的轮次&#xff0c;然后计算这个损失函数&#xff0c;我们可以知道这个具体的训练的情况&#xff0c;那么在训练的过程中&#xff0c;这个损失函数的变化有哪些情况&#xff1f;对应的一个解释情况是怎么样的…

S2B2B农产品供应链交易多平台开发有哪些发展前景?如何维护?

一、S2B2B农产品供应链交易多平台开发的未来发展前景 本文将由小编为您介绍关于S2B2B农产品供应链交易多平台开发的内容&#xff0c;希望能够帮助大家。在数字化时代&#xff0c;农产品供应链的数字化转型成为了一种必然趋势。S2B2B(Supplier to Business to Business)模式通过…

关于有害的过度使用 std::move

翻译&#xff1a;2023 11 月 24 日On harmful overuse of std::move cppreference std::move 论 std::move 的有害过度使用 - The Old New Thing C 的 std::move 函数将其参数转换为右值引用&#xff0c;这使得其内容可以被另一个操作“消费”&#xff08;移动&#xff09;。…

Ubuntu24.04 onnx 模型转 rknn

前面的环境配置有点懒得写&#xff0c;教程也很多&#xff0c;可以自己找 rknn-toolkit2 gitee 地址&#xff1a;pingli/rknn-toolkit2 试了很多开源的代码&#xff0c;都没办法跑通&#xff0c; 最后自己改了一版 微调后的 qwen2 模型适用 from rknn.api import RKNN impor…

Electron通信流程

前言 今天讲Electron框架的通信流程&#xff0c;首先我们需要知道为什么需要通信。这得益于Electron的多进程模型&#xff0c;它主要模仿chrome的多进程模型如下图&#xff1a; 作为应用开发者&#xff0c;我们将控制两种类型的进程&#xff1a;主进程和渲染器进程 。 …

uni-app项目实战笔记1--创建项目和实现首页轮播图功能

ps:本笔记来自B站咸虾米壁纸项目 一.创建项目&#xff0c;完成项目初始化搭建 1.在HBuilder X创建wallper项目&#xff0c;使用默认模块&#xff0c;选择vue&#xff1b; 2.在项目根目录下创建common目录&#xff0c;用于存放静态资源&#xff0c;创建项目时自动生成static目…

机械制造系统中 PROFINET 与 PROFIBUS-DP 的融合应用及捷米科技解决方案

在机械制造领域&#xff0c;工业通信网络的兼容性与灵活性直接影响产线的自动化水平与生产效率。当前&#xff0c;多数机械制造系统采用PROFINET 控制器构建核心网络架构&#xff0c;并通过微波无线连接实现设备互联。随着工业网络的发展&#xff0c;系统中常需同时集成PROFINE…

MCP 协议系列序言篇:开启 AI 应用融合新时代的钥匙

文章目录 序言&#xff1a;AI 应用层进入 MCP 时代为什么 MCP 开启 AI 应用融合新时代的钥匙为什么是 MCP&#xff1f;它与 Function Calling、Agent 有什么区别&#xff1f;Function CallingAI AgentMCP&#xff08;Model Context Protocol&#xff09; MCP 如何工作MCP Serve…

【threejs】每天一个小案例讲解:光照

代码仓 GitHub - TiffanyHoo/three_practices: Learning three.js together! 可自行clone&#xff0c;无需安装依赖&#xff0c;直接liver-server运行/直接打开chapter01中的html文件 运行效果图 知识要点 常见光照类型及其特点如下&#xff1a; 1. 环境光&#xff08;Ambi…

大模型在输尿管下段积水预测及临床应用的研究

目录 一、引言 1.1 研究背景与意义 1.2 研究目的 1.3 研究范围与限制 1.4 文献综述 1.5 研究方法和框架 二、相关理论与概念 2.1 大模型技术原理 2.2 输尿管下段积水病理机制 2.3 大模型在医学预测领域的应用 三、大模型预测输尿管下段积水的方法 3.1 数据收集 3.…

gitlab相关操作

2025.06.11今天我学习了如何在终端使用git相关操作&#xff1a; 一、需要修改新的仓库git地址的时候&#xff1a; &#xff08;1&#xff09;检查当前远程仓库 git remote -v 输出示例&#xff1a; origin https://github.com/old-repo.git (fetch) origin https://github.c…

51c自动驾驶~合集58

我自己的原文哦~ https://blog.51cto.com/whaosoft/13967107 #CCA-Attention 全局池化局部保留&#xff0c;CCA-Attention为LLM长文本建模带来突破性进展 琶洲实验室、华南理工大学联合推出关键上下文感知注意力机制&#xff08;CCA-Attention&#xff09;&#xff0c;…

通过共享内存在多程序之间实现数据通信

注&#xff1a;以下内容为与 GPT-4O 共同创作完成 以共享内存的方式实现多程序之间的数据通信&#xff0c;尤其适合在一台机器上的多程序之间进行高频数据交换。 以下示例展示了 sender.py 向 receiver.py 发送数据并接收经 receiver.py 处理后的数据&#xff0c;以及如何通过…