RabbitMQ 利用死信队列来实现延迟消息

RabbitMQ 利用死信队列来实现延迟消息

基于 TTL(Time-To-Live)+ 死信队列(DLX)的方式来实现延迟消息

首先消息会被推送到普通队列中,该消息设置了TTL,当TTL到期未被消费掉,则会自动进入死信队列(DLQ)中,由死信队列消费者消费,来达到延迟消息的效果


首先让我们来安装 Rabbit MQ 服务端

由于服务器基本都是使用 Linux 系统
以下介绍常见的 Ubuntu/Debian 和 CentOS 系统安装 RabbitMQ 的方法

Ubuntu / Debian

# 添加 Erlang 仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.deb.sh | sudo bash# 安装兼容版本的 Erlang(例如 25.3.2)
sudo apt-get install -y erlang=1:25.3.2+dfsg-1# 添加 RabbitMQ 官方仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.deb.sh | sudo bash# 安装 RabbitMQ
sudo apt-get update
# 安装最新稳定版
sudo apt-get install -y rabbitmq-server# 启动服务
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server  # 设置开机自启
# 然后是开放 Linux 的防火墙端口
# Ubuntu/Debian
sudo ufw allow 5672/tcp # 5672 是 RabbitMQ 服务端口
sudo ufw allow 15672/tcp # 15672 是 RabbitMQ 的后台管理端口

CentOS 7

# 添加Erlang仓库
curl -s https://packagecloud.io/install/repositories/rabbitmq/erlang/script.rpm.sh | sudo bash# 安装兼容版本的Erlang(例如25.3.2)
sudo yum install -y erlang-25.3.2.8-1.el7curl -s https://packagecloud.io/install/repositories/rabbitmq/rabbitmq-server/script.rpm.sh | sudo bash
sudo yum install -y rabbitmq-server-3.12.7-1.el7# 启动RabbitMQ服务
sudo systemctl start rabbitmq-server
# 设置开机自启
sudo systemctl enable rabbitmq-server
# 检查服务状态
sudo systemctl status rabbitmq-server
# 开放AMQP协议端口(默认5672)
sudo firewall-cmd --permanent --add-port=5672/tcp
# 开放管理界面端口(默认15672)
sudo firewall-cmd --permanent --add-port=15672/tcp
# 重新加载防火墙规则
sudo firewall-cmd --reload

到这里你已经安装完成了, 然后该开始初始化 Rabbit MQ 设置一些账号相关的配置了

# 安装完后用 rabbitmqctl 命令来查看服务状态
sudo rabbitmqctl status
# 然后是安装后台管理页面,虽然都是用代码来操作MQ,
# 但是业务上难免是需要看MQ运行状态的,所以强烈建议是装上可视化界面
sudo rabbitmq-plugins enable rabbitmq_management
# 接下来是创建账号,创建虚拟主机,给用户授权,这些账号是要配置在 application.properties 中的# 创建新用户
sudo rabbitmqctl add_user yourUserName yourPassword # 替换成你的账号密码
# 创建虚拟主机
sudo rabbitmqctl add_vhost yourvhost # 替换成你的虚拟主机
# 为用户授予虚拟主机的权限
sudo rabbitmqctl set_permissions -p yourvhost yourUserName ".*" ".*" ".*"
# 查看用户权限
sudo rabbitmqctl list_user_permissions yourUserName
# 设置 yourUserName 为管理员角色
# 这一步很重要,你要设置成管理员,后面才能登录后台管理页面
sudo rabbitmqctl set_user_tags yourUserName administrator
# 设置完后你就可以通过浏览器 访问 http://localhost:15672/ 登录后台(自己替换成 Linux 的IP)

顺带提一嘴,权限级别有多级, administrator 为最高权限

  • none:无特殊权限(默认角色)
  • management:可以访问管理 API 和 Web 界面
  • policymaker:包含management权限,还能管理策略和参数
  • monitoring:包含management权限,还能查看节点和集群信息
  • administrator:最高权限,可管理所有资源和用户

接下来接入到 Spring 工程里

application.properties

spring.application.name=RabbitMQDemospring.rabbitmq.host=localhost # 替换为你的RabbitMQ服务IP
spring.rabbitmq.port=5672
spring.rabbitmq.username=yourUserName
spring.rabbitmq.password=yourPassword

Config 配置类

