【Kafka使用方式以及原理】

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

Kafka生产者发送消息的方式

Kafka生产者发送消息主要通过以下三种方式:

同步发送
生产者发送消息后,会阻塞等待Broker的响应,确认消息是否成功写入。这种方式可靠性高,但吞吐量较低。代码示例:

ProducerRecord<String, String> record = new ProducerRecord<>("topic", "key", "value");
RecordMetadata metadata = producer.send(record).get();

异步发送
生产者发送消息后立即返回,通过回调函数处理Broker的响应。这种方式吞吐量高,但需要自行处理失败情况。代码示例:

producer.send(record, (metadata, exception) -> {if (exception != null) {// 处理失败逻辑}
});

异步发送(无回调)
生产者直接发送消息而不关心结果,适用于对可靠性要求不高的场景。吞吐量最高,但可能丢失消息。代码示例:

producer.send(record);

Kafka生产者发送消息的特点

分区策略
生产者可以通过指定分区键(Key)控制消息写入的分区。若未指定Key,则采用轮询策略分配分区。支持自定义分区器(Partitioner)。

消息确认机制(acks)

  • acks=0:生产者不等待Broker确认,消息可能丢失。
  • acks=1:Leader副本写入成功后即返回响应。
  • acks=all/-1:需所有ISR副本写入成功,可靠性最高。

批量发送(Batch)
生产者会将多条消息合并为一个批次发送,减少网络开销。通过linger.msbatch.size参数控制批处理行为。

消息重试
网络异常或Leader切换时,生产者会自动重试发送消息。可通过retriesretry.backoff.ms参数配置重试策略。

幂等性与事务

  • 幂等性:通过启用enable.idempotence=true避免消息重复发送。
  • 事务:支持跨分区原子性写入,需配置transactional.id

缓冲区机制
生产者维护一个内存缓冲区(buffer.memory),暂存待发送消息。缓冲区满时,发送调用会被阻塞或抛出异常。

Kafka消费者的基本工作流程

Kafka消费者通过订阅主题(Topic)或特定分区(Partition)来消费消息。消费者组(Consumer Group)机制允许并行处理消息,每个消费者组内的消费者独立消费不同分区的数据。

消费者启动时会向Kafka集群发送元数据请求,获取订阅主题的分区信息。消费者与分区建立连接后,通过轮询(Poll)机制从分区拉取消息。消费者会定期提交偏移量(Offset)到Kafka,记录消费进度。

消费者配置关键参数

  • bootstrap.servers: Kafka集群的地址列表。
  • group.id: 消费者所属的组名称。
  • auto.offset.reset: 当无初始偏移量时如何处理(earliestlatest)。
  • enable.auto.commit: 是否自动提交偏移量(默认true)。
  • max.poll.records: 单次Poll返回的最大消息数。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

消息消费模式

订阅主题模式
消费者订阅一个或多个主题,Kafka自动分配分区:

consumer.subscribe(Arrays.asList("topic1", "topic2"));
while (true) {ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record : records) {System.out.printf("offset=%d, key=%s, value=%s%n", record.offset(), record.key(), record.value());}
}

手动分配分区模式
直接指定消费的分区,绕过消费者组协调:

TopicPartition partition = new TopicPartition("topic1", 0);
consumer.assign(Arrays.asList(partition));

偏移量管理

自动提交
配置enable.auto.commit=true,Kafka定期提交偏移量(默认5秒一次)。

手动同步提交
精确控制提交时机,确保消息处理完成后再提交:

consumer.commitSync();

手动异步提交
非阻塞式提交,需处理回调:

consumer.commitAsync((offsets, exception) -> {if (exception != null) {System.err.println("Commit failed: " + offsets);}
});

消费者再平衡(Rebalance)

当消费者组内成员变化(如新增或下线消费者)时,Kafka触发再平衡,重新分配分区。可通过ConsumerRebalanceListener接口实现自定义逻辑:

