RabbitMQ延时队列的两种实现方式

目录

一、延时插件实现

1、版本要求

2、为运行新容器时安装

3、为已运行的容器安装

4、验证安装

5、代码编写

1. 配置类

2. 生产者

3. 消费者

二、死信队列实现

1、代码编写

1. 配置类

2. 生产者

3. 消费者

三、踩坑记录

1、发送消息失败

2、消息过期后未能转发到死信队列

3、消费者消费报错


一、延时插件实现

1、版本要求

RabbitMQ 3.5.7以上

2、为运行新容器时安装

# 1. 拉取带管理界面的镜像
docker pull rabbitmq:3.11-management
​
# 2. 启动容器并启用插件
docker run -d \--name rabbitmq \-p 5672:5672 \-p 15672:15672 \-e RABBITMQ_DEFAULT_USER=admin \-e RABBITMQ_DEFAULT_PASS=password \rabbitmq:3.11-management \bash -c "rabbitmq-plugins enable rabbitmq_delayed_message_exchange && rabbitmq-server"

3、为已运行的容器安装

# 1. 进入正在运行的容器
docker exec -it rabbitmq /bin/bash
​
# 2. 在容器内执行插件安装
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
​
# 3. 退出容器
exit
​
# 4. 重启容器使插件生效
docker restart rabbitmq

4、验证安装

# 方法1:检查插件列表
docker exec rabbitmq rabbitmq-plugins list | grep delayed
​
# 方法2:登录管理界面
# 访问 http://localhost:15672 (使用设置的账号密码登录)
# 在 "Exchanges" 标签页创建交换机时,Type 下拉框会出现 "x-delayed-message" 选项

5、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";@Beanpublic CustomExchange delayedExchange() {Map<String, Object> args = new HashMap<>();args.put("x-delayed-type", "direct"); // 交换机类型return new CustomExchange(DELAYED_EXCHANGE,"x-delayed-message", // 固定类型true,false,args);}
​@Beanpublic Queue delayedQueue() {return new Queue(DELAYED_QUEUE, true);}
​@Beanpublic Binding delayedBinding(Queue delayedQueue, CustomExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY).noargs();}}

2. 生产者

public void send(String exchange, String routing_key,Object data, Integer delayMillis) {// 消息后处理器:设置延时和持久化MessagePostProcessor processor = message -> {// 毫秒message.getMessageProperties().setDelay(delayMillis);// 持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);return message;};
​rabbitTemplate.convertAndSend(exchange, routingKey, data, processor);
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

二、死信队列实现

1、代码编写

1. 配置类

@Configuration
public class RabbitMqConfig {public static final String DELAYED_EXCHANGE = "delayed.exchange";public static final String DELAYED_QUEUE = "delayed.queue";public static final String DELAYED_ROUTING_KEY = "delayed_routing_key";
​public static final String NORMAL_EXCHANGE = "normal.exchange";public static final String NORMAL_QUEUE = "normal.queue";public static final String NORMAL_ROUTING_KEY = "normal_routing_key";// 死信队列(延时队列)@Beanpublic Queue delayedQueue() {return QueueBuilder.durable(DELAYED_QUEUE).build();}
​// 死信交换机@Beanpublic DirectExchange delayedExchange() {return new DirectExchange(DELAYED_EXCHANGE);}
​// 绑定死信队列到死信交换机@Beanpublic Binding delayedBinding(Queue delayedQueue, DirectExchange delayedExchange) {return BindingBuilder.bind(delayedQueue).to(delayedExchange).with(DELAYED_ROUTING_KEY);}
​// 普通队列@Beanpublic Queue normalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE).deadLetterRoutingKey(DELAYED_ROUTING_KEY).build();}
​// 普通交换机@Beanpublic DirectExchange normalExchange() {return new DirectExchange(NORMAL_EXCHANGE);}
​// 绑定普通队列到普通交换机@Beanpublic Binding normalBinding(Queue normalQueue, DirectExchange normalExchange) {return BindingBuilder.bind(normalQueue).to(normalExchange).with(NORMAL_ROUTING_KEY);}}

2. 生产者

public void send(String exchange, String routing_key, Object data, Integer delayMillis) {String uuid = IdUtil.simpleUUID();// 消息入库略,uuid为主键MessageProperties properties = new MessageProperties();// 设置TTL,单位毫秒properties.setExpiration(String.valueOf(delayMillis));// 消息持久化(2 表示持久化)properties.setDeliveryMode(MessageDeliveryMode.PERSISTENT);
​Message msg = rabbitTemplate.getMessageConverter().toMessage(data, properties);rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));
}

3. 消费者

@Component
@RabbitListener(queues = RabbitMqConfig.DELAYED_QUEUE)
public class DelayedListener {
​@RabbitHandlerpublic void listener(String data, Channel channel, Message message) {log.warn("消息消费成功,消息内容:{}", data);MessageProperties properties = message.getMessageProperties();long deliveryTag = properties.getDeliveryTag()channel.basicAck(deliveryTag, false);}
​
}