package vip.erichong.rabbitmqdemo.mq.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;import java.util.HashMap;
import java.util.Map;/*** @author eric*/
@Configuration
public class RabbitMQConfig {// 死信队列相关配置public static final String DEAD_LETTER_EXCHANGE = "dlx.exchange";public static final String DEAD_LETTER_QUEUE = "dlx.queue";public static final String DEAD_LETTER_ROUTING_KEY = "dlx.key";// 业务队列相关配置public static final String WORK_EXCHANGE = "work.exchange";public static final String WORK_QUEUE = "work.queue";public static final String WORK_ROUTING_KEY = "work.key";// 配置死信交换器@BeanDirectExchange deadLetterExchange() {return new DirectExchange(DEAD_LETTER_EXCHANGE);}// 配置死信队列@BeanQueue deadLetterQueue() {return new Queue(DEAD_LETTER_QUEUE, true);}// 绑定死信队列到死信交换器@BeanBinding deadLetterBinding() {return BindingBuilder.bind(deadLetterQueue()).to(deadLetterExchange()).with(DEAD_LETTER_ROUTING_KEY);}// 配置业务交换器@BeanDirectExchange workExchange() {return new DirectExchange(WORK_EXCHANGE);}// 配置业务队列,并设置死信交换器@BeanQueue workQueue() {Map<String, Object> args = new HashMap<>();// 这个是死信队列的关键,指定了 work.queue 消息过期时要移入死信队列args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);return new Queue(WORK_QUEUE, true, false, false, args);}// 绑定业务队列到业务交换器@BeanBinding workBinding() {return BindingBuilder.bind(workQueue()).to(workExchange()).with(WORK_ROUTING_KEY);}// 配置消息转换器@Beanpublic Jackson2JsonMessageConverter jsonMessageConverter() {return new Jackson2JsonMessageConverter();}// 配置RabbitTemplate@Beanpublic RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {RabbitTemplate template = new RabbitTemplate(connectionFactory);template.setMessageConverter(jsonMessageConverter());return template;}
}

生产者类

package vip.erichong.rabbitmqdemo.mq.producer;import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Service;
import vip.erichong.rabbitmqdemo.mq.config.RabbitMQConfig;import java.time.LocalDateTime;/*** @author eric*/
@Service
public class RabbitMQProducer {private final RabbitTemplate rabbitTemplate;public RabbitMQProducer(RabbitTemplate rabbitTemplate) {this.rabbitTemplate = rabbitTemplate;}// 发送延迟消息public void sendDelayedMessage(String message, int delaySeconds) {System.out.println("[" + LocalDateTime.now() + "] 发送延迟消息: " + message + ", 延迟: " + delaySeconds + "秒");rabbitTemplate.convertAndSend(RabbitMQConfig.WORK_EXCHANGE,RabbitMQConfig.WORK_ROUTING_KEY,message,msg -> {msg.getMessageProperties().setExpiration(String.valueOf(delaySeconds * 1000));return msg;});}
}

消费者类

