Kafka如何配置生产者拦截器和消费者拦截器

Kafka 的生产者拦截器和消费者拦截器允许你在消息发送前后以及消息消费前后嵌入自定义逻辑,用于实现监控、审计、消息修改等功能。本文我们就用一个最常见的传递TraceId的案例来说明下这两类拦截器如何来使用。

生产者发送拦截器

生产者拦截器需要实现 org.apache.kafka.clients.producer.ProducerInterceptor 接口。在这个拦截器中,我们把保存到ThreadLocal中的traceId设置到消息的header中。

步骤 1:实现拦截器类

创建一个类,实现 ProducerInterceptor 接口。该接口有两个核心方法:

  • onSend(ProducerRecord record): 在消息被序列化和计算分区之前调用。你可以修改或记录消息。

  • onAcknowledgement(RecordMetadata metadata, Exception exception): 在消息被服务器确认(成功或失败)之后调用。这会在生产者回调触发之前调用。注意:该方法不要在 ProducerInterceptor 中实现耗时逻辑,因为它会阻塞生产者。

public class SendTraceIdInterceptor implements ProducerInterceptor<String, String> {@Overridepublic ProducerRecord<String, String> onSend(ProducerRecord<String, String> producerRecord) {// 把TheadLocal中traceId设置到header中producerRecord.headers().add(RequestContext.TRACE_ID, RequestContext.getTraceId().getBytes());return producerRecord;}@Overridepublic void onAcknowledgement(RecordMetadata recordMetadata, Exception e) {if(e == null){log.info("send successfully");} else {log.error("send error : {}", e);}}@Overridepublic void close() {}// 这里可以拿到所有的producer的配置信息@Overridepublic void configure(Map<String, ?> map) {log.info("configure:{}", map);}
}
步骤 2:在生产者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址producer:key-serializer: org.apache.kafka.common.serialization.StringSerializervalue-serializer: org.apache.kafka.common.serialization.StringSerializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.SendTraceIdInterceptor

消费者接收拦截器

消费者拦截器需要实现 org.apache.kafka.clients.consumer.ConsumerInterceptor 接口。在这个拦截器中我们读取消息中的header并重新设置到ThreadLocal中。

步骤 1:实现拦截器类

创建一个类,实现 ConsumerInterceptor 接口。该接口也有两个核心方法:

