Kafka 重复消费与 API 幂等消费解决方案

Kafka 是一个高性能的分布式消息系统,但消费者重启、偏移量(offset)未正确提交或网络问题可能导致重复消费。API 幂等性设计则用于防止重复操作带来的副作用。本文从 Kafka 重复消费和 API 幂等性两个方面提供解决方案,重点深入探讨 事务性偏移量管理 如何实现精确一次消费(exactly-once),并结合其他方法确保消息可靠性和一致性。

1. Kafka 重复消费问题

Kafka 的重复消费问题通常由以下原因引发:消费者异常退出导致偏移量未提交、网络抖动、消费者组再平衡(rebalance)等。以下是解决重复消费的几种方法,重点聚焦事务性偏移量管理。

1.1 启用消费者幂等性

  • 手动提交偏移量

    • 设置 enable.auto.commit=false,在消息处理成功后手动提交偏移量(commitSynccommitAsync),确保消费与业务处理一致,减少重复消费风险。
    • commitSync:同步提交,阻塞直到 Broker 确认,适合高一致性场景,但可能降低吞吐量。
    • commitAsync:异步提交,非阻塞,适合高吞吐场景,但需通过回调(OffsetCommitCallback)监控提交失败并重试,以避免偏移量丢失导致重复消费。
    • 示例:
      Properties props = new Properties();
      props.put("enable.auto.commit", "false");
      KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
      consumer.subscribe(Arrays.asList("my-topic"));
      while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {// 处理消息processRecord(record);}consumer.commitSync(); // 同步提交偏移量
      }
      
  • 事务性消费(重点:事务性偏移量管理)

    • 核心原理:通过 Kafka 的事务机制,将消息生产、消费和偏移量提交绑定在一个原子操作中,确保消息只被处理一次(exactly-once)。这依赖于生产者事务(transactional.id)和消费者隔离级别(isolation.level=read_committed)。
    • 事务性偏移量管理的实现
      • 生产者事务:生产者配置 transactional.idenable.idempotence=true,通过 initTransactions()beginTransaction()commitTransaction() 等操作管理事务。生产者使用 sendOffsetsToTransaction() 将消费者偏移量纳入事务,确保偏移量提交与消息写入原子性一致。
      • 消费者隔离级别:消费者设置 isolation.level=read_committed,只读取已提交的事务消息,未提交或回滚的消息对消费者不可见。
      • 偏移量存储:消费者偏移量存储在 Kafka 内部主题 __consumer_offsets 中,事务性提交通过生产者的事务机制记录,确保偏移量与消息处理同步。
  • 代码示例:

    public class TransUse {public static void main(String[] args) {Consumer<String, String> consumer = createConsumer();Producer<String, String> producer = createProduceer();// 初始化事务producer.initTransactions();while(true) {try {// 1. 开启事务producer.beginTransaction();// 2. 定义Map结构,用于保存分区对应的offsetMap<TopicPartition, OffsetAndMetadata> offsetCommits = new HashMap<>();// 2. 拉取消息ConsumerRecords<String, String> records = consumer.poll(2000);for (ConsumerRecord<String, String> record : records) {// 3. 保存偏移量offsetCommits.put(new TopicPartition(record.topic(), record.partition()),new OffsetAndMetadata(record.offset() + 1));// 4. 进行转换处理String[] fields = record.value().split(",");fields[1] = fields[1].equalsIgnoreCase("1") ? "男":"女";String message = fields[0] + "," + fields[1] + "," + fields[2];// 5. 生产消息到dwd_userproducer.send(new ProducerRecord<>("dwd_user", message));}// 6. 提交偏移量到事务producer.sendOffsetsToTransaction(offsetCommits, "ods_user");// 7. 提交事务producer.commitTransaction();} catch (Exception e) {// 8. 放弃事务producer.abortTransaction();}}}// 1. 创建消费者public static Consumer<String, String> createConsumer() {// 1. 创建Kafka消费者配置Properties props = new Properties();props.setProperty("bootstrap.servers", "node-1:9092");props.setProperty("group.id", "ods_user");props.put("isolation.level","read_committed");props.setProperty("enable.auto.commit", "false");props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");// 2. 创建Kafka消费者KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);// 3. 订阅要消费的主题consumer.subscribe(Arrays.asList("ods_user"));return consumer;}// 2. 创建生产者public static Producer<String, String> createProduceer() {// 1. 创建生产者配置Properties props = new Properties();props.put("bootstrap.servers", "node-1:9092");props.put("transactional.id", "dwd_user");props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");// 2. 创建生产者Producer<String, String> producer = new KafkaProducer<>(props);return producer;}}
    
    • 深入理解事务性偏移量管理
      • 原子性:事务性偏移量提交将消息写入、业务处理和偏移量提交绑定在一个事务中,确保三者要么全成功,要么全失败。例如,若消费者处理消息后数据库操作失败,事务回滚,偏移量不会提交,消费者可重新消费。
      • 去重机制:Broker 根据 transactional.id 和序列号(Sequence Number)对生产者消息去重,防止重复写入。消费者通过 read_committed 隔离级别避免读取未提交消息。
      • 偏移量持久化:偏移量记录在 __consumer_offsets 主题中,事务性提交通过事务协调器(Transaction Coordinator)管理,确保偏移量与消息一致。
      • 故障恢复:消费者重启后,从 __consumer_offsets 中读取最后提交的偏移量开始消费。由于事务性提交保证偏移量与消息处理一致,不会重复消费。
    • 适用场景
      • 金融系统:如支付、转账,确保每笔交易只处理一次。
      • 订单处理:防止重复创建订单。
      • 数据同步:确保数据从源到目标的精确一次传递。
    • 性能考量
      • 事务增加日志写入和协调开销,适合高一致性场景。
      • 建议保持事务范围短,避免长时间占用资源。
    • 版本要求:Kafka 0.11.0+ 支持事务,推荐 2.0+ 版本以获得更稳定的事务支持。

1.2 业务层去重

  • 方法:在消息中添加唯一标识(如消息ID、业务ID),消费者端通过数据库(如 Redis、MySQL)或内存记录已处理的消息ID,消费前检查是否重复。
  • 数据库表结构示例
    CREATE TABLE consumed_messages (message_id VARCHAR(64) PRIMARY KEY,consume_time TIMESTAMP
    );
    
    消费时查询 message_id 是否存在,若存在则跳过。
  • Redis 实现
    if (redis.exists(messageId)) {return; // 跳过重复消息
    }
    // 处理消息
    processMessage(message);
    redis.set(messageId, "processed", EXPIRE_TIME_SECONDS);
    
  • 优势:简单易实现,适合无事务支持的旧版本 Kafka 或非严格 exactly-once 场景。
  • 局限:增加存储和查询开销,需定期清理去重记录。

1.3 偏移量管理

  • 可靠提交
    • 使用 commitSync() 确保偏移量提交成功,适合高一致性场景。
    • 使用 commitAsync() 提高吞吐量,但需通过回调监控失败并重试:
      consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + exception);// 重试或记录日志}
      });
      
  • 外部存储
    • 将偏移量存储到外部系统(如 Redis、ZooKeeper),异常恢复时从外部读取正确偏移量。
    • 示例(Redis):
      redis.set("consumer:group:offset", offset);
      
  • 注意:外部存储需保证一致性,可能增加复杂度,事务性偏移量管理更推荐。

