kafka学习笔记(三、消费者Consumer使用教程——消费性能多线程提升思考)

在这里插入图片描述


1.简介

KafkaConsumer是非线程安全的,它定义了一个acquire()方法来检测当前是否只有一个线程在操作,如不是则会抛出ConcurrentModifcationException异常。

acquire()可以看做是一个轻量级锁,它仅通过线程操作计数标记的方式来检测线程是否发生了并发操作,以此保证只有一个线程在操作。acquire()方法和release()方法成对出现,表示相应的加锁操作和解锁操作。

KafkaConsumer虽然是单线程的执行方式,但是在某些情况下如:生产者发送消息的速度远大于消费者消费的速度,这样长时间可能会造成消息的丢失,此时我们就需要消费者采用多线程消费的方式增加消费速度。

2.多线程实现的方式

2.1.线程封闭多线程

即为每个线程实例化一个KafkaConsumer,如图所示,一个线程对应一个KafkaConsumer实例,所有的消费线程都属于同一个消费者组。

这种方式的并发度受限分区的实际个数

在这里插入图片描述
实现代码示例:

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}// 消费线程public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;public KafkaConsumerThread(Properties prop, String topic) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Arrays.asList(topic));}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));for (ConsumerRecord<String, String> record: records) {// 处理消息}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}
}

2.1.消息处理模块多线程

此方法是对上面方法的进一步优化,在消息处理模块增加多线程来处理消息,进一步提升消息消费的速度。
在这里插入图片描述

