还在重启应用改 Topic?Spring Boot 动态 Kafka 消费的“终极形态”

图片

场景描述:
你的一个微服务正在稳定地消费 Kafka 的 order_topic。现在,上游系统为了做业务隔离,新增加了一个 order_topic_vip,并开始向其中投递 VIP 用户的订单。你需要在不重启、不发布新版本的情况下,让你现有的消费者同时开始消费 order_topic_vip 的消息。

这是一个典型的动态运维需求。静态的 @KafkaListener(topics = "order_topic") 注解无法满足这个要求。本文将提供一套完整的解决方案,教你如何利用配置中心(以 Nacos 为例)和 Spring Kafka 的底层 API,实现消费者 Topic 列表的“热更新”。

1. 核心原理:销毁并重建 (Destroy and Rebuild)

Spring Kafka 的消费者容器 (MessageListenerContainer) 在创建时,其核心配置(如监听的 Topic)就已经确定。在运行时直接修改一个正在运行的容器的 Topic 列表,是一种不被推荐且存在风险的操作。

最稳健、最可靠的方案是:

  1. 1. 停止注销监听旧 Topic 的消费者容器。

  2. 2. 根据原始的消费者配置和新传入的 Topic 列表,以编程方式创建一个全新的消费者容器。

  3. 3. 启动这个新的容器。

整个过程对外界来说是“无感”的,最终效果就是消费者监听的 Topic 列表发生了变化。

2. 方案架构

要实现上述流程,我们需要三个关键组件:

  1. 1. 元数据采集器 (BeanPostProcessor): 在应用启动时,扫描并缓存所有 @KafkaListener 的“配置蓝图”(包括 idgroupId, 原始 topics 等)。

  2. 2. 配置中心 (Nacos): 作为动态 Topic 配置的“真理之源”。

  3. 3. 动态刷新服务: 监听 Nacos 的配置变更,并调用 Spring Kafka 的 KafkaListenerEndpointRegistry API 来完成“销毁并重建”的操作。

3. 完整代码实现

这是一个可以直接集成的、完整的解决方案代码。

步骤 3.1: 定义元数据存储

EndpointMetadata.java

package com.example.kafka.dynamic.core;import java.io.Serializable;
import java.lang.reflect.Method;// 用于存储 @KafkaListener 的“蓝图”
public class EndpointMetadata implements Serializable {private String id;private String groupId;private String[] topics;private Object bean;private Method method;// ... 可按需添加 concurrency, autoStartup 等其他属性// Getters and Setters...public String getId() { return id; }public void setId(String id) { this.id = id; }public String getGroupId() { return groupId; }public void setGroupId(String groupId) { this.groupId = groupId; }public String[] getTopics() { return topics; }public void setTopics(String[] topics) { this.topics = topics; }public Object getBean() { return bean; }public void setBean(Object bean) { this.bean = bean; }public Method getMethod() { return method; }public void setMethod(Method method) { this.method = method; }
}

KafkaListenerMetadataRegistry.java (元数据采集与注册)

package com.example.kafka.dynamic.processor;import com.example.kafka.dynamic.core.EndpointMetadata;
import org.springframework.aop.support.AopUtils;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.core.annotation.AnnotationUtils;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;import java.lang.reflect.Method;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;@Component
public class KafkaListenerMetadataRegistry implements BeanPostProcessor {private final Map<String, EndpointMetadata> metadataStore = new ConcurrentHashMap<>();@Overridepublic Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {Class<?> targetClass = AopUtils.getTargetClass(bean);for (Method method : targetClass.getMethods()) {KafkaListener kafkaListener = AnnotationUtils.findAnnotation(method, KafkaListener.class);if (kafkaListener != null && kafkaListener.id() != null && !kafkaListener.id().isEmpty()) {EndpointMetadata metadata = new EndpointMetadata();metadata.setId(kafkaListener.id());metadata.setTopics(kafkaListener.topics());metadata.setGroupId(kafkaListener.groupId());metadata.setBean(bean);metadata.setMethod(method);metadataStore.put(kafkaListener.id(), metadata);}}return bean;}public EndpointMetadata getMetadata(String listenerId) {return metadataStore.get(listenerId);}
}
步骤 3.2: 核心实现:动态刷新服务

DynamicKafkaConsumerService.java