1.4 消费者组优化

  • 唯一消费者组ID:确保 group.id 唯一,避免多个消费者组重复消费同一分区。
  • 配置超时参数
    • session.timeout.ms:建议 10-20 秒(如 10000ms),避免消费者因网络延迟被踢出组。
    • max.poll.interval.ms:建议 5-10 分钟(如 300000ms),适应消息处理耗时,避免超时触发再平衡。
    • 示例:
      props.put("session.timeout.ms", "10000");
      props.put("max.poll.interval.ms", "300000");
      
  • 监控再平衡:通过日志或 JMX 指标检查再平衡频率,优化参数以减少偏移量混乱。

2. API 幂等消费问题

API 幂等性确保多次调用同一 API 产生相同结果,防止重复操作的副作用。结合 Kafka,解决方法如下:

2.1 Kafka 生产者幂等性

  • 配置
    • 设置 enable.idempotence=true,Kafka 自动为消息分配序列号和分区标识,Broker 端去重。
    • 配置 retries=5acks=-1,确保消息可靠投递:
      props.put("enable.idempotence", "true");
      props.put("retries", "5");
      props.put("acks", "all");
      
  • 作用:生产者重试不会导致消息重复写入,Broker 根据序列号去重。

2.2 API 层幂等设计

  • 唯一请求ID
    • 为每个 API 请求生成唯一 ID(如 UUID),服务端用 Redis 或数据库记录已处理请求。
    • 示例(Redis):
      if (redis.exists(requestId)) {return cachedResult;
      }
      redis.set(requestId, result, EXPIRE_TIME_SECONDS);
      
  • 数据库约束
    • 使用唯一约束(如订单号)防止重复插入:
      CREATE TABLE orders (order_id VARCHAR(64) PRIMARY KEY,amount DECIMAL,create_time TIMESTAMP
      );
      
      插入时捕获唯一约束异常并返回。