三、踩坑记录

1、发送消息失败

原因RabbitTemplate 配置了消息抵达确认,消息ID没有传值。

RabbitTemplate rabbitTemplate = new RabbitTemplate();
// 消息抵达确认通知
rabbitTemplate.setConfirmCallback((data, ack, cause) -> {String msgId = data.getId();if (ack) {log.info("消息抵达队列成功:{}", data);} else {log.error("消息未能发送成功,消息ID:{}", data.getId(), cause);}
});

生产者实际发送消息未传消息ID:

错误格式

rabbitTemplate.convertAndSend(exchange, routingKey, data);

正确格式

String uuid = IdUtil.simpleUUID();
rabbitTemplate.convertAndSend(exchange, routingKey, data, new CorrelationData(uuid));

2、消息过期后未能转发到死信队列

原因:正常消息未绑定死信队列,消息过期自动删除,而不会转发到死信队列中。

错误格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).build();
}

正确格式

@Bean
public Queue delayedNormalQueue() {return QueueBuilder.durable(NORMAL_QUEUE).deadLetterExchange(DELAYED_EXCHANGE) // 指定死信交换机.deadLetterRoutingKey(DELAYED_ROUTING_KEY) // 指定死信路由键.build();
}

3、消费者消费报错

原因:发送的消息由于自定义的 MessageProperties ,其中缺失了 contentType 参数,需要使用转化器进行转换,而不是直接发送消息。

错误格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = new Message(message.getBytes(), properties);
rabbitTemplate.convertAndSend(exchange, routingKey, msg, new CorrelationData(uuid));

正确格式

MessageProperties properties = new MessageProperties();
properties.setExpiration(String.valueOf(delayMillis));
​
Message msg = rabbitTemplate.getMessageConverter().toMessage(message, properties);
rabbitTemplate.send(exchange, routingKey, msg, new CorrelationData(uuid));

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/96405.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/96405.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习在股票量化中的应用

深度学习在股票量化中的具体应用&#xff1a;从时间序列预测到Alpha挖掘深度学习并非量化交易的银弹&#xff0c;但它是一套强大的工具集&#xff0c;能够解决传统量化方法难以处理的复杂问题。其核心价值在于从海量、高维、非结构化的数据中自动提取有效特征并发现非线性关系。…

Web 安全之 HTTP 响应截断攻击详解

这不是危言耸听。 在一次安全审计中&#xff0c;某电商平台发现&#xff1a; 用户访问首页后&#xff0c;自动跳转到了赌博网站。 但代码没被篡改&#xff0c;服务器没被入侵&#xff0c;日志一切正常。 最终追查发现—— 罪魁祸首&#xff0c;竟是一个 %0d%0a&#xff08;回车…

Envoy配置ext_proc

介绍 本文将使用gateway api inference extension作为envoy的ext_proc服务端 启动Ext_Proc 基于Gateway API Inference Extension https://github.com/kubernetes-sigs/gateway-api-inference-extension.git 先clone代码到本地 git clone https://github.com/kubernetes-…

echarts关系图(Vue3)

基础版效果图&#xff1a;后期请求接口&#xff0c;接入数据即可用<template><div><v-chartref"vChartRef":option"option"style"width: 100%; height: 800px"></v-chart></div> </template><script lan…

【LeetCode】17. 电话号码的字母组合

文章目录17. 电话号码的字母组合题目描述示例 1&#xff1a;示例 2&#xff1a;示例 3&#xff1a;提示&#xff1a;解题思路算法分析问题本质分析回溯法详解组合生成过程可视化数字映射关系各种解法对比算法流程图边界情况处理时间复杂度分析空间复杂度分析关键优化点实际应用…

全文 part1 - DGEMM Using Tensor Cores, and Its Accurate and Reproducible Versions

摘要 本文提出了一种在 NVIDIA 图形处理器&#xff08;GPU&#xff09;的张量核心&#xff08;Tensor Cores&#xff0c;仅含 FP16、INT8 等 GEMM 计算功能&#xff09;上实现 FP64&#xff08;双精度&#xff0c;DGEMM&#xff09;和 FP32&#xff08;单精度&#xff0c;SGEMM…

Hexo 博客图片托管:告别本地存储,用 PicGo + GitHub 打造高速稳定图床

之前刚开始进行Hexo博客撰写&#xff0c;图片都保存在本地Hexo源文件目录&#xff08;source/images/&#xff09;文件夹&#xff0c;随着图片增多&#xff0c;管理起来压力增大&#xff0c;于是产生了使用图床&#xff0c;引入外链进行图片存储的想法 Pros and Cons 提升部署…

关于 VScode 无法连接 Linux 主机并报错 <未能下载 VScode 服务器> 的解决方案

1. 出现的情况 VScode 远程登录 Linux 主机, 出现一下报错:2. 检查方案 2.1 VScode 方面 菜单栏: 点击 <帮助> →\to→ 点击 <关于> 在出现的弹窗中记录 [提交: ] 之后的字符串 (暂且将该字符串命名为变量 $commit_id) 2.2 Linux 方面 使用 ssh or MobaXterm 远程登…