package com.example.kafka.dynamic.service;import com.alibaba.nacos.api.config.ConfigService;
import com.alibaba.nacos.api.config.listener.Listener;
import com.example.kafka.dynamic.core.EndpointMetadata;
import com.example.kafka.dynamic.processor.KafkaListenerMetadataRegistry;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;
import jakarta.annotation.PostConstruct;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
import org.springframework.kafka.listener.MessageListenerContainer;
import org.springframework.messaging.handler.annotation.support.DefaultMessageHandlerMethodFactory;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;import java.util.Arrays;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.Executor;@Service
public class DynamicKafkaConsumerService {private static final Logger log = LoggerFactory.getLogger(DynamicKafkaConsumerService.class);@Autowiredprivate KafkaListenerEndpointRegistry listenerRegistry;@Autowiredprivate KafkaListenerContainerFactory<?> kafkaListenerContainerFactory;@Autowiredprivate KafkaListenerMetadataRegistry metadataRegistry;@Autowiredprivate ConfigService configService; // Nacos Config Serviceprivate final ObjectMapper objectMapper = new ObjectMapper();private final String DATA_ID = "dynamic-kafka-topics.json";private final String GROUP = "DEFAULT_GROUP";@PostConstructpublic void init() throws Exception {// 1. 应用启动时,先拉取一次配置String initialConfig = configService.getConfig(DATA_ID, GROUP, 5000);if (StringUtils.hasText(initialConfig)) {refreshListeners(initialConfig);}// 2. 注册 Nacos 监听器configService.addListener(DATA_ID, GROUP, new Listener() {@Overridepublic Executor getExecutor() { return null; }@Overridepublic void receiveConfigInfo(String configInfo) {log.info("接收到 Kafka Topic 配置变更:\n{}", configInfo);refreshListeners(configInfo);}});}public synchronized void refreshListeners(String configInfo) {try {Map<String, String> configMap = objectMapper.readValue(configInfo, new TypeReference<>() {});configMap.forEach((listenerId, topics) -> {log.info("准备刷新 Listener ID '{}' 的 Topics 为 '{}'", listenerId, topics);MessageListenerContainer container = listenerRegistry.getListenerContainer(listenerId);String[] newTopics = topics.split(",");// 如果容器存在,且 Topic 列表发生了变化if (container != null) {if (!Arrays.equals(container.getContainerProperties().getTopics(), newTopics)) {recreateAndRegisterContainer(listenerId, newTopics);}} else {// 如果容器不存在 (可能被手动停止或首次创建),也进行创建recreateAndRegisterContainer(listenerId, newTopics);}});} catch (Exception e) {log.error("动态刷新 Kafka 消费者配置失败", e);}}private void recreateAndRegisterContainer(String listenerId, String[] topics) {log.info("开始重建并注册 Listener ID '{}'", listenerId);// 1. 停止并销毁旧容器MessageListenerContainer container = listenerRegistry.getListenerContainer(listenerId);if (container != null) {container.stop();// 在 Spring Kafka 2.8+ 中,注销是内部操作,我们只需创建并注册新的即可。}// 2. 从我们的“蓝图”中获取元数据EndpointMetadata metadata = metadataRegistry.getMetadata(listenerId);if (metadata == null) {log.error("找不到 Listener ID '{}' 的元数据,无法重建。", listenerId);return;}// 3. 创建一个全新的 EndpointMethodKafkaListenerEndpoint<String, String> newEndpoint = new MethodKafkaListenerEndpoint<>();newEndpoint.setId(metadata.getId());newEndpoint.setGroupId(metadata.getGroupId());newEndpoint.setTopics(topics); // <-- 核心:使用新 TopicnewEndpoint.setBean(metadata.getBean());newEndpoint.setMethod(metadata.getMethod());newEndpoint.setMessageHandlerMethodFactory(new DefaultMessageHandlerMethodFactory());// 4. 注册新的 EndpointlistenerRegistry.registerListenerContainer(newEndpoint, kafkaListenerContainerFactory, true);log.info("成功重建并启动 Listener ID '{}',现在监听 Topics: {}", listenerId, Arrays.toString(topics));}
}

4. 实践演练

步骤 4.1: 业务代码

在你的 Spring Boot 应用中,正常定义你的消费者,但务必提供唯一的 id

@Service
public class OrderEventListener {@KafkaListener(id = "order-listener", topics = "order_topic", groupId = "my-group")public void handleOrderEvent(String message) {System.out.println("收到订单消息: " + message);}
}
步骤 4.2: application.yml 配置

确保你的应用连接到了 Nacos。

spring:cloud:nacos:config:server-addr: 127.0.0.1:8848
# ... kafka server acls
步骤 4.3: Nacos 配置

在 Nacos 中,创建一个 Data ID 为 dynamic-kafka-topics.jsonGroup 为 DEFAULT_GROUP 的配置,内容为 JSON 格式:

{"order-listener": "order_topic"
}

Key (order-listener) 必须与 @KafkaListener 的 id 完全一致。