2.3 结合 Kafka 事务

  • 方法:使用事务性生产者(transactional.id),将 API 操作(如数据库写入)和消息发送绑定在同一事务中,确保原子性。
  • 示例
    producer.initTransactions();
    producer.beginTransaction();
    try {producer.send(new ProducerRecord<>("topic", message));db.save(order); // 数据库操作producer.commitTransaction();
    } catch (Exception e) {producer.abortTransaction();throw e;
    }
    
  • 作用:事务失败时,消息和数据库操作均回滚,避免不一致。

3. 综合建议

  • 短事务:尽量减少事务范围(如仅包含必要操作),降低资源占用。
  • 分布式锁:在分布式系统中,使用 Redis 或 ZooKeeper 实现锁,防止并发重复处理。
  • 监控与日志:记录消息ID、处理时间等日志,便于排查重复消费问题。
  • 超时与重试:设置合理超时(如 request.timeout.ms)和重试次数(如 retries),避免无限重试。

4. 注意事项

  • 性能与一致性权衡
    • Redis 适合高性能去重,数据库适合强一致性场景。
    • 事务性机制增加开销,适合高一致性需求场景(如金融、订单)。
  • Kafka 版本:exactly-once 语义需 Kafka 0.11.0+,推荐 2.0+。
  • 清理去重记录:设置 Redis 过期时间或定期清理数据库记录,避免存储膨胀。
  • Broker 配置
    • min.insync.replicas=2:确保 acks=-1 的可靠性。
    • transaction.state.log.replication.factor=3:事务日志高可用。
    • num.partitions__consumer_offsets__transaction_state):建议 ≥50,提高并发性。

5. 深入理解事务性偏移量管理的优势

  • 一致性:事务性偏移量提交确保消息处理、偏移量更新和外部操作(如数据库写入)原子性一致,消除了重复消费和消息丢失的风险。
  • 容错性:消费者重启后,从 __consumer_offsets 中读取最后提交的偏移量,确保从正确位置继续消费。
  • 可扩展性:事务机制支持分布式环境,生产者和消费者可跨节点协作,适合复杂系统。
  • Broker 支持
    • 事务协调器(Transaction Coordinator)管理事务状态,存储在 __transaction_state 主题。
    • Broker 去重机制(基于 transactional.id 和序列号)防止重复写入。
  • 实现复杂度
    • 需要生产者和消费者协同配置(transactional.idisolation.level)。
    • 事务性偏移量提交通常由生产者通过 sendOffsetsToTransaction() 完成,消费者仅需确保 read_committed 和手动提交。

6. 总结

通过事务性偏移量管理,Kafka 结合生产者事务(transactional.idenable.idempotence=trueacks=-1)和消费者配置(isolation.level=read_committedenable.auto.commit=false),实现消息从生产到消费的精确一次语义。事务性偏移量提交将消息写入、业务处理和偏移量更新绑定在一个原子事务中,确保不重复、不丢失。结合业务层去重、偏移量管理和消费者组优化,可进一步提升系统可靠性。Broker 端通过事务协调器和内部主题(__consumer_offsets__transaction_state)支持事务性机制,确保高一致性场景下的可靠投递和消费。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916925.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916925.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