public class kafkaConsumer1 {public void ConsuermMultithread1() {Properties props = initConsifg(); // 此处初始化消费者配置参数省略int consumerThreadNum = 5;for (int i = 0; i < consumerThreadNum; i++) {new KafkaConsumerThread(props, topic).start();}}public static class KafkaConsumerThread extends Thread {private KafkaConsumer<String, String> kafkaConsumer;private ExecutorService executorService;private int threadNumber;public KafkaConsumerThread(Properties prop, String topic, int threadNumber) {this.kafkaConsumer = new KafkaConsumer<>(prop);this.kafkaConsumer.subscribe(Collections.singletonList(topic));this.threadNumber = threadNumber;executorService = new ThreadPoolExecutor(threadNumber, threadNumber,0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1000), new ThreadPoolExecutor.CallerRunsPolicy());}@Overridepublic void run() {try {while (true) {ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(100));if (!records.isEmpty()) {executorService.submit(new RecordsHandler(records));}}} catch (Exception e) {e.printStackTrace();} finally {kafkaConsumer.close();}}}public static class RecordsHandler extends Thread {public final ConsumerRecords<String, String> records;public RecordsHandler(ConsumerRecords<String, String> records) {this.records = records;}@Overridepublic void run() {/// 处理records  }}
}

此方法需要引入一个共享的offsets来参与提交。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/83429.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

百度蜘蛛池的作用是什么?技术@baidutopseo

百度蜘蛛池的作用 百度蜘蛛池是一种用于集中管理和调度百度搜索引擎蜘蛛&#xff08;爬虫&#xff09;的技术工具。其主要目的是优化网站内容的收录效率&#xff0c;帮助搜索引擎更高效地发现和抓取网页内容。技术baidutopseo 提升收录效率 通过集中大量百度蜘蛛的访问请求&…

黑马Java面试笔记之 消息中间件篇(Kafka)

一. Kafka保证消息不丢失 Kafka如何保证消息不丢失 使用Kafka在消息的收发过程中都会出现消息丢失&#xff0c;Kafka分别给出了解决方案 生产者发送消息到Brocker丢失消息在Brocker中存储丢失消费者从Brocker接收消息丢失 1.1 生产者发送消息到Brocker丢失 设置异步发送 消息…

dis css port brief 命令详细解释

华为交换机命令 display css port brief 详细解释 display css port brief 是华为交换机中用于 快速查看堆叠&#xff08;CSS&#xff0c;Cluster Switch System&#xff09;端口状态及关键参数 的命令&#xff0c;适用于日常运维、堆叠链路健康检查及故障定位。以下是该命令的…

Redis 缓存问题及其解决方案

1. 缓存雪崩 概念&#xff1a;缓存雪崩是指在缓存层出现大范围缓存失效或缓存服务器宕机的情况下&#xff0c;大量请求直接打到数据库&#xff0c;导致数据库压力骤增&#xff0c;甚至可能引发数据库宕机。 影响&#xff1a;缓存雪崩会导致系统性能急剧下降&#xff0c;甚至导…

使用Python进行函数作画

前言 因为之前通过deepseek绘制一下卡通的人物根本就不像&#xff0c;又想起来之前又大佬通过函数绘制了一些图像&#xff0c;想着能不能用Python来实现&#xff0c;结果发现可以&#xff0c;不过一些细节还是需要自己调整&#xff0c;deepseek整体的框架是没有问题&#xff0…

关于list集合排序的常见方法

目录 1、list.sort() 2、Collections.sort() 3、Stream.sorted() 4、进阶排序技巧 4.1 空值安全处理 4.2 多字段组合排序 4.3. 逆序 5、性能优化建议 5.1 并行流加速 5.2 原地排序 6、最佳实践 7、注意事项 前言 Java中对于集合的排序操作&#xff0c;分别为list.s…

Java高级 | (二十二)Java常用类库

参考&#xff1a;Java 常用类库 | 菜鸟教程 一、核心Java类库 二、常用第三方库 以下是 Java 生态系统中广泛使用的第三方库&#xff1a; 类别库名称主要功能官方网站JSON 处理JacksonJSON 序列化/反序列化https://github.com/FasterXML/jacksonGsonGoogle 的 JSON 库https:…

几种常用的Agent的Prompt格式

一、基础框架范式&#xff08;Google推荐标准&#xff09; 1. 角色与职能定义 <Role_Definition> 你是“项目专家”&#xff08;Project Pro&#xff09;&#xff0c;作为家居园艺零售商的首席AI助手&#xff0c;专注于家装改造领域。你的核心使命&#xff1a; 1. 协助…

蛋白质结构预测软件openfold介绍

openfold 是一个用 Python 和 PyTorch 实现的 AlphaFold2 的开源复现版&#xff0c;旨在提升蛋白质结构预测的可复现性、可扩展性以及研究友好性。它允许研究者在不开源 DeepMind 原始代码的情况下&#xff0c;自由地进行蛋白结构预测的训练和推理&#xff0c;并支持自定义模型…

AD转嘉立创EDA

可以通过嘉立创文件迁移助手进行格式的转换 按照它的提示我们整理好文件 导出后是这样的&#xff0c;第一个文件夹中有原理图和PCB&#xff0c;可以把它们压缩成一个压缩包 这个时候我们打开立创EDA&#xff0c;选择导入AD 这样就完成了

MySQL(50)如何使用UNSIGNED属性?

在 MySQL 中&#xff0c;UNSIGNED 属性用于数值数据类型&#xff08;如 TINYINT、SMALLINT、MEDIUMINT、INT 和 BIGINT&#xff09;&#xff0c;表示该列只能存储非负整数。使用 UNSIGNED 属性可以有效地扩展列的正整数范围&#xff0c;因为它不需要为负数保留空间。 1. 定义与…

什么是链游,链游系统开发价格以及方案

2025 Web3钱包开发指南&#xff1a;从多版本源码到安全架构实战 在数字资产爆发式增长的今天&#xff0c;Web3钱包已成为用户进入链上世界的核心入口。作为开发者&#xff0c;如何高效构建安全、跨链、可扩展的钱包系统&#xff1f;本文结合前沿技术方案与开源实践&#xff0c…

文件IO流

IO使用函数 标准IO文件IO(低级IO)打开fopen, freopen, fdopenopen关闭fcloseclose读getc, fgetc, getchar, fgets, gets, fread printf fprintfread写putc, fputc, putchar, fputs, puts, fwrite scanf fscanfwrite操作文件指针fseeklseek其它fflush rewind ftell 文件描述符 …

云原生DMZ架构实战:基于AWS CloudFormation的安全隔离区设计

在云时代,传统的DMZ(隔离区)概念已经演变为更加灵活和动态的架构。本文通过解析一个实际的AWS CloudFormation模板,展示如何在云原生环境中构建现代化的DMZ安全架构。 1. 云原生DMZ的核心理念 传统DMZ是网络中的"缓冲区",位于企业内网和外部网络之间。而在云环境…

一、虚拟货币概述

1. 定义 - 虚拟货币是一种基于网络技术、加密技术和共识机制的数字货币&#xff0c;它不依赖传统金融机构发行&#xff0c;而是通过计算机算法生成&#xff0c;例如比特币、以太坊等。 2. 特点 - 去中心化&#xff1a;没有一个单一的机构或个人控制整个虚拟货币系统&#xff0c…

Make All Equal

给定一个循环数组 a1,a2,…,ana1​,a2​,…,an​。 你可以对 aa 至多执行 n−1n−1 次以下操作&#xff1a; 设 mm 为 aa 的当前大小&#xff0c;你可以选择任何两个相邻的元素&#xff0c;其中前一个不大于后一个&#xff08;特别地&#xff0c;amam​ 和 a1a1​ 是相邻的&a…

任务中心示例及浏览器强制高效下载实践

1. 效果展示 这里的进度展示&#xff0c;可以通过我们之前讲到的Vue3实现类ChatGPT聊天式流式输出(vue-sse实现) SSE技术实现&#xff0c;比如用户点击全量下载时&#xff0c;后台需要将PDF文件打包为ZIP文件&#xff0c;由于量较大&#xff0c;需要展示进度&#xff0c;用户点…

SpringBoot整合Flowable【08】- 前后端如何交互

引子 在第02篇中&#xff0c;我通过 Flowable-UI 绘制了一个简单的绩效流程&#xff0c;并在后续章节中基于这个流程演示了 Flowable 的各种API调用。然而&#xff0c;在实际业务场景中&#xff0c;如果要求前端将用户绘制的流程文件发送给后端再进行解析处理&#xff0c;这种…

2025 Java面试大全技术文章大纲

2025 Java面试大全技术文章大纲 基础篇 Java核心语法 数据类型与包装类自动装箱与拆箱原理String、StringBuffer、StringBuilder区别final关键字作用场景 面向对象特性 多态的实现机制抽象类与接口的异同设计模式&#xff1a;单例的七种写法泛型擦除与桥接方法 进阶篇 J…

Python aiohttp 全面指南:异步HTTP客户端/服务器框架

边写代码零食不停口 盼盼麦香鸡味块 、卡乐比&#xff08;Calbee&#xff09;薯条三兄弟 独立小包、好时kisses多口味巧克力糖、老金磨方【黑金系列】黑芝麻丸 边写代码边贴面膜 事业美丽两不误 DR. YS 野森博士【AOUFSE/澳芙雪特证】377专研美白淡斑面膜组合 优惠劵 别光顾写…