consumer.subscribe(Arrays.asList("topic1"), new ConsumerRebalanceListener() {@Overridepublic void onPartitionsRevoked(Collection<TopicPartition> partitions) {// 分区被回收前的处理(如提交偏移量)}@Overridepublic void onPartitionsAssigned(Collection<TopicPartition> partitions) {// 新分区分配后的处理(如恢复状态)}
});

处理消费延迟与积压

  • 调整max.poll.records减少单次Poll的数据量。
  • 优化消息处理逻辑,避免阻塞Poll线程。
  • 增加消费者实例数量(不超过分区数)。
  • 监控消费者延迟指标(如consumer_lag)。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87286.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87286.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【ChatTTS】ChatTTS使用体验

ChatTTS 使用体验&#xff1a;初始使用真的十分惊艳。可以尝试官网调用试一试。部署的好处是&#xff0c;遇到好听的音色可以把参数自动存储在本地。 苦恼&#xff1a;相同参数生成的音色不一致&#xff0c;需要多次调整&#xff0c;但最终效果非常满意。 ⭐ GitHub Star数变化…

华为云Flexus+DeepSeek征文| 基于华为云Dify-LLM高可用平台开发运维故障处理智能体

华为云FlexusDeepSeek征文&#xff5c; 基于华为云Dify-LLM高可用平台开发运维故障处理智能体 1. 概述2. 创建工作流2.1. 创建开始节点2.2. 创建搜索节点2.3. 创建LLM大模型节点2.4. 创建结束节点 3. 测试工作流4. 应用发布5. 总结 1. 概述 Dify是一款开源的LLM应用开发平台&am…

vue中scss下载方式与引入方式

1. scss下载 npm install sass-loader --save-devnpm install node-sass --save-dev 2. 在style标签里面加入lang“scss” 测试下&#xff01;

Day04_C语言IO进程线程

01.思维导图 02.创建一个分支线程&#xff0c;在主线程中拷贝文件的前一部分&#xff0c;主线程拷贝文件的后一部分 #include <25051head.h> void* callback(void *arg) {off_t size*(off_t*)arg;//打开一个文件读//打开一个文件写int fd_r2open("./my.txt",O_…

金牛区数字文创梦工厂:国际数字影像产业园的先行服务

在金牛区数字文创梦工厂的实践中&#xff0c;先行服务作为创新引擎&#xff0c;为企业提供预启动阶段的全方位支持。其核心理念是通过前置化咨询和资源整合&#xff0c;降低试错成本&#xff0c;赋能产业升级。 先行服务的三大核心优势 通过主动介入项目启动前环节&#xff0…

使用RSA对网址url栏加密以及二维码的网址内容加密

JSEncrypt 库 &#xff1a; - 引入了 jsencrypt/bin/jsencrypt.min - 需要在项目中安装 jsencrypt 包 npm install jsencrypt import JSEncrypt from jsencrypt/bin/jsencrypt.min// 密钥对生成 http://web.chacuo.net/netrsakeypairconst publicKey MFwAAQconst privateKe…

如何用 Kafka Manager 实现 Kafka 集群全面监控

1. 前言:为什么需要 Kafka 集群监控? Apache Kafka 是现代大数据架构中不可或缺的组件,广泛用于日志收集、流处理、消息队列等场景。随着 Kafka 集群规模的增长和业务复杂度的提升,对 Kafka 的实时监控变得尤为重要。 1.1 Kafka 在大数据架构中的核心地位 Kafka 被广泛应…

MyBatis架构原理解析:核心对象与执行流程深度剖析

一、开篇&#xff1a;理解MyBatis的核心价值 在当今Java持久层框架生态中&#xff0c;MyBatis凭借其灵活的SQL控制能力和简洁的ORM实现成为企业级应用的首选。与JPA的全自动ORM不同&#xff0c;MyBatis采用半自动化映射理念&#xff0c;在保持SQL灵活性的同时&#xff0c;通过…

移远通信携手高通:以全栈车载解决方案,共绘智能出行新蓝图

6月26日至27日&#xff0c;2025高通汽车技术与合作峰会于苏州盛大举办。本次峰会以 “我们一起&#xff0c;行稳智远” 为主题&#xff0c;全方位呈现智能汽车全栈技术、全产业链生态与全场景体验。作为高通长期稳定的战略合作伙伴&#xff0c;移远通信携全栈车载智能解决方案深…

拿来就能用的python 课程 1

拿来就能用的python 课程 引言 python是很多人入门计算机语言的首选。 但是繁文缛节&#xff0c;很多人从怎么装python开始学起&#xff0c;然后python计算&#xff0c;然后什么是函数&#xff0c;然后什么是类&#xff0c;然后就因为太难放弃了。&#xff08;说的是不是你&a…

openssh-server

默认地&#xff0c;Ubuntu桌面版不带SSH服务器 1 检查服务是否存在 ls /usr/sbin/sshd2 安装服务 apt install openssh-server3 关闭防火墙 ufw disable 4 启动服务 service ssh start

html虚拟滚动,解决dom渲染过多卡顿的问题

<!DOCTYPE html> <html lang"zh"><head><meta charset"UTF-8" /><title>极简虚拟滚动</title><style>.container {width: 300px;height: 300px;border: 1px solid #ccc;overflow: auto;position: relative;}.pl…

华锐互动:全方位定制化 VR 内容制作服务流程剖析​

华锐互动始终坚持以客户为中心&#xff0c;为客户提供全方位、定制化的 VR 内容制作服务。从项目的最初阶段开始&#xff0c;华锐互动就会深入了解客户的需求和目标&#xff0c;与客户进行充分的沟通和交流&#xff0c;挖掘项目背后的故事和文化内涵&#xff0c;然后根据客户的…

50天50个小项目 (Vue3 + Tailwindcss V4) ✨ | DragNDrop(拖拽占用组件)

&#x1f4c5; 我们继续 50 个小项目挑战&#xff01;—— DragNDrop组件 仓库地址&#xff1a;https://github.com/SunACong/50-vue-projects 项目预览地址&#xff1a;https://50-vue-projects.vercel.app/ 使用 Vue 3 的 Composition API 和 <script setup> 语法结合…

springboot应用即使使用了连接池,MySQL数据库仍然有大量sleep状态的连接

springboot应用即使使用了连接池&#xff0c;MySQL数据库仍然有大量sleep状态的连接 问题背景概念理解MySQL配置参数wait_timeout概念Hikari配置参数&#xff08;项目使用hikari作为数据库连接池&#xff09; 实践出真知总结和解决思路 问题背景 近期客户生产环境报&#xff1…

windows下安装和使用git

本文为windows下git的下载安装和使用。 git下载和安装 参考&#xff1a; windows安装git&#xff08;全网最详细&#xff0c;保姆教程&#xff09;-CSDN博客 【学了就忘】Git介绍 — 4.Git的安装 - 简书 先解决下载时的一些疑惑&#xff1a; 选择哪个架构&#xff1f; 电脑ARM6…

借助工具给外语视频加双语字幕的实用指南​

给外语视频配上双语字幕&#xff0c;能让不同语言背景的观众更轻松理解内容&#xff0c;也能让视频在传播时更受欢迎。现在有不少智能工具能帮我们高效完成这项工作&#xff0c;比如 ViiTor AI 平台&#xff0c;它在处理双语字幕方面有不少实用功能&#xff0c;下面就结合其功能…

Claude 4 与 Gemini 2.5 Pro:开发者深度比较

Claude 4 与 Gemini 2.5 Pro&#xff1a;开发者深度比较 在使用相同的编码挑战对Claude Sonnet 4和Gemini 2.5 Pro Preview进行广泛的正面测试后&#xff0c;我发现了每个开发人员都应该了解的显著性能差异。我的发现揭示了执行速度、成本效率以及最重要的&#xff0c;精确执行…

怎么进入9870端口

在实验时想进入9870端口查看safe状态 但是输入localhost:9870后显示&#xff1a; 首先使用jps确认hadoop状态&#xff1a; 从 jps 的输出来看&#xff0c;Hadoop 的核心服务&#xff08;NameNode、DataNode、ResourceManager、NodeManager 等&#xff09;都已经正常运行&…

Windows、Linux、macOS 三大系统安装 Git 的常见坑点及解决方案,附带 具体操作示例,帮助新手快速避坑

以下是 Windows、Linux、macOS 三大系统安装 Git 的常见坑点及解决方案,附带 具体操作示例,帮助新手快速避坑。 一、Windows 系统安装 Git 1. 安装路径含空格或中文 坑点:默认路径 C:\Program Files\Git 可能导致某些脚本报错。 解决:自定义路径(如 D:\DevTools\Git)。…