package vip.erichong.rabbitmqdemo.mq.consumer;import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import vip.erichong.rabbitmqdemo.mq.config.RabbitMQConfig;import java.time.LocalDateTime;/*** @author eric*/
@Component
public class MessageConsumer {@RabbitListener(queues = RabbitMQConfig.DEAD_LETTER_QUEUE)public void handleDeadLetterMessage(Message message) {System.out.println("[" + LocalDateTime.now() + "] 收到延迟消息: " + new String(message.getBody()));// 处理延迟后的业务逻辑}
}

测试 Controller

package vip.erichong.rabbitmqdemo.controller;import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;
import vip.erichong.rabbitmqdemo.mq.producer.RabbitMQProducer;import java.util.Optional;/*** @author eric*/
@RestController
@RequestMapping("/message")
public class TestController {@Autowiredprivate RabbitMQProducer rabbitMQProducer;@GetMapping("/pushDelayMessage")public void pushDelayMessage(@RequestParam(value = "message", required = false) String message) {String msg = Optional.ofNullable(message).orElse("delay message");// 延迟 20 秒后,由 MessageConsumer.handleDeadLetterMessage 消费消息rabbitMQProducer.sendDelayedMessage(msg, 20);}
}

把项目启动起来,模拟HTTP请求发一条延迟消息吧,20秒钟之后就能看到消费者接收到消息了。

期间你可以在后台管理页面看到消息会先进到普通队列 work.queue 中,等待了20秒之后消息过期,消息被移入到与之绑定的死信队列 dlx.queue 中,由 MessageConsumer.handleDeadLetterMessage 消费该消息。

恭喜你,已经实现了一个简单的延迟消息功能。

完整的代码 Demo 点我获取

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/bicheng/86376.shtml
繁体地址,请注明出处:http://hk.pswp.cn/bicheng/86376.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Keepalived+Haproxy+Redis三主三从

一、集群部署 1、案例拓扑 2、资源列表 主从节点是随机分配的&#xff0c;下属列表只是框架&#xff1a; 操作系统主机名配置IP应用OpenEuler24master12C4G192.168.10.101RedisOpenEuler24master22C4G192.168.10.102RedisOpenEuler24master32C4G192.168.10.103RedisOpenEule…

Modbus转IEC104网关:电力自动化系统的桥梁

现代电力系统中&#xff0c;变电站、发电厂以及配电网络中存在大量采用不同通信协议的设备。Modbus协议因其简单易用在现场设备中广泛部署&#xff0c;而电力行业主流监控系统则普遍采用IEC 60870-5-104&#xff08;简称IEC104&#xff09;协议。协议差异导致的数据孤岛现象&am…

@annotation:Spring AOP 的“精准定位器“

想象你是一位快递员&#xff0c;负责给一个大型社区送快递。社区里有几百户人家&#xff0c;但只有特定家庭需要特殊服务&#xff1a; 普通快递&#xff1a;直接放快递柜生鲜快递&#xff1a;需要冷藏处理贵重物品&#xff1a;需要本人签收药品快递&#xff1a;需要优先配送 …

Web Worker使用指南 解锁浏览器多线程 ,提升前端性能的利器

文章目录 前言一、什么是 Web Worker二、适用场景1、CPU 密集型计算2、图像/视频处理3、实时数据流处理&#xff08;高频场景&#xff09;4、后台文件操作5、复杂状态机/AI逻辑&#xff08;游戏开发&#xff09;6、长轮询与心跳检测7、WebAssembly 加速8、WebGL 与 Canvas 渲染…

React 18.2.0 源码打包

一、React源码地址 GitHub&#xff1a;React 二、参考文章 sourcemap实战-生成react源码sourcemap Rollup中文文档 JavaScript Source Map 详解 全网最优雅的 React 源码调试方式 三、打包操作 安装依赖 // 全局安装yarn npm i -g yarn // 源码项目目录下执行yarn安装依赖…

UniApp 开发第一个项目

UniApp 开发第一个项目全流程指南,涵盖环境搭建、项目创建、核心开发到调试发布,结合最新实践整理而成,适合零基础快速上手: 🧰 一、环境准备(5分钟) 安装开发工具 HBuilderX(官方推荐IDE):下载 App 开发版,安装路径避免中文或空格 微信开发者工具(调试小程序必备…

Web项目开发中Tomcat10+所需的jar包

版权声明 本文原创作者&#xff1a;谷哥的小弟作者博客地址&#xff1a;http://blog.csdn.net/lfdfhl 项目背景 Web项目中使用低版本Tomcat时常用的jar包如下&#xff1a; javax.servlet-apijavax.ejb-apijavax.jms-apijavax.json-api 当Web项目使用Tomcat10的版本时&#…

网络安全就业方向与现实发展分析:机遇、挑战与未来趋势

网络安全行业的战略地位与就业背景 在数字经济蓬勃发展的今天&#xff0c;网络安全已从技术分支演变为关乎国家安全、企业存亡和个人隐私的核心领域。根据国家网信办数据显示&#xff0c;2025年我国网络安全人才缺口达200万人&#xff0c;较2023年增长33%。这一现象源于三重驱…

iOS runtime随笔-消息转发机制

运行时的消息转发分三步, 当你调用了没有实现的方法时, 有机会通过runtime的消息转发机制补救一下 resolveInstanceMethod/resolveClassMethod 这里可以动态去创建方法来解决CrashforwardingTargetForSelector ​​​​​第一步未解决, 就会走到这里, 可以给出一个Target去转发…

vue3用js+css实现轮播图(可调整堆叠程度)

先看效果 html <divclass"outer"style"width: 650px;background: #fff;box-shadow: 0px 0px 8px rgba(0, 0, 0, 0.1);border-radius: 15px;margin: 0 10px 15px 5px;">//这里用的是svg-icon,需要的可自行替换为其他图片<svg-iconid"btn_l&q…

Three.js项目实战:从零搭建小米SU7三维汽车

大家如果有过购车的经验&#xff0c;肯定会先从网站上收集车辆的信息&#xff0c;比如懂车帝&#xff0c;汽车之家&#xff0c;这些网站上逼真的看车效果是如何实现的呢&#xff0c;这节课带你从0-1快速的手搓一个看车小项目。 懂车帝官网 效果 视频教程和笔记 大家可以下方小…

Android13 永久关闭SELinux 权限

永久关闭 SeLinux 在cmdline中增加参数androidboot.selinuxpermissive&#xff1b; 芯片: QCM6115 版本: Android 13 kernel: msm-4.19 ~/temp_code/SLM927D_LA.UM.9.15$ git diff device/qcom/bengal/BoardConfig.mk diff --git a/device/qcom/bengal/BoardConfig.mk b…

Linux创建DHCP服务

Linux可作为DHCP服务端使用&#xff0c;为同一个网络下的其它机器动态分配ip。在一些情况下&#xff0c;可以起到很大的作用。 二级标题 安装dnsmasq # ubuntu sudo apt update -y sudo apt install -y dnsmasq# centos sudo yum install -y dnsmasq修改配置文件 sudo vim …

汽车4G-TBOX智能终端 汽车国标GB/T 32960协议

汽车国标GB/T 32960协议4G TBOX是一种广泛应用于车联网的设备&#xff0c;下面将从不同方面为你详细介绍。 移动管家汽车4G-TBOX智能终端定义与用途 4G TBOX是基于车联网技术智能服务系统中的采集终端。以车云网的4G TBOX_CC750为例&#xff0c;它为整个智能服务系统提供GPS/…

JavaEE-Mybatis初阶

什么是MyBatis MyBatis是⼀款优秀的 持久层 框架&#xff0c;⽤于简化JDBC的开发。 MyBatis本是 Apache的⼀个开源项⽬iBatis&#xff0c;2010年这个项⽬由apache迁移到了google code&#xff0c;并 且改名为MyBatis 。2013年11⽉迁移到Github 创建项目时添加依赖 上面有…

记一次jvm机器问题定位经历

背景 开发过程中发现机器指标异常&#xff0c;端口也hang住无响应&#xff0c;端口返回为timeout&#xff0c;对应探活检测也失败了。 现象 在st测试环节&#xff0c;突然每隔一段时间新接口就hang住无响应&#xff0c;观察机器监控也发现端口探活失败&#xff0c;看机器指标…

【机器学习深度学习】张量基本操作

目录 一、张量基本操作 1.1 执行代码 1.2 运行结果 1.3 代码解析 ✅ 1. 创建张量&#xff08;tensor、randn、zeros&#xff09; ✅ 2. 索引与切片&#xff08;类似 NumPy&#xff09; ✅ 3. 形状变换&#xff08;reshape、转置、压缩&#xff09; ✅ 4. 数学运算&#x…

【微信小程序】8、获取用户当前的地理位置

1、获取当前的地理位置 获取当前的地理位置、速度。当用户离开小程序后&#xff0c;此接口无法调用。开启高精度定位&#xff0c;接口耗时会增加&#xff0c;可指定 highAccuracyExpireTime 作为超时时间。 注意&#xff1a; 地图相关使用的坐标格式应为 gcj02。高频率调用会…

Jenkins 常用定时构建脚本

Jenkins 常用定时构建脚本 Jenkins 使用 cron 风格的语法来配置定时构建任务&#xff0c;以下是常用的定时构建脚本示例和说明&#xff1a; 基本语法 Jenkins 定时构建使用五个字段表示时间&#xff0c;格式为&#xff1a; MINUTE HOUR DOM MONTH DOWMINUTE - 分钟 (0-59)H…

ActiveMQ漏洞复现

以下内容均在nextcyber靶场环境中完成&#xff0c;请勿用于非法途径&#xff01; ActiveMQ 反序列化漏洞&#xff08;CVE-2015-5254&#xff09; Apache ActiveMQ是美国阿帕奇&#xff08;Apache&#xff09;软件基金会所研发的一套开源的消息中间件&#xff0c;它支持Java消息…