步骤 4.4: 启动与验证
  1. 1. 启动应用。此时,order-listener 消费者会正常启动,并开始消费 order_topic 的消息。

  2. 2. 动态变更! 去 Nacos 控制台,将配置修改为:
    {"order-listener": "order_topic,order_topic_vip"
    }
  3. 3. 点击“发布”。

  4. 4. 观察应用日志。 你会看到类似下面的日志:
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 接收到 Kafka Topic 配置变更: ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 准备刷新 Listener ID 'order-listener' 的 Topics 为 'order_topic,order_topic_vip'
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 开始重建并注册 Listener ID 'order-listener'
    ... (旧容器停止的日志) ...
    INFO --- [pool-1-thread-1] c.e.k.d.s.DynamicKafkaConsumerService  : 成功重建并启动 Listener ID 'order-listener',现在监听 Topics: [order_topic, order_topic_vip]
  5. 5. 验证结果。 现在,你的 order-listener 已经开始同时消费 order_topic 和 order_topic_vip 两个 Topic 的消息了,整个过程应用没有重启

总结

通过巧妙地结合 BeanPostProcessorKafkaListenerEndpointRegistry 和动态配置中心,我们实现了一个功能极其强大的动态 Kafka 消费管理方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/96767.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/96767.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

使用vllm部署neo4j的text2cypher-gemma-2-9b-it-finetuned-2024v1模型

使用vllm部署neo4j的text2cypher-gemma-2-9b-it-finetuned-2024v1模型 系统环境准备 由于使用的基于 nvcr.io/nvidia/cuda:12.1.1-cudnn8-runtime-ubuntu22.04 的 workbench,需要进行以下准备(其他系统环境可忽略) ldconfig -p | grep libcudnn 找到 libcudnn 的so库,然…

Coze源码分析-资源库-创建知识库-前端源码-核心组件

概述 本文深入分析Coze Studio中用户创建知识库功能的前端实现。该功能允许用户在资源库中创建、编辑和管理知识库资源&#xff0c;为开发者提供了强大的知识管理和数据处理能力。通过对源码的详细解析&#xff0c;我们将了解从资源库入口到知识库配置弹窗的完整架构设计、组件…

基于时空数据的网约车订单需求预测与调度优化

一、引言随着共享出行行业的蓬勃发展&#xff0c;网约车已成为城市交通的重要组成部分。如何精准预测订单需求并优化车辆调度&#xff0c;是提升平台运营效率、改善用户体验的关键。本文提出一种基于时空数据的网约车订单需求预测与调度优化方案&#xff0c;通过网格化城市空间…

数据结构 Java对象的比较

在Java中&#xff0c;凡是涉及到比较的&#xff0c;可以分为两类情况&#xff1a;一类是基本数据类型的比较&#xff0c;另一类是引用数据类型的比较。对于基本数据类型的比较&#xff0c;我们通过关系运算符&#xff08;、>、<、!、>、<&#xff09;进行它们之间的…

企智汇建筑施工项目管理系统:全周期数字化管控,赋能工程企业降本增效!​建筑工程项目管理软件!建筑工程项目管理系统!建筑项目管理软件企智汇软件

在建筑施工行业&#xff0c;项目进度滞后、成本超支、质量安全隐患频发、多方协同不畅等问题&#xff0c;一直是制约企业发展的痛点。传统依赖人工记录、Excel 统计的管理模式&#xff0c;不仅效率低下&#xff0c;更易因信息断层导致决策失误。企智汇建筑施工项目管理系统凭借…

k8s-临时容器学习

临时容器学习1. 什么是临时容器2. 实验1. 什么是临时容器 在官网&#xff1a;https://kubernetes.io/zh-cn/docs/concepts/workloads/pods/ephemeral-containers/ 中有介绍 临时容器是用于调试Pod中崩溃的容器或者不具备调试工具&#xff0c;比如在一个运行着业务的容器中&am…

Python 2025:低代码开发与自动化运维的新纪元

从智能运维到无代码应用&#xff0c;Python正在重新定义企业级应用开发范式在2025年的企业技术栈中&#xff0c;Python已经从一个"开发工具"演变为业务自动化的核心平台。根据Gartner 2025年度报告&#xff0c;68%的企业在自动化项目中使用Python作为主要开发语言&am…

Netty 在 API 网关中的应用篇(请求转发、限流、路由、负载均衡)

Netty 在 API 网关中的应用篇&#xff08;请求转发、限流、路由、负载均衡&#xff09;随着微服务架构的普及&#xff0c;API 网关成为服务之间通信和安全控制的核心组件。在构建高性能网关时&#xff0c;Netty 因其高吞吐、低延迟和异步非阻塞 IO 的特性&#xff0c;成为不少开…

基于STM32设计的青少年学习监控系统(华为云IOT)_282

文章目录 一、前言 1.1 项目介绍 【1】项目开发背景 【2】设计实现的功能 【3】项目硬件模块组成 【4】设计意义 【5】国内外研究现状 【6】摘要 1.2 设计思路 1.3 系统功能总结 1.4 开发工具的选择 【1】设备端开发 【2】上位机开发 1.5 参考文献 1.6 系统框架图 1.7 系统原理…