win11推迟更新

1、按住WINR2、输入以下命令&#xff1a;reg add "HKEY_LOCAL_MACHINE\SOFTWARE\Microsoft\WindowsUpdate\UX\Settings" /v FlightSettingsMaxPauseDays /t reg_dword /d 10000 /f3、点击确定4、打开搜索框5、在搜索框里边输入更新&#xff0c;选择检查更新6、在暂停…

【uniapp】---- 使用 uniapp 实现视频和图片上传且都可以预览展示

1. 前言 接手得 uniapp 开发的微信小程序项目,新的开发需求是需要同时上传图片和视频,但是之前的上传都没有进行封装,都是每个页面需要的时候单独实现,现在新的需求,有多个地方都需要上传图片、视频或语音等,这样就需要封装一个组件,然后发现部分地方使用了 uni-file-p…

(nice!!!) (LeetCode 每日一题) 2411. 按位或最大的最小子数组长度(位运算+滑动窗口)

2411. 按位或最大的最小子数组长度 思路&#xff1a;位运算滑动窗口&#xff0c;时间复杂度0(n*32)。 **遍历每一个元素nums[i]&#xff0c;然后看能否改变它前面的元素nums[j]&#xff08; j<i &#xff09;&#xff0c; 当(nums[j]|nums[i])nums[j]时&#xff0c;说明当前…

算法竞赛阶段二-数据结构(36)数据结构双向链表模拟实现

//#include<bits/stdc.h> #include<iostream> using namespace std; const int N1e510; //定义 int e[N],pre[N],ne[N],h,id; int mp[N]; //头插 // 兵 y // x void push_front (int x) {id;e[id]x;mp[x]id;pre[id]h;ne[id]ne[h];//先修改新节点…

津发科技带你了解皮肤电信号中的SCL与SCR

皮肤电&#xff08;Electrodermal Activity, EDA&#xff09;作为一种非常容易获取的基本生理信号&#xff0c;可以很好地量化我们的情绪反应&#xff0c;被广泛应用于情感识别研究中。它代表机体受到刺激时皮肤电传导的变化。皮肤电反应作为交感神经系统功能的直接指标&#x…

spark的broadcast variables

在 Spark 中&#xff0c;广播变量&#xff08;Broadcast Variables&#xff09; 是一种特殊类型的共享变量&#xff0c;用于高效地在集群中的所有节点间分发大型只读数据集。它解决了 Spark 任务中频繁传输重复数据的性能问题&#xff0c;特别适用于需要在多个任务中重用相同数…

Python爬虫实战:研究Haul库相关技术构建电商数据采集与分析系统

1. 引言 1.1 研究背景与意义 随着电子商务的迅速发展,电商平台上的商品数据呈现爆炸式增长。这些数据蕴含着丰富的商业价值,如消费者行为分析、市场趋势预测、竞争对手监测等。然而,如何从海量的电商数据中获取有价值的信息,成为当前电商企业面临的重要挑战。 网络爬虫技…

Java:高频面试知识分享1

一、Java 语言核心特性&#xff08;面向对象编程&#xff09;核心知识点梳理&#xff1a;面向对象三大特性&#xff1a;封装&#xff1a;隐藏对象内部实现&#xff0c;通过 public 方法暴露接口&#xff08;例&#xff1a;类的 private 字段 get/set 方法&#xff09;。继承&a…

MybatisPlus-核心功能

目录 条件构造器 QueryWrapper UpdateWrapper LambdaQueryWrapper 自定义SQL 基本用法 多表关联 Service接口 CRUD 基本用法 Lambda 批量新增 条件构造器 除了新增以外&#xff0c;修改、删除、查询的SQL语句都需要指定where条件。因此BaseMapper中提供的相关方法…

RHCE综合项目:分布式LNMP私有博客服务部署

一、项目概述本次项目基于LNMP&#xff08;linux&#xff0c;nginx&#xff0c;mariadb&#xff0c;php&#xff09;搭建了一个私有的博客平台&#xff0c;本篇博客详细记录了该博客平台的服务部署全流程。在该项目中&#xff0c;使用了两台linux&#xff08;openeuler&#xf…