  • onConsume(ConsumerRecords records): 在消息被反序列化之后、传递给消费者poll()方法返回之前调用。你可以修改或过滤消息。
  • onCommit(Map offsets): 在消费者提交偏移量之后调用。
public class ReceiveTraceIdInterceptor implements ConsumerInterceptor<String, String> {private static Logger log = LoggerFactory.getLogger(ReceiveTraceIdInterceptor.class);@Overridepublic ConsumerRecords<String, String> onConsume(ConsumerRecords<String, String> records) {for(Iterator<ConsumerRecord<String, String>> recordIterator = records.iterator(); recordIterator.hasNext();){ConsumerRecord<String, String> record = recordIterator.next();Headers headers = record.headers();if(headers == null){continue;}for(Iterator<Header> headerIterator = headers.iterator(); headerIterator.hasNext();){Header header = headerIterator.next();// 从header中获取traceId, 并保存到ThreadLocal          if(Objects.equals(header.key(), RequestContext.TRACE_ID)){RequestContext.setTraceId(new String(header.value()));}}}return records;}@Overridepublic void onCommit(Map<TopicPartition, OffsetAndMetadata> offsets) {}@Overridepublic void close() {}// 这里可以拿到所有的消费者的配置@Overridepublic void configure(Map<String, ?> configs) {log.info("consumer configure:{}", configs);}
}
步骤 2:在消费者配置中指定拦截器
spring:kafka:bootstrap-servers: localhost:9092  # Kafka服务器地址consumer:group-id: my-group  # 默认的消费者组IDauto-offset-reset: earliest  # 如果没有初始偏移量或偏移量已失效,从最早的消息开始读取key-deserializer: org.apache.kafka.common.serialization.StringDeserializervalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerproperties:interceptor.classes: com.github.xjs.kafka.interceptor.ReceiveTraceIdInterceptor    

总结

位置
  • 生产者拦截器:在消息序列化和分区之前(onSend)以及确认之后(onAcknowledgement)调用。
  • 消费者拦截器:在消息反序列化之后、返回给用户之前(onConsume)以及提交偏移量之后(onCommit)调用。
配置

使用 ProducerConfig.INTERCEPTOR_CLASSES_CONFIGConsumerConfig.INTERCEPTOR_CLASSES_CONFIG 属性进行配置。
值是该拦截器类的全限定名,多个拦截器用逗号分隔,它们会按照配置的顺序执行。

用途
  • 监控和审计:记录消息发送/接收的成功失败、延迟等。
  • 消息修改:在发送前给消息添加统一前缀或头信息。
  • 自定义指标:与监控系统(如 Prometheus)集成,收集特定指标。
  • 过滤:消费者端可以尝试过滤消息,比如:本地local开发环境和测试服务器的test环境可能使用的是同一套kafka服务,我们可以在消息头中传递环境标识,在消费者端去过滤只属于自己这个环境的消息,从而防止引起混乱。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/97900.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/97900.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

vue表单弹窗最大化无法渲染复杂组件内容

背景&#xff1a;最大化后选然后复杂组件内容丢失&#xff0c;如下拉框、图片上传组件修复方案&#xff1a;使用深拷贝核心代码this.maximizeDialog {visible: true,title: 患者申请 - 最大化查看,formModel: JSON.parse(JSON.stringify(this.formModel || [])),formLogic: JS…

经典俄罗斯方块游戏 | 安卓三模式畅玩,暂时无广告!

大家好&#xff0c;今天想跟大家分享一款安卓版的俄罗斯方块游戏。适合无聊的时候玩玩&#xff0c;换换脑子&#xff0c;这款游戏太经典。80、90都玩过这个游戏。之前我也给大家推荐过一些离线小游戏&#xff0c;但有些用着用着就开始出现弹窗广告&#xff0c;这就有点烦&#…

今天开始学习新内容“服务集群与自动化”--crond服务、--syslog服务以及DHCP协议

一.crond简介1、基本介绍crond是linux下用来周期性的执行某种任务或等待处理某些事件的一个守护进程&#xff0c;与windows下的计划任务类似&#xff0c;当安装完成操作系统后&#xff0c;默认会安装此服务工具&#xff0c;并且会自动启动crond进程&#xff0c;crond进程每分钟…

从go语言出发,搭建多语言云原生场景下全链路观测体系

一、方案背景 在公司内部devops平台的微服务化改造过程中&#xff0c;我们遇到了典型的分布式系统观测难题&#xff1a;服务间调用链路复杂、性能瓶颈难以定位、故障排查效率低下。特别是在生产环境出现问题时&#xff0c;往往需要花费大量时间在各个服务的日志中寻找蛛丝马迹。…

Vue 进阶实战:从待办清单到完整应用(路由 / 状态管理 / 性能优化全攻略)

Vue 进阶实战&#xff1a;从待办清单到完整应用&#xff08;路由 / 状态管理 / 性能优化全攻略&#xff09; 在上一篇博客里&#xff0c;我们一起实现了能本地存储的待办清单&#xff0c;不少朋友留言说&#xff1a;“学会了基础&#xff0c;但遇到‘登录后才能访问页面’‘多…

uniApp开发XR-Frame微信小程序 | 动态加载与删除模型

在使用xr-frame开发3D小程序时&#xff0c;我们经常需要根据需求去动态加载模型或删除模型&#xff0c;在官方的说明中&#xff0c;提到了相关方法&#xff0c;但并不太明确&#xff0c;也没有确切的实例。 我们先来看一下官方给出的说明。 一. Shadow元素 我们需要用代码动…

把多个 PPT 合并在一起,三步告别复制粘贴

制作部门汇报分册、项目阶段文件等工作需要将多个零散的PPT合并为一份完整文档。手动复制粘贴不仅效率低下&#xff0c;还容易导致格式错乱、动画丢失。本文介绍一种高效方法&#xff0c;三步操作即可将多个PPT文件快速合并为单一文档。无论是整合汇报材料&#xff0c;还是准备…

安卓旋转屏幕后如何防止数据丢失-ViewModel入门

Android ViewModel 入门教程 在日常开发中&#xff0c;当 Activity 因为旋转屏幕或内存回收被销毁重建时&#xff0c;UI 中的数据也会丢失。 这时候&#xff0c;Android Jetpack 提供的 ViewModel 就能帮我们解决这个问题。 1. 什么是 ViewModel ViewModel 是一种架构组件。它专…

Linux 下的 Vim 使用与网络安全配置详解

目录 引言 一、Vim 编辑器的使用 1. Vim 的模式 2. 常用操作命令 3. 保存与退出 4. 多窗口与 Shell 切换 二、Linux 网络基础 1. 网络分类 2. IP 地址与分类 三、网络配置与工具 1. ifconfig 2. netstat 3. wget 4. 主机名与 IP 映射 四、Linux 防火墙与安全设置…

Docker 容器传输文件的常用方法

Docker 容器传输文件的常用方法 在 Docker 日常使用中&#xff0c;经常需要在主机与容器之间传输文件&#xff08;如配置文件、代码包、日志等&#xff09;。以下是四种最常用的实现方式&#xff0c;覆盖临时传输、持久共享、构建集成等不同场景。 1. 使用 docker cp 命令&…

视频转音频在线工具大比拼,哪家体验更胜一筹?

最近工作上遇到了个挺有意思的需求&#xff0c;需要从几个教学视频里提取出音频内容&#xff0c;方便做成播客形式&#xff0c;让学员能随时随地学习。一开始&#xff0c;我以为这活儿挺简单的&#xff0c;不就是把视频里的声音单独弄出来嘛&#xff0c;结果一上手才发现&#…

KafKa02:Kafka配置文件server.properties介绍

一、配置文件位置二、配置文件介绍默认下&#xff1a;9092 是处理消息队列核心业务&#xff08;客户端与 broker 交互&#xff09;的端口9093 是集群内部控制器通信的端口# 指定节点角色&#xff0c;这里同时作为 broker&#xff08;消息代理&#xff09;和 controller&#xf…

哈尔滨云前沿服务器租用托管

黑龙江前沿数据&#xff0c;始建于2005年&#xff0c;多年的历史&#xff0c;专业从事域名注册&#xff0c;虚拟主机&#xff0c;服务器租用&#xff0c;云主机&#xff0c;网站建设等互联网服务。电信/联通/双线/机房/众多机房供您选择&#xff0c;总有一个适合您的服务器&…

Qt开发经验 --- Qt 修改控件样式的方式(16)

文章目录[toc]1 概述2 Qt Style Sheets (QSS)3 使用 QStyle 和 QProxyStyle4 设置 Palette (调色板)5 使用预定义的 QStyle6 直接设置控件属性7 自定义控件绘制更多精彩内容&#x1f449;内容导航 &#x1f448;&#x1f449;Qt开发经验 &#x1f448;1 概述 Qt 提供了多种修改…

Vue3》》Svg图标 封装和使用

SVG 安装插件 npm i vite-plugin-svg-icons // vite.config.ts import { defineConfig } from vite import vue from vitejs/plugin-vue import { createSvgIconsPlugin } from vite-plugin-svg-icons import { resolve } from path export default defineConfig({//配置路径别…

【04】AI辅助编程完整的安卓二次商业实战-寻找修改替换新UI首页图标-菜单图标-消息列表图标-优雅草伊凡

【04】AI辅助编程完整的安卓二次商业实战-寻找修改替换新UI首页图标-菜单图标-消息列表图标-优雅草伊凡引言本次二开布局没有变&#xff0c;但是下一次整体布局会有变&#xff0c;不过本次开发发现朋友圈跳转功能的流程步骤也做了一定的变化。原生项目复杂就复杂于就算一个颜色…

龙蜥8.10中spark各种集群及单机模式的搭建spark3.5.6(基于hadoop3.3.6集群)

先说最终的访问端口&#xff0c;如我这里ip为172.20.94.37、172.20.94.38、172.20.94.39&#xff0c;主机名分别为&#xff1a;hadoop37、hadoop38、hadoop39. 最终访问&#xff08;默认端口&#xff09;&#xff1a; hadoop webui 172.20.94.37:9870 hdfs 端口 8020 yarn 172.…

关于我重新学习 react 的第一遍

今天是25年9月11号&#xff0c;很久很久没有学习前端知识了&#xff0c;坦诚来说还清楚记得在大学里因为前端技术第一次获奖的心情&#xff0c;也清晰记得写完第一篇博客后的心情&#xff0c;工作和运动给我最大程度的成就感。 打破自己 重新开始 完全地 版本一 25.9.11 文章目…

第2课:Agent系统架构与设计模式

第2课&#xff1a;Agent系统架构与设计模式 课程目标 理解Agent的基本概念和特性掌握多Agent系统的设计模式学习Agent通信协议和消息传递实践创建简单的Agent框架 课程内容 2.1 Agent基础概念 什么是Agent&#xff1f; Agent是一个具有自主性、反应性、社会性和主动性的计…

Day42 51单片机中断系统与8×8 LED点阵扫描显示

day42 51单片机中断系统与88 LED点阵扫描显示一、51单片机引脚位操作 —— sbit 关键字 作用 sbit 是专用于 51 单片机架构的 C 语言扩展关键字&#xff0c;用于定义特殊功能寄存器&#xff08;SFR&#xff09;中的某一位&#xff0c;从而实现对单个 I/O 引脚的直接位操作。 示…