手写Spring底层机制的实现【初始化IOC容器+依赖注入+BeanPostProcesson机制+AOP】

摘要&#xff1a;建议先看“JAVA----Spring的AOP和动态代理”这个文章&#xff0c;解释都在代码中&#xff01;一&#xff1a;提出问题依赖注入1.单例beans.xml<?xml version"1.0" encoding"UTF-8"?> <beans xmlns"http://www.springframe…

5G NR-NTN协议学习系列:NR-NTN介绍(2)

NTN网络作为依赖卫星的通信方式&#xff0c;需要面对的通信距离&#xff0c;通信双方的移动速度都和之前TN网络存在巨大差异。在距离方面相比蜂窝地面网络Terrestrial Network通信距离从最小几百米到最大几十km的情况&#xff0c;NTN非地面网络的通信距离即使是近地轨道的LEO卫…

线扫相机采集图像起始位置不正确原因总结

1、帧触发开始时间问题 问题描述: 由于帧触发决定了线扫相机的开始采集图像位置,比如正确的位置是A点开始采集,结果你从B点开始触发帧信号,这样出来的图像起始位置就不对 解决手段: 软件需要记录帧触发时轴的位置 1)控制卡控制轴 一般使用位置比较触发,我们可以通过监…

校园管理系统练习项目源码-前后端分离-【node版】

今天给大家分享一个校园管理系统&#xff0c;前后端分离项目。这是最近在练习前端编程&#xff0c;结合 node 写的一个完整的项目。 使用的技术&#xff1a; Node.js&#xff1a;版本要求16.20以上。 后端框架&#xff1a;Express框架。 数据库&#xff1a; MySQL 8.0。 Vue2&a…

【项目】 :C++ - 仿mudou库one thread one loop式并发服务器实现(模块划分)

【项目】 &#xff1a;C - 仿mudou库one thread one loop式并发服务器实现一、HTTP 服务器与 Reactor 模型1.1、HTTP 服务器概念实现步骤难点1.2、Reactor 模型概念分类1. 单 Reactor 单线程2. 单 Reactor 多线程3. 多 Reactor 多线程目标定位总结二、功能模块划分2.1、SERVER …

浴室柜市占率第一,九牧重构数智卫浴新生态

作者 | 曾响铃文 | 响铃说2025年上半年&#xff0c;家居市场在政策的推动下展现出独特的发展态势。国家出台的一系列鼓励家居消费的政策&#xff0c;如“以旧换新”国补政策带动超6000万件厨卫产品焕新&#xff0c;以及我国超2.7亿套房龄超20年的住宅进入改造周期&#xff0c;都…

源码分析之Leaflet中TileLayer

概述 TileLayer 是 Layer 的子类&#xff0c;继承自GridLayer基类&#xff0c;用于加载和显示瓦片地图。它提供了加载和显示瓦片地图的功能&#xff0c;支持自定义瓦片的 URL 格式和参数。 源码分析 源码实现 TileLayer的源码实现如下&#xff1a; export var TileLayer GridL…

php学习(第二天)

一.网站基本概念-服务器 1.什么是服务器? 1.1定义 服务器&#xff08;server&#xff09;,也称伺服器&#xff0c;是提供计算服务的设备。 供计算服务的设备” 这里的“设备”不仅指物理机器&#xff08;如一台配有 CPU、内存、硬盘的计算机&#xff09;&#xff0c;也可以指…

C++(友元和运算符重载)

目录 友元&#xff1a; 友元函数&#xff1a; 示例&#xff1a; 友元类&#xff1a; 示例&#xff1a; 优点&#xff1a; 注意事项&#xff1a; 运算符重载&#xff1a; 注意&#xff1a; 示例&#xff1a; 友元&#xff1a; C中如果想要外部函数或者类对一个类的pr…

和平精英风格射击游戏开发指南

本教程将完整讲解如何开发一款和平精英风格的HTML射击游戏&#xff0c;涵盖核心设计理念、代码架构与关键实现细节。 核心设计架构 游戏机制系统 角色控制系统&#xff1a;通过键盘实现玩家移动战斗系统&#xff1a;子弹发射与碰撞检测道具系统&#xff1a;武器、弹药和医疗包收…

21.1 《24GB显存搞定LLaMA2-7B指令微调:QLoRA+Flash Attention2.0全流程实战》

24GB显存搞定LLaMA2-7B指令微调:QLoRA+Flash Attention2.0全流程实战 实战 LLaMA2-7B 指令微调 一、指令微调技术背景 指令微调(Instruction Tuning)是大模型训练中的关键技术突破点。与传统全量微调(Full Fine-Tuning)相比,指令微调通过特定格式的指令-响应数据训练,…