基于Kafka实现动态监听topic功能

生命无罪,健康万岁,我是laity。

我曾七次鄙视自己的灵魂:

第一次,当它本可进取时,却故作谦卑;

第二次,当它在空虚时,用爱欲来填充;

第三次,在困难和容易之间,它选择了容易;

第四次,它犯了错,却借由别人也会犯错来宽慰自己;

第五次,它自由软弱,却把它认为是生命的坚韧;

第六次,当它鄙夷一张丑恶的嘴脸时,却不知那正是自己面具中的一副;

第七次,它侧身于生活的污泥中,虽不甘心,却又畏首畏尾。

基于Kafka实现动态监听topic功能

业务场景:导条根据各家接口进行数据分发其中包含动态kafka-topic,各家通过监听topic实现获取数据从而实现后续业务。

实现逻辑

pom

yaml 方案1 接收的是String

  kafka:bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔listener:type: batchconsumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.StringDeserializerkey-deserializer: org.apache.kafka.common.serialization.StringDeserializerauto-offset-reset: earliestgroup-id: consumer-sbproducer:value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializer

yaml 方案2 接收的是Byte

  kafka:bootstrap-servers: youKafkaIp:9092 # 指定 Kafka Broker 地址,可以设置多个,以逗号分隔listener:type: batchconsumer:enable-auto-commit: falsevalue-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerkey-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializerauto-offset-reset: earliestgroup-id: consumer-sbproducer:value-serializer: org.apache.kafka.common.serialization.StringSerializerkey-serializer: org.apache.kafka.common.serialization.StringSerializer

收消息CODE

KafkaConfig.java

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.annotation.KafkaListenerAnnotationBeanPostProcessor;
import org.springframework.kafka.annotation.KafkaListenerConfigurer;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;import java.util.HashMap;
import java.util.Map;/*** @author laity*/
@EnableKafka
@Configuration
public class KafkaConfig {// 解决 Could not create message listener - MessageHandlerMethodFactory not set  TODO:WWS 不好使/*@Beanpublic KafkaListenerAnnotationBeanPostProcessor kafkaListenerAnnotationBeanPostProcessor() {KafkaListenerAnnotationBeanPostProcessor processor = new KafkaListenerAnnotationBeanPostProcessor();processor.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());return processor;}*/@Beanpublic ConsumerFactory<String, String> consumerFactory() {Map<String, Object> map = new HashMap<>();map.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "youKafkaIp:9092");map.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-laity");map.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());map.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());return new DefaultKafkaConsumerFactory<String, String>(map);}@Beanpublic ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() {ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();factory.setConsumerFactory(consumerFactory());factory.setConcurrency(5);// new DefaultMessageHandlerMethodFactory()return factory;}// implements KafkaListenerConfigurer + 解决 Could not create message listener - MessageHandlerMethodFactory not set/*@Overridepublic void configureKafkaListeners(KafkaListenerEndpointRegistrar registrar) {registrar.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());}*/
}

KafkaListenerController.java

package cn.iocoder.yudao.server.controller.admin.szbl;import cn.iocoder.yudao.framework.common.pojo.CommonResult;
import cn.iocoder.yudao.server.controller.admin.szbl.common.config.kafka.MyComponent;
import cn.iocoder.yudao.server.controller.admin.szbl.vo.InitSceneVO;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;import javax.annotation.security.PermitAll;/*** @author laity*/
@RestController
@RequestMapping("/kafka")
public class KafkaListenerController {private final MyComponent component;public KafkaListenerController(MyComponent component) {this.component = component;}private String topic;// 用于接收导条分发数据接口@PostMapping("/reception")@PermitAllpublic CommonResult<Boolean> putAwayL(@RequestBody InitSceneVO vo) {// …… 业务逻辑// 去执行 监听固定的topiccomponent.startListening(vo.getGzTopicName());return CommonResult.success(true);}
}

DynamicKafkaListenerService.java

import org.springframework.aop.framework.Advised;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;import java.lang.reflect.Method;
import java.time.LocalDateTime;
import java.util.Objects;/*** @author laity 动态管理Kafka监听器*/
@Service
public class DynamicKafkaListenerService {private final KafkaListenerEndpointRegistry registry;private final ConcurrentKafkaListenerContainerFactory<String, String> factory;@Autowiredpublic DynamicKafkaListenerService(KafkaListenerEndpointRegistry registry, ConcurrentKafkaListenerContainerFactory<String, String> factory) {this.registry = registry;this.factory = factory;}public void addListener(String topic, String groupId, Object bean, Method method) {if (AopUtils.isAopProxy(bean)) {try {bean = ((Advised) bean).getTargetSource().getTarget();} catch (Exception e) {throw new RuntimeException(e);}}MethodKafkaListenerEndpoint<String, String> endpoint = new MethodKafkaListenerEndpoint<>();assert bean != null;endpoint.setBean(bean);endpoint.setMethod(method);endpoint.setTopics(topic);endpoint.setGroup(groupId);endpoint.setId(method.getName() + "_" + LocalDateTime.now());endpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory()); // 之前怎么点都点不出来这个属性 突然又出来了……无语registry.registerListenerContainer(endpoint, factory, true); // 指定容器工厂}public void removeListener(String beanName) {// 断言Objects.requireNonNull(registry.getListenerContainer(beanName)).stop();registry.unregisterListenerContainer(beanName);}
}

