rabbitmq动态创建交换机、队列、动态绑定,销毁

 // 缓存已创建的绑定,避免重复声明private final Map<String, Date> createdBindings = new ConcurrentHashMap<>(); 
public void createAndBindQueueToExchange(String type,String clinetId,  String routingKey) {String   queueName = routingKey;log.info("初始化类型:{}",type);QueueInformation queueInformation = rabbitAdmin.getQueueInfo(queueName);if(queueInformation == null) {//队列不存在则创建Map<String, Object> args = new HashMap<>();args.put("x-max-priority", maxPriority); // 设置优先级范围Queue queue = new Queue(queueName, true, false, false, args);rabbitAdmin.declareQueue(queue);log.info("创建队列: {} ", queueName);}else{log.info("队列已存在: {} ", queueName);}String containerKey = queueName + ":" + mainDirectExchange + ":" + routingKey + ":"+clinetId;//还要判断监听容器是否存在if (createdBindings.containsKey(containerKey) && registry.getListenerContainerIds().contains(containerKey)) {log.info("绑定已存在缓存中,容器中也存在 queue: {} to exchange: {} with routing key: {},time={}",queueName, mainDirectExchange, routingKey,createdBindings.get(containerKey));createdBindings.put(containerKey,new Date());}else{//stopContainerListenerAndCleanCash(containerKey,"缓存 无Key(有无监听容器)或(有缓存Key无监听容器)");// 2. 声明绑定到已存在的交换机Binding binding = new Binding(queueName,Binding.DestinationType.QUEUE,mainDirectExchange,routingKey,null);rabbitAdmin.declareBinding(binding);// 添加到缓存createdBindings.put(containerKey,new Date());log.info("成功创建绑定 for queue: {} to exchange: {} with routing key: {}",queueName, mainDirectExchange, routingKey);}// 3. 注册监听器if (!registry.getListenerContainerIds().contains(containerKey)) {SimpleRabbitListenerEndpoint endpoint = new SimpleRabbitListenerEndpoint();endpoint.setId(containerKey);endpoint.setQueueNames(queueName);//endpoint.setAutoStartup(true);// 使用手动ACK的消息监听器endpoint.setMessageListener(new ChannelAwareMessageListener() {@Overridepublic void onMessage(Message message, Channel channel) throws Exception {//处理消息完成后,不退回为trueboolean okMessage = false;String consumerTag = "";Exception exception = null;FlowInstanceNode flowInstanceNode = null;try {consumerTag = message.getMessageProperties().getConsumerTag();String clientId = consumerTag.split(":")[3];String messageBody = new String(message.getBody());flowInstanceNode = processMessage(clientId,messageBody);if("0".equalsIgnoreCase(flowInstanceNode.getExceptionType())) {//消费正常channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);okMessage = true;}} catch (Exception e) {log.error("指令包消息处理失败: {}", truncateString(new String(message.getBody())), e);exception = e;}finally {// 根据异常类型决定是否重新入队// 网络异常的情况都应返回队列重新消费boolean exceptionFlag = shouldRequeue(exception,flowInstanceNode);log.info("指令执行包异常标志exceptionFlag: {}",exceptionFlag);if (exceptionFlag) {okMessage = false;// 消息重新入队channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);log.warn("指令包消息处理失败,已重新入队: {}", truncateString(new String(message.getBody())));} else {// 消息丢弃(不重新入队)if(!okMessage){channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);log.error("指令包消息处理失败,已丢弃: {}", truncateString(new String(message.getBody())));}okMessage = true;}if (okMessage) {// 消费消息序列ID写入缓存Long sequence = flowInstanceNode.getSequence();String professionSoftwareCode = flowInstanceNode.getProfessionalSoftwareType();String sequenceKey = professionSoftwareCode+":message:finishedSequence";cacheService.set(sequenceKey, sequence);}String exceptionType =  flowInstanceNode.getExceptionType();//网络异常if(exceptionFlag){//调用专业软件API网络异常//停止监听stopContainerListenerAndCleanCash(consumerTag,"网络异常");//数据返回队列}//判断心跳是否超时stopContainer(consumerTag,"每次检查心跳超时");log.info("本次消费执行完毕");}}});registry.registerListenerContainer(endpoint, factory, false);SimpleMessageListenerContainer simpleMessageListenerContainer = (SimpleMessageListenerContainer)registry.getListenerContainer(containerKey);simpleMessageListenerContainer.setConsumerTagStrategy(new ConsumerTagStrategy() {@Overridepublic String createConsumerTag(String queue) {return containerKey;}});simpleMessageListenerContainer.start();// 设置监听器容器log.info("设置监听器容器并启动监听 queue: {},containerKey :{}" , queueName,containerKey);}else {// 启动已存在的监听器if (!registry.getListenerContainer(containerKey).isRunning()) {//启动监听registry.getListenerContainer(containerKey).start();log.info("启动监听  queue: {},containerKey:{}" , queueName,containerKey);} else {log.info("监听已运行 queue: {},containerKey:{}" , queueName,containerKey);}}}

动态销毁:

   public void stopContainerListenerAndCleanCash(String containerKey,String tip) {try {log.info(tip+",清除缓存,停止监听容器:{}", containerKey);if (registry.getListenerContainerIds().contains(containerKey)) {registry.getListenerContainer(containerKey).stop();registry.unregisterListenerContainer(containerKey);}// 删除队列// rabbitAdmin.deleteQueue(queueName);// 清理该队列相关的绑定缓存createdBindings.remove(containerKey);}catch (Exception e){log.error("清除缓存,停止监听容器异常",e);}}

容器事件监听:

//容器异常

@Component
@Slf4j
public class RabbitListenerContainerExceptionHandler implements ApplicationListener<ListenerContainerConsumerFailedEvent> {@AutowiredRabbitMQService rabbitMQService;@Overridepublic void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {Throwable t = event.getThrowable();Object object = event.getSource();if (object instanceof SimpleMessageListenerContainer){SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) object;String queueName = container.getQueueNames()[0];String listenerId = container.getListenerId();rabbitMQService.stopContainerListenerAndCleanCash(listenerId,"容器异常");}log.error("RabbitMQ监听容器异常", t);// 这里可以判断异常类型,比如队列不存在、连接断开等if (t instanceof ShutdownSignalException) {// 处理队列被删除、服务失联等}}
}

//

    @PostConstructpublic void init() {//factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);//factory.setReceiveTimeout();//factory.setReceiveTimeout(1L);factory.setContainerCustomizer(container -> {// 设置消费者超时时间(需RabbitMQ服务端支持)container.setConsumerArguments(Collections.singletonMap("consumer_timeout", 60000L));});factory.setErrorHandler(t -> {// 这里可以捕获到消息处理时的异常log.error("RabbitMQ消息处理异常", t);// 可以根据异常类型做不同处理});connectionFactory.addConnectionListener(new ConnectionListener() {@Overridepublic void onClose(Connection connection) {stopAllContainerListeners();log.warn("RabbitMQ连接关闭");}@Overridepublic void onCreate(Connection connection) {stopAllContainerListeners();log.info("RabbitMQ连接创建");}@Overridepublic void onShutDown(ShutdownSignalException signal) {stopAllContainerListeners();log.error("RabbitMQ连接异常关闭", signal);}});}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/web/87039.shtml
繁体地址,请注明出处:http://hk.pswp.cn/web/87039.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

云效代码仓库导入自建gitlab中

登录自建GitLab 在浏览器中输入GitLab访问地址http://192.168.1.111:81/users/sign_in&#xff0c;输入账号和密码登录GitLab服务&#xff0c;如下图&#xff1a; 新建一个空的代码库 按照以下截图顺序&#xff0c;创建一个新的空项目&#xff0c;如下&#xff1a; 克隆镜像 …

业界优秀的零信任安全管理系统产品介绍

腾讯 iOA 零信任安全管理系统 简介&#xff1a;腾讯 iOA 零信任安全管理系统是腾讯终端安全团队针对企业安全上云和数字化转型&#xff0c;提供的企业网络边界处的应用访问管控系统&#xff0c;为企业应用提供统一、安全、高效的访问入口&#xff0c;同时提供终端安全加固、软…

从设计到开发一个小程序页面

巧妇难为无米之炊&#xff0c;想写功能但是没有好看的设计&#xff0c;边写边设计效率又不够高。mastergoAi生成的页面又不够好看&#xff0c;而且每月给的免费积分用得又超快&#xff0c;so决定自给自足。能有多难&#xff0c;先做&#xff0c;做了再改。 于是决定踏足设计&a…

Linux系统 / Ubuntu虚拟机 安装DHCP服务

一、安装DHCP服务 xxx:~$ sudo apt install isc-dhcp-server 正在读取软件包列表... 完成 正在分析软件包的依赖关系树 正在读取状态信息... 完成 将会同时安装下列软件&#xff1a; libirs-export161 libisccfg-export163 建议安装&#xff1a; isc-dhcp-s…

Spring中 BeanFactory和FactoryBean分别是什么?

Spring 中 BeanFactory 是什么? BeanFactory其实就是IoC的底层容器&#xff0c;它本身只是一个接口&#xff0c;顾名思义Bean工厂&#xff0c;定义了Spring的基本功能框架&#xff0c;主要功能就是 负责从配置源中读取 Bean 的定义&#xff0c;并创建、管理这些 Bean 的生命周…

langchain从入门到精通(三十二)——RAG优化策略(八)自查询检索器实现动态数据过滤

1. 查询构建与自查询检索器 在 RAG 应用开发中&#xff0c;检索外部数据时&#xff0c;前面的优化案例中&#xff0c;无论是生成的 子查询、问题分解、生成假设性文档&#xff0c;最后在执行检索的时候使用的都是固定的筛选条件&#xff08;没有附加过滤的相似性搜索&#xff…

面向安全产品测试的静态混淆型 Shellcode Loader 设计与对抗分析

github 地址&#xff1a;https://github.com/LilDean17/ShellcodeLoader2025 一、项目背景 近年来&#xff0c;随着 C2 框架广泛应用于安全对抗模拟&#xff0c;各大安全厂商也不断提升其检测能力&#xff0c;那么安全厂商自研的安全软件&#xff0c;是否能有效防御此类威胁&…

深度强化学习DRL——策略学习

一、策略网络 策略函数 π \pi π的输入是状态 s s s和动作 a a a&#xff0c;输出是一个介于0和1之间的概率值&#xff0c;用神经网络 π ( a ∣ s ; θ ) \pi(a \mid s; \boldsymbol{\theta}) π(a∣s;θ)近似策略函数 π ( a ∣ s ) \pi(a\mid s) π(a∣s)&#xff0c; θ …

ISP Pipeline(5): Auto White Balance Gain Control (AWB) 自动白平衡

G_gain 1.0 # 常作为参考通道 R_gain G_avg / R_avg B_gain G_avg / B_avgAuto White Balance Gain Control&#xff08;AWB&#xff09;自动调整图像中红色、绿色、蓝色通道的增益&#xff0c;使图像中灰白区域的颜色看起来为“中性白”或“灰白”&#xff0c;从而矫正因光…

Python中钩子函数的实现方式

在Python中&#xff0c;钩子函数(Hook)是一种允许你在程序执行的特定点插入自定义代码的技术。它本质上是一种回调机制&#xff0c;当特定事件发生时自动调用预先注册的函数。 Python中钩子函数的实现方式 Python中实现钩子主要有以下几种方式&#xff1a; ​回调函数​&…

【RTSP从零实践】3、实现最简单的传输H264的RTSP服务器

&#x1f601;博客主页&#x1f601;&#xff1a;&#x1f680;https://blog.csdn.net/wkd_007&#x1f680; &#x1f911;博客内容&#x1f911;&#xff1a;&#x1f36d;嵌入式开发、Linux、C语言、C、数据结构、音视频&#x1f36d; &#x1f923;本文内容&#x1f923;&a…

零开始git使用教程-传html文件

1. 准备工作 (1) 确保你已经安装&#xff1a; Visual Studio (VS)&#xff08;任何版本&#xff0c;社区版也行&#xff09; Git&#xff08;去官网 git-scm.com 下载安装&#xff09; (2) 注册 Gitee/GitHub 账号 国内推荐 Gitee&#xff08;码云&#xff09;&#xff1a;…

CPT204-Advanced OO Programming: Lists, Stacks, Queues, and Priority Queues

目录 1.Java 集合框架层次结构Java Collection Framework hierarchy 1.1Java 集合框架描述&#xff1a; 1.2数据结构Data structures 1.3 Java 集合框架支持两种类型的容器&#xff08;数据结构&#xff09;&#xff1a; 1.4 Java 集合框架的设计 2.Collection 2.1 Coll…

【网络安全】Mysql注入中锁机制

前言 在sql注入的延时注入中&#xff0c;常见的函数有sleep()直接延时、BENCHMARK()通过让数据库进行大量的计算而达到延时的效果、笛卡尔积、正则匹配等&#xff0c;但还有一个常常被忽略的函数&#xff0c;也就是Mysql中的锁机制。虽然早些年就已经出现过相关的技术文章&…

博途多重背景、参数实例

1&#xff1a;我们在博途中先新建一个工程&#xff0c;并且建立一个FB块名字为motor_fb&#xff0c;同样建立一个FC块名字为MOTOR_FC&#xff0c;里面写上我们电机程序里常用的逻辑控制。二者程序内容相同。下面是motor_fb块的程序截图: 2:我们再新建一个FB块&#xff0c;名字为…

运维的利器–监控–zabbix–第三步:配置zabbix–中间件–Tomcat–步骤+验证

&#x1f3e0;个人主页&#xff1a;fo安方的博客✨ &#x1f482;个人简历&#xff1a;大家好&#xff0c;我是fo安方&#xff0c;目前中南大学MBA在读&#xff0c;也考取过HCIE Cloud Computing、CCIE Security、PMP、CISP、RHCE、CCNP RS、PEST 3等证书。&#x1f433; &…

大模型在重症哮喘手术全流程风险预测与治疗方案制定中的应用研究

目录 一、引言 1.1 研究背景与意义 1.2 研究目标与方法 1.3 研究创新点 二、重症哮喘概述 2.1 定义与发病机制 2.2 分类与临床表现 2.3 诊断标准与方法 三、大模型技术原理与应用现状 3.1 大模型的基本原理 3.2 在医疗领域的应用案例分析 3.3 适用于重症哮喘预测的…

Webpack的插件机制Tapable

Tapable 是一个轻量级的库&#xff0c;用于创建和管理插件钩子&#xff08;hooks&#xff09;&#xff0c;它在 Webpack 中广泛应用&#xff0c;用于实现插件系统。Tapable 提供了一种机制&#xff0c;允许插件在特定的生命周期阶段插入自定义逻辑&#xff0c;从而扩展应用程序…

FRONT归因-两阶段训练流程

FRONT, Fine-Grained Grounded Citations归因 FRONT归因&#xff0c;首先从检索到的源文档中选择支持性引用&#xff0c;然后基于这些引用指导生成过程&#xff0c;确保生成回答有据可依&#xff0c;引用准确无误。 FRONT的特色在于两阶段归因训练&#xff0c;要点如下: 阶…

单端转差分放大器AD8138

根据 AD8138 的数据手册特性及参数&#xff0c;可以实现单端 5Vpp&#xff08;偏置 0V&#xff09;正弦波转差分 5Vpp&#xff08;共模 2.5V&#xff09;的功能&#xff0c;但需注意以下细节&#xff1a; 1. 信号幅度匹配性 输入信号&#xff1a;单端 5Vpp&#xff08;峰峰值…