5种安全方法:如何删除三星手机上的所有内容

随着新的三星设备不断推出&#xff0c;在出售或捐赠旧手机之前&#xff0c;彻底清除旧手机上的数据以保护隐私至关重要。许多人不知道的是&#xff0c;简单的删除操作并不能完全清除三星设备上的数据&#xff0c;被删除的文件可能会处于不可见状态。本文介绍了如何彻底删除三星…

Vue 3 入门教程 2- Vue 组件基础与模板语法

一、Vue 组件基础在 Vue 中&#xff0c;组件是构建用户界面的基本单位&#xff0c;它可以将页面拆分成多个独立、可复用的部分。一个 Vue 组件通常以 .vue 文件名结尾&#xff0c;包含三个核心部分&#xff1a;模板&#xff08;Template&#xff09;、脚本&#xff08;Script&a…

Linux 进程管理与计划任务详解

Linux 进程管理与计划任务详解 一、程序与进程的基本概念 程序&#xff1a;保存在外部存储介质中的可执行机器代码和数据的静态集合&#xff0c;是静态的文件实体进程&#xff1a;在 CPU 及内存中处于动态执行状态的计算机程序&#xff0c;是程序的动态执行实例关联关系&#x…

分层解耦(Controller,Service,Dao)

1. 三层架构核心职责层级职责说明关键技术 / 注解Controller&#xff08;控制器&#xff09;1. 接收前端请求&#xff08;HTTP&#xff09; 2. 封装参数、校验 3. 调用 Service 处理业务 4. 返回视图 / 数据给前端Controller、GetMapping等Service&#xff08;业务层&#xff0…

镁金属接骨螺钉注册检测:骨科植入安全的科学基石

在骨科治疗领域&#xff0c;镁金属接骨螺钉凭借其可降解性与生物相容性&#xff0c;成为传统金属植入物的革新替代方案。然而&#xff0c;作为Ⅲ类高风险无源植入器械&#xff08;分类编码13-01-01&#xff09;&#xff0c;其注册检测需覆盖生物相容性、化学表征、降解性能、力…

模具开发和管理系统(c#)

以前编写的一个管理模具开发和进度的程序&#xff0c;可以跟踪模具开发进度&#xff0c;可以查询模具具体情况&#xff0c;也可以用水晶报表查询。OS&#xff1a;microsoft windows IDE&#xff1a;microsoft visual studio programming language&#xff1a;C# DataBase&#…

【WRF-Chem 实例1】namelist.input 详解- 模拟CO2

目录 &time_control(时间控制) &physics(物理过程参数化方案) &fdda(四维数据同化) 工作机制简述 &dynamics(WRF 动力核心的数值方法和选项) &bdy_control(边界控制设置) &chem(WRF-Chem 主要化学设置) &namelist_quilt(并行 I/O 控制…

数据中心-时序数据库InfluxDB

目录 一、InfluxDB介绍 1.1 什么是InfluxDB&#xff1f; 1.2 应用场景 1.3 特点 1.4 版本差异 二、数据模型和存储架构 2.1 相关概念 2.2 存储架构 三、InfluxDB基础操作 3.1 数据库操作 3.2 数据表操作 显示所有表 新建表 删除表 3.3 数据保存策略 查看保存策…

webpack-高级配置

多入口文件 如何输出多个html文件 输入位置 需要写两个entryoutput位置也要改一下 加一个name避免重名 在生成html时 要根据每一个入口都写一个插件 并且chunks要写好 当前html引入哪些文件如何抽离压缩css文件 安装插件在rules里面添加插件plugins中添加css抽离代码压缩css抽离…

WinForm组件之Label 控件

Label 控件Label 控件是 WinForm 中最基础、最常用的控件之一&#xff0c;主要用于在界面上显示文本信息&#xff0c;通常作为说明、提示或标题&#xff0c;不直接接受用户输入。它是构建用户界面的基础组件&#xff0c;在引导用户操作、展示状态信息等方面发挥重要作用。Label…