BlueKafkaConsumer.java

import org.springframework.stereotype.Component;/*** @author laity*/
@Component
public class BlueKafkaConsumer {// @KafkaListener(topics = "#{__listener.getTopicName()}", groupId = "consumer-laity")public void listen(Object record) {System.out.println("======================= 接收动态KafkaTopics Received message ========================");System.out.println(record.toString());}}

MyComponent.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;import javax.annotation.PostConstruct;
import java.lang.reflect.Method;/*** @author laity*/
@Component
public class MyComponent {private final DynamicKafkaListenerService kafkaListenerService;private final BlueKafkaConsumer blueKafkaConsumer;@Autowiredpublic MyComponent(DynamicKafkaListenerService kafkaListenerService, BlueKafkaConsumer blueKafkaConsumer) {this.kafkaListenerService = kafkaListenerService;this.blueKafkaConsumer = blueKafkaConsumer;}public void startListening(String topic) {try {Method blueMethod = BlueKafkaConsumer.class.getMethod("listen", Object.class);kafkaListenerService.addListener(topic, "consumer-laity", blueKafkaConsumer, blueMethod);} catch (NoSuchMethodException e) {throw new RuntimeException(e);}}public void stopListening(String beanName) {kafkaListenerService.removeListener(beanName);}// init@PostConstruct // 这个是服务启动时调用 但我想要的时实时可变的public void init() {}}

世界上最可贵的两个词,一个叫认真,一个叫坚持,认真的人改变自己,坚持的人改变命运,有些事情不是看到了希望才去坚持,而是坚持了才有希望。我是Laity,正在前进的Laity。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/pingmian/90784.shtml
繁体地址,请注明出处:http://hk.pswp.cn/pingmian/90784.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

机械学习初识--什么是机械学习--机械学习有什么重要算法

一、什么是机械学习机器学习&#xff08;Machine Learning&#xff09;是人工智能&#xff08;AI&#xff09;的一个重要分支&#xff0c;它使计算机能够通过数据自动学习规律、改进性能&#xff0c;并在没有明确编程的情况下完成特定任务。其核心思想是让机器从数据中 “学习”…

普通大学生大三这一年的想法

目录 大三期间的经历与反思 公益活动&#xff1a;社会责任感的体现 比赛&#xff1a;个人成长的助推器 培训与思想提升 大学教育的本质与人才培养 构建自我的道与未来规划 大学教育的未来与个人定位 结语 大三期间的经历与反思 大三&#xff0c;大学生活的分水岭&#…

Python——入门

目录 变量 变量类型 动态类型 注释 输出输入 运算符 算术运算符 关系运算符 逻辑运算符 赋值运算符 条件语句 循环语句 函数 函数作用域 函数嵌套调用 函数默认参数 关键字参数 列表 切片 列表遍历 新增元素 查找元素 删除元素 列表拼接 元组…

华为荣耀部分机型从鸿蒙降回EMUI的一种方法

一、准备说明 1、这里介绍使用华为手机助手、海外代理软件结合固件将部分华为荣耀手机鸿蒙系统降级回EMUI系 统的一种方式&#xff1b; 2、需要降级的手机需要再出厂时内置系统为EMUI&#xff0c;出厂时为鸿蒙系统的无法进行降级操作&#xff1b; 3、降级有风险&#xff0…

maven <dependencyManagement>标签的作用

作用 dependencyManagement标签的作用&#xff1a;在父工程pom文件中声明依赖&#xff0c;但不引入&#xff1b;在子工程中用到声明的依赖时&#xff0c;可以不加依赖的版本号&#xff0c;这样可以统一管理工程中用到的依赖版本。 示例 先创建一个项目 dependencyManagement-de…

JSON格式化与结构对比

说明 功能格式化json字符串为最简格式&#xff0c;并标识值类型&#xff1b;比对json字符串结构。第三方依赖fastjson: 用于解析json、判断json值类型&#xff1b;springframework自带的字符串判断&#xff0c;可以不依赖该方法&#xff0c;改为自行实现&#xff1b;slf4j: 用于…

编程与数学 03-002 计算机网络 03_物理层基础

编程与数学 03-002 计算机网络 03_物理层基础一、物理层的作用与任务&#xff08;一&#xff09;传输媒体的类型&#xff08;二&#xff09;信号的传输方式二、数据编码技术&#xff08;一&#xff09;数字数据的数字信号编码&#xff08;二&#xff09;模拟数据的数字信号编码…

c语言--文件操作

思维导图:1. 为什么使用文件&#xff1f; 如果没有文件&#xff0c;我们写的程序的数据是存储在电脑的内存中&#xff0c;如果程序退出&#xff0c;内存回收&#xff0c;数据就丢失了&#xff0c;等再次运⾏程序&#xff0c;是看不到上次程序的数据的&#xff0c;如果要将数据进…

SQL中的占位符、@Param注解和方法参数

代码中出现的多个 username 和 password 代表不同层面的变量&#xff0c;具体含义如下&#xff08;按执行顺序&#xff09;&#xff1a;### 1. Param("username") String username - 位置 &#xff1a;方法参数前的注解 - 作用 &#xff1a;- Param("username&q…

【SpringAI实战】FunctionCalling实现企业级自定义智能客服

一、前言 二、实现效果 三、代码实现 3.1 后端实现 3.2 前端实现 一、前言 Spring AI详解&#xff1a;【Spring AI详解】开启Java生态的智能应用开发新时代(附不同功能的Spring AI实战项目)-CSDN博客 二、实现效果 一个24小时在线的AI智能客服&#xff0c;可以给用户提供培…

kotlin基础【2】

变量类型var 和 val 的核心区别&#xff1a;关键字含义能否重新赋值类似概念&#xff08;Java&#xff09;varvariable&#xff08;可变变量&#xff09;可以普通变量&#xff08;无 final&#xff09;valvalue&#xff08;不可变变量&#xff09;不可以被 final 修饰的变量var…

【Spring AI】阿里云DashScope灵积模型

DashScope&#xff08;灵积模型&#xff09;是阿里云提供的大模型服务平台&#xff0c;集成了阿里自研的 通义千问&#xff08;Qwen&#xff09;系列大语言模型&#xff08;LLM&#xff09;以及多模态模型&#xff0c;为企业与开发者提供开箱即用的 AI 能力。官网地址 https://…

Rust Web框架性能对比与实战指南

Rust Actix Web Rust Web 框架的实用对比分析 以下是 Rust Web 框架的实用对比分析,涵盖主要框架(如 Actix-web、Rocket、Warp、Axum 等)的常见使用场景示例,按功能分类整理: 基础路由设置 Actix-web use actix_web::{get, App, HttpResponse, HttpServer, Responder}…

【解决vmware ubuntu不小心删boot分区,进不去系统】

如果仍然提示 Unable to locate package testdisk&#xff0c;有可能是源中不包含该工具&#xff08;LiveCD 使用的是“最小环境”&#xff09;。 &#x1fa9b; 解决方法&#xff1a;切换到国内完整软件源&#xff08;推荐&#xff09; 编辑 sources.list&#xff1a; sudo na…

04-netty基础-Reactor三种模型

1 基本概念Reactor模型是一种事件驱动&#xff08;Event-Driven&#xff09;的设计模式&#xff0c;主要用于高效处理高并发、I/O密集型场景&#xff08;如网络、服务器、分布式等&#xff09;。其核心思想就是集中管理事件&#xff0c;将I/O操作与业务逻辑解耦&#xff0c;避免…

踩坑无数!NFS服务从入门到放弃再到真香的血泪史

前言 说起NFS&#xff0c;我估计很多搞运维的兄弟都有一肚子话要说。这玩意儿吧&#xff0c;看起来简单&#xff0c;用起来坑多&#xff0c;但是真正搞明白了又觉得挺香的。 前几天有个朋友问我&#xff0c;说他们公司要搭建一个文件共享系统&#xff0c;问我推荐什么方案。我…

矩阵谱分解的证明及计算示例

1. 矩阵谱分解的条件矩阵的谱分解&#xff08;也称为特征分解&#xff09;是将一个矩阵分解为一系列由其特征向量和特征值构成的矩阵乘积的过程。进行谱分解的前提条件包括&#xff1a;<1.> 矩阵是可对角化的&#xff08;Diagonalizable&#xff09;&#xff0c;即矩阵存…

Leetcode 07 java

169. 多数元素 给定一个大小为 n 的数组 nums &#xff0c;返回其中的多数元素。 多数元素是指在数组中出现次数 大于 ⌊ n/2 ⌋ 的元素。 你可以假设数组是非空的&#xff0c;并且给定的数组总是存在多数元素。 示例 1&#xff1a; 输入&#xff1a;nums [3,2,3] 输出&a…

CS231n-2017 Lecture6训练神经网络(一)笔记

本节主要讲的是模型训练时的算法设计数据预处理&#xff1a;关于数据预处理&#xff0c;我们有常用的3个符号&#xff0c;数据矩阵X&#xff0c;假设其尺寸是&#xff0c;N是数据样本的数量&#xff0c;D是数据的维度均值减法(Mean subtraction)&#xff1a;是预处理最常用的形…

C++ 中实现 `Task::WhenAll` 和 `Task::WhenAny` 的两种方案

&#x1f4da; C 中实现 Task::WhenAll 和 Task::WhenAny 的两种方案 引用&#xff1a; 拈朵微笑的花 想一番人世變換 到頭來輸贏又何妨日與夜互消長 富與貴難久長 今早的容顏老於昨晚C 标准库异步编程示例&#xff08;一&#xff09;C TAP&#xff08;基于任务的异步编程…