泛型与反射

也是重新温习了下泛型与反射,反射基本就是一些api理解即可,不过需要注意类加载器原理,而泛型则需要理解其设计思想,可以代替Object,更加灵活,可读性强。泛型泛型如果指定后,编译阶段就会检查,不让乱输其他类型,必须是引用类型; 如果不指定就默认Object// 如果指定泛型, 就必须存…

Docker端口映射与数据卷完全指南

目录 Docker端口映射与数据卷完全指南 1. 端口映射:连接Docker容器与外部世界 1.1 为什么需要端口映射 1.2 实现端口映射 1.3 查看端口映射 1.4 修改端口映射(高级操作) 2. 数据卷:Docker数据持久化解决方案 2.1 数据持久化问题 2.2 数据卷的含义 2.3 数据卷的特点 2.4 挂载…

【Linux篇章】穿越网络迷雾:揭开 HTTP 应用层协议的终极奥秘!从请求响应到实战编程,从静态网页到动态交互,一文带你全面吃透并征服 HTTP 协议,打造属于你的 Web 通信利刃!

本篇摘要 本篇将介绍何为HTTP协议&#xff0c;以及它的请求与答复信息的格式&#xff08;请求行&#xff0c;请求包头&#xff0c;正文等&#xff09;&#xff0c;对一些比较重要的部分来展开讲解&#xff0c;其他不常用的即一概而过&#xff0c;从静态网页到动态网页的过渡&a…

QT的项目pro qmake编译

使用qmake管理Qt库的子工程示例-CSDN博客 top_srcdir top_builddir

语音交互系统意图识别介绍和构建

一、意图识别简介**意图识别&#xff08;Intent Recognition&#xff09;**是语音交互系统的核心组件&#xff0c;用于理解用户语音输入背后的真实目的&#xff08;如查询天气、播放音乐等&#xff09;。输入&#xff1a;语音转文本&#xff08;ASR输出&#xff09;的语句输出&…

DINOv3 重磅发布

2025年8月14日 Meta 发布了 DINOv3 。 主页&#xff1a;https://ai.meta.com/dinov3/ 论文&#xff1a;DINOv3 HuggingFace地址&#xff1a;https://huggingface.co/collections/facebook/dinov3-68924841bd6b561778e31009 官方博客&#xff1a;https://ai.meta.com/blog/d…

ansible playbook 实战案例roles | 实现基于firewalld添加端口

文章目录一、核心功能描述二、roles内容2.1 文件结构2.2 主配置文件2.3 tasks文件内容免费个人运维知识库&#xff0c;欢迎您的订阅&#xff1a;literator_ray.flowus.cn 一、核心功能描述 这个 Ansible Role (firewalld) 的核心功能是&#xff1a;动态地、安全地配置 firewal…

【深度学习实战(55)】记录一次在新服务器上使用docker的流程

使用docker&#xff1a;apt-get install dockersudo usermod -aG docker sliu &#xff08;将用户 sliu 添加到 docker 用户组&#xff09;newgrp docker &#xff08;刷新&#xff09;docker imagessudo docker load --input /home/sliu/workspace/env/shuai_docker.tar &…

面试后的跟进策略:如何提高录用几率并留下专业印象

面试结束后&#xff0c;许多求职者认为自己的任务已经完成&#xff0c;只需等待结果通知。然而&#xff0c;面试后的跟进策略同样是求职过程中的关键环节&#xff0c;它不仅能提高你的录用几率&#xff0c;还能展示你的专业素养和持续兴趣。本文将结合酷酷面试平台的专业建议&a…

深入解析RAGFlow六阶段架构

下面用“流程图 六阶段拆解”的方式&#xff0c;把 RAGFlow 的完整流程逐层剖开&#xff0c;力求把每一步的输入、输出、可选策略、内部机制都讲清楚。 ──────────────────────── 一、总览图&#xff08;先建立体感&#xff09; 用户提问 │ ├─→【…

Go语言中的迭代器模式与安全访问实践

Go语言中的迭代器模式与安全访问实践 1. 迭代器模式在Go中的演进 1.1 传统迭代器模式回顾 在传统面向对象语言中&#xff0c;迭代器模式通常涉及三个核心组件&#xff1a;可迭代集合接口(Iterable)迭代器接口(Iterator)具体实现类// 传统迭代器模式示例 type Iterator interfac…

从零开始:JDK 在 Windows、macOS 和 Linux 上的下载、安装与环境变量配置

前言 在进入 Java 世界之前&#xff0c;搭建一个稳定、可用的开发环境是每个开发者必须迈过的第一道门槛。JDK&#xff08;Java Development Kit&#xff09;作为 Java 程序开发的核心工具包&#xff0c;其正确安装与环境变量配置直接关系到后续编译、运行、调试等所有开发流程…