消费者API

目录

  • 独立消费者案例(订阅主题)
  • 独立消费者案例(订阅分区)
  • 消费者组案例

独立消费者案例(订阅主题)

在这里插入图片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("first");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

独立消费者案例(订阅分区)

在这里插入图片描述

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumerPartition {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);ArrayList<TopicPartition> topicPartitions = new ArrayList<>();topicPartitions.add(new TopicPartition("first",0));kafkaConsumer.assign(topicPartitions);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者组案例

测试同一个主题的分区数据,只能由于一个消费者组中的一个一个消费

消费者1

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者2

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer1 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

消费者3

package com.tsg.kafka.consumer;import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;public class CustomConsumer2 {public static void main(String[] args) {Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"master:9092,slave1:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class.getName());properties.put(ConsumerConfig.GROUP_ID_CONFIG,"test");KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<String, String>(properties);ArrayList<String> topics = new ArrayList<String>();topics.add("four");kafkaConsumer.subscribe(topics);while (true){ConsumerRecords<String, String> consumerRecords = kafkaConsumer.poll(Duration.ofSeconds(1));for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {System.out.println(consumerRecord);}}}
}

三个消费者的组ID相同,会形成消费者组,每个消费者消费一个分区数据

生产者发送数据

package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",2,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",1,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}

```c
package com.tsg.kafka.producer;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;public class CustomProducerCallback {public static void main(String[] args) {Properties properties = new Properties();// 连接集群properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "master:9092,slave1:9092");// 指定对应的key和value的序列化类型properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());// 设置acksproperties.put(ProducerConfig.ACKS_CONFIG,"all");// 重试次数retries,默认是int最大值,2^31 - 1properties.put(ProducerConfig.RETRIES_CONFIG,3);properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.tsg.kafka.producer.MyPartitioner");// 创建kafka生产者对象KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);// 发送数据for (int i = 0; i < 5; i++) {kafkaProducer.send(new ProducerRecord<String, String>("four",0,"","分区2"), new Callback() {public void onCompletion(RecordMetadata metadata, Exception exception) {if(exception == null){System.out.println("主题:" +metadata.topic() + " 分区: " +metadata.partition());}}});}// 关闭资源kafkaProducer.close();}
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/93693.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/93693.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

C# NX二次开发:操作按钮控件Button和标签控件Label详解

大家好&#xff0c;今天介绍ug二次开发过程中的一个叫操作按钮的控件&#xff0c;这个控件在块UI编辑器中可以使用。 ​ Button这个控件的属性和方法如下所示&#xff1a; namespace NXOpen.BlockStyler { public class Label : UIBlock { protected intern…

Vue.prototype 的作用

在 Vue.js 中&#xff0c;Vue.prototype 是用来向所有 Vue 实例添加属性或方法的机制。通过它添加的属性或方法可以在所有 Vue 组件实例中通过 this 访问。主要作用添加全局方法或属性&#xff1a;可以在所有组件中使用的工具方法或常量扩展 Vue 功能&#xff1a;添加 Vue 本身…

Javaee 多线程 --进程和线程之间的区别和联系

文章目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnable(接口)&#xff0c;重写run继承Thread,重写run,但是使用匿名内部类实现Runnable(接口)&#xff0c;重写run&#xff0c;但是使用匿名内部类使用lambda表达式请说明Thread类中run和…

企业如何让内部视频仅限指定域名播放,确保视频不被泄露?

在数字化办公时代&#xff0c;企业内部的培训视频、产品演示或机密会议录像等敏感内容&#xff0c;一旦被非法传播或泄露&#xff0c;可能带来严重的商业风险。如何确保这些视频只能在公司官网或指定域名播放&#xff0c;防止被恶意下载、盗链或二次传播&#xff1f;今天介绍一…

端口映射原理操作详解教程:实现外网访问内网服务,本地路由器端口映射公网ip和软件端口映射域名2种方法

端口映射作为一种不同网络间通信的关键网络技术&#xff0c;在远程访问和内外网连接服务需求日益增长的如今&#xff0c;理解端口映射的原理和设置方法是确保网络服务可用性的必要技能。本文将深入探讨端口映射的基本概念、路由器端口映射设置步骤以及无公网IP用端口映射软件映…

【PyTorch】多对象分割项目

对象分割任务的目标是找到图像中目标对象的边界。实际应用例如自动驾驶汽车和医学成像分析。这里将使用PyTorch开发一个深度学习模型来完成多对象分割任务。多对象分割的主要目标是自动勾勒出图像中多个目标对象的边界。 对象的边界通常由与图像大小相同的分割掩码定义&#xf…

SSH 使用密钥登录服务器

用这种方法远程登陆服务器的时候无需手动输入密码 具体步骤 客户端通过 ssh-keygen 生成公钥和私钥 ssh-keygen -t rsa 生成的时候会有一系列问题&#xff0c;根据自己的需要选择就行。生成的结果为两个文件&#xff1a; 上传公钥至服务器&#xff0c;上述两个文件一般在客户…

MySQL 8.4 企业版启用TDE功能和表加密

一、系统环境操作系统&#xff1a;Ubuntu 24.04 数据库:8.4.4-commercial for Linux on x86_64 (MySQL Enterprise Server - Commercial)二、安装TDE组件前提&#xff1a;检查组件文件是否存在ls /usr/lib/mysql/plugin/component_keyring_encrypted_file.so1.配置全局清单文件…

【Altium designer】导出的原理图PDF乱码异常的解决方法

一、有些电源名字无法显示或器件丢失 解决办法 (1)首先AD18以及以上的新版本AD不存在该问题。 (2)其次AD17以及更旧版本的AD很可能遇到该问题,参考如下博客笔记进行操作即可: 大致的操作如下:DXP → Preferences → Schematic → Options里面“Render Text with GDI+”…

4.Ansible自动化之-部署文件到主机

4 - 部署文件到受管主机 实验环境 先通过以下命令搭建基础环境&#xff08;创建工作目录、配置 Ansible 环境和主机清单&#xff09;&#xff1a; # 在控制节点&#xff08;controller&#xff09;上创建web目录并进入&#xff0c;作为工作目录 [bqcontroller ~]$ mkdir web &a…

Vuex的使用

Vuex 超详细使用教程&#xff08;从入门到精通&#xff09;一、Vuex 是什么&#xff1f;Vuex 是专门为 Vue.js 设计的状态管理库&#xff0c;它采用集中式存储管理应用的所有组件的状态。简单来说&#xff0c;Vuex 就是一个"全局变量仓库"&#xff0c;所有组件都可以…

pytorch 数据预处理,加载,训练,可视化流程

流程定义自定义数据集类定义训练和验证的数据增强定义模型、损失函数和优化器训练循环&#xff0c;包括验证训练可视化整个流程模型评估高级功能扩展混合精度训练​分布式训练​{:width“50%” height“50%”} 定义自定义数据集类 # #1. 自定义数据集类 # class CustomImageD…

Prompt工程:OCR+LLM文档处理的精准制导系统

在PDF OCR与大模型结合的实际应用中&#xff0c;很多团队会发现一个现象&#xff1a;同样的OCR文本&#xff0c;不同的Prompt设计会产生截然不同的提取效果。有时候准确率能达到95%&#xff0c;有时候却只有60%。这背后的关键就在于Prompt工程的精细化程度。 &#x1f3af; 为什…

RecSys:粗排模型和精排特征体系

粗排 在推荐系统链路中&#xff0c;排序阶段至关重要&#xff0c;通常分为召回、粗排和精排三个环节。粗排作为精排前的预处理阶段&#xff0c;需要在效果和性能之间取得平衡。 双塔模型 后期融合&#xff1a;把用户、物品特征分别输入不同的神经网络&#xff0c;不对用户、…

spring声明式事务,finally 中return对事务回滚的影响

finally 块中使用 return 是一个常见的编程错误&#xff0c;它会&#xff1a; 跳过正常的事务提交流程。吞掉异常&#xff0c;使错误处理失效 导致不可预测的事务行为Java 中 finally 和 return 的执行机制&#xff1a;1. finally 块的基本特性 在 Java 中&#xff0c;finally …

WPF 打印报告图片大小的自适应(含完整示例与详解)

目标&#xff1a;在 FlowDocument 报告里&#xff0c;根据 1~6 张图片的数量&#xff0c; 自动选择 2 行 3 列 的最佳布局&#xff1b;在只有 1、2、4 张时保持“占满感”&#xff0c;打印清晰且不变形。规则一览&#xff1a;1 张 → 占满 23&#xff08;大图居中&#xff09;…

【AI大模型前沿】百度飞桨PaddleOCR 3.0开源发布,支持多语言、手写体识别,赋能智能文档处理

系列篇章&#x1f4a5; No.文章1【AI大模型前沿】深度剖析瑞智病理大模型 RuiPath&#xff1a;如何革新癌症病理诊断技术2【AI大模型前沿】清华大学 CLAMP-3&#xff1a;多模态技术引领音乐检索新潮流3【AI大模型前沿】浙大携手阿里推出HealthGPT&#xff1a;医学视觉语言大模…

迅为RK3588开发板Android12 制作使用系统签名

在 Android 源码 build/make/target/product/security/下存放着签名文件&#xff0c;如下所示&#xff1a;将北京迅为提供的 keytool 工具拷贝到 ubuntu 中&#xff0c;然后将 Android11 或 Android12 源码build/make/target/product/security/下的 platform.pk8 platform.x509…

Day08 Go语言学习

1.安装Go和Goland 2.新建demo项目实践语法并使用git实践版本控制操作 2.1 Goland配置 路径**&#xff1a;** GOPATH workspace GOROOT golang 文件夹&#xff1a; bin 编译后的可执行文件 pkg 编译后的包文件 src 源文件 遇到问题1&#xff1a;运行 ‘go build awesomeProject…

Linux-文件创建拷贝删除剪切

文章目录Linux文件相关命令ls通配符含义touch 创建文件命令示例cp 拷贝文件rm 删除文件mv剪切文件Linux文件相关命令 ls ls是英文单词list的简写&#xff0c;其功能为列出目录的内容&#xff0c;是用户最常用的命令之一&#xff0c;它类似于DOS下的dir命令。 Linux文件或者目…