什么是死信队列?死信队列是如何导致的?

死信交换机(Dead Letter Exchange,DLX)

  • 定义:死信交换机是一种特殊的交换机,专门用于**接收从其他队列中因特定原因变成死信的消息**。它的本质还是交换机,遵循RabbitMQ中交换机的基本工作原理,如根据路由规则将消息发送到绑定的队列。
  • 作用:为死信提供一个集中处理的入口点。通过将死信发送到死信交换机,再由其路由到相应的死信队列,可以方便地对这些异常消息进行统一管理和处理,确保数据不丢失。

死信队列(Dead Letter Queue,DLQ)

  • 定义:死信队列用于存储那些无法在正常流程中被消费的消息,即死信。这些消息进入死信队列后,可以后续进行分析、重试或其他特殊处理。
  • 产生死信的原因
    • 消息被拒绝且不重新入队:消费者调用basic.rejectbasic.nack方法拒绝消息,并将requeue参数设置为false,表明该消息不再重新放回原队列等待消费,从而成为死信。
    • 消息过期:可以为消息或队列设置生存时间(TTL,Time-To-Live)。当消息在队列中的存活时间超过设定的TTL值时,消息就会过期成为死信。消息的TTL既可以在发送消息时针对单条消息设置,也可以在声明队列时对队列中的所有消息统一设置。
    • 队列达到最大长度:当为队列设置了最大长度(Max-Length),并且队列中的消息数量达到这个上限时,新进入的消息会被丢弃成为死信。

代码举例

下面将用代码举例,由于消息过期而进入死信队列

初始化RabbitMQ的连接配置、队列和交换机的声明

/*** RabbitMQ配置类* 负责管理RabbitMQ的连接配置、队列和交换机的声明*/
@Slf4j
public class RabbitMQConfig {// 普通队列和死信队列的配置常量public static final String NORMAL_QUEUE = "normal.queue";      // 普通队列名称public static final String DLX_QUEUE = "dlx.queue";           // 死信队列名称public static final String NORMAL_EXCHANGE = "normal.exchange"; // 普通交换机名称public static final String DLX_EXCHANGE = "dlx.exchange";     // 死信交换机名称public static final String NORMAL_ROUTING_KEY = "normal.routing.key"; // 普通路由键public static final String DLX_ROUTING_KEY = "dlx.routing.key";      // 死信路由键/*** 创建RabbitMQ连接** @return Connection RabbitMQ连接对象* @throws Exception*/public static Connection createConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("xxxx");    // 设置RabbitMQ服务器地址factory.setPort(5672);           // 设置RabbitMQ服务器端口factory.setUsername("xxxx");    // 设置用户名factory.setPassword("xxxx");    // 设置密码return factory.newConnection();  // 创建并返回新的连接}/*** 初始化RabbitMQ的队列和交换机* 包括:* 1. 删除已存在的队列和交换机* 2. 声明死信交换机和队列* 3. 声明普通交换机和队列* 4. 设置队列的死信参数* 5. 绑定队列和交换机** @throws Exception*/public static void init() throws Exception {try (Connection connection = createConnection();Channel channel = connection.createChannel()) {// 删除已存在的队列和交换机try {channel.queueDelete(NORMAL_QUEUE);channel.queueDelete(DLX_QUEUE);channel.exchangeDelete(NORMAL_EXCHANGE);channel.exchangeDelete(DLX_EXCHANGE);} catch (Exception e) {// 忽略删除不存在的队列或交换机时的错误log.warn("删除队列或交换机时出错(可能是首次创建): {}", e.getMessage());}// 声明死信交换机,类型为direct,持久化channel.exchangeDeclare(DLX_EXCHANGE, "direct", true);// 声明死信队列,持久化channel.queueDeclare(DLX_QUEUE, true, false, false, null);// 将死信队列绑定到死信交换机,使用死信路由键channel.queueBind(DLX_QUEUE, DLX_EXCHANGE, DLX_ROUTING_KEY);// 声明普通交换机,类型为direct,持久化channel.exchangeDeclare(NORMAL_EXCHANGE, "direct", true);// 设置普通队列的死信参数Map<String, Object> args = new HashMap<>();args.put("x-dead-letter-exchange", DLX_EXCHANGE);     // 设置死信交换机args.put("x-dead-letter-routing-key", DLX_ROUTING_KEY); // 设置死信路由键// 声明普通队列,并应用死信参数channel.queueDeclare(NORMAL_QUEUE, true, false, false, args);// 将普通队列绑定到普通交换机,使用普通路由键channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, NORMAL_ROUTING_KEY);}}
} 

消息生产者

/*** 消息生产者类* 负责向RabbitMQ发送消息*/
@Slf4j
public class MessageProducer {/*** 发送消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 将消息发布到普通交换机* 3. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @throws Exception */public void sendMessage(String message) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 打印发送的消息内容log.info("发送消息: {}", message);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(这里为null表示使用默认属性)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,null,message.getBytes());}}/*** 发送带TTL的消息到普通队列* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置消息的TTL属性* 3. 将消息发布到普通交换机* 4. 使用try-with-resources自动关闭连接和通道* * @param message 要发送的消息内容* @param ttl 消息的过期时间(毫秒)* @throws Exception 如果发送过程中出现错误则抛出异常*/public void sendMessageWithTTL(String message, int ttl) throws Exception {// 使用try-with-resources自动管理连接和通道的关闭try (Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel()) {// 设置消息属性,包括TTLAMQP.BasicProperties properties = new AMQP.BasicProperties.Builder().expiration(String.valueOf(ttl)).build();// 打印发送的消息内容log.info("发送消息: {}, TTL: {}ms", message, ttl);// 发布消息到普通交换机// 参数说明:// 1. 交换机名称// 2. 路由键// 3. 消息属性(包含TTL)// 4. 消息内容(转换为字节数组)channel.basicPublish(RabbitMQConfig.NORMAL_EXCHANGE,RabbitMQConfig.NORMAL_ROUTING_KEY,properties,message.getBytes());}}
} 

消息消费者

/*** 消息消费者类* 负责从普通队列和死信队列中消费消息*/
@Slf4j
public class MessageConsumer {/*** 消费普通队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeNormalQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发,避免某个消费者处理过多消息channel.basicQos(1);// 创建普通队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到普通队列消息: {}", message);// 模拟消息处理耗时try {Thread.sleep(20000);} catch (InterruptedException e) {throw new RuntimeException(e);}// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费普通队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});}/*** 消费死信队列中的消息* 该方法会:* 1. 创建RabbitMQ连接和通道* 2. 设置预取计数为1,确保公平分发* 3. 创建消费者回调处理消息* 4. 确认消息处理完成** @throws Exception 异常*/public void consumeDlxQueue() throws Exception {// 创建RabbitMQ连接和通道Connection connection = RabbitMQConfig.createConnection();Channel channel = connection.createChannel();// 设置预取计数为1,确保公平分发channel.basicQos(1);// 创建死信队列消费者回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {// 获取消息内容String message = new String(delivery.getBody(), StandardCharsets.UTF_8);log.info("收到死信队列消息: {}", message);// 确认消息处理完成channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);};// 开始消费死信队列// 参数说明:// 1. 队列名称// 2. 是否自动确认消息(false表示手动确认)// 3. 消息处理回调// 4. 消费者取消回调(这里为空实现)channel.basicConsume(RabbitMQConfig.DLX_QUEUE, false, deliverCallback, consumerTag -> {});}
} 

测试

@Slf4j
public class DLXTest {private static final int THREAD_COUNT = 1;  // 并发线程数private static final int MESSAGE_COUNT = 2; // 每个线程发送的消息数/*** 主方法,执行死信队列测试流程* 测试流程:* 1. 初始化RabbitMQ的队列和交换机* 2. 创建生产者和消费者实例* 3. 启动普通队列和死信队列的消费者线程* 4. 使用线程池发送测试消息* 5. 等待消息处理完成** @param args* @throws Exception*/public static void main(String[] args) throws Exception {// 初始化RabbitMQ的队列和交换机RabbitMQConfig.init();// 创建生产者和消费者实例MessageProducer producer = new MessageProducer();MessageConsumer consumer = new MessageConsumer();// 启动普通队列消费者线程new Thread(() -> {try {consumer.consumeNormalQueue();} catch (Exception e) {log.error("普通队列消费者异常", e);}}).start();// 启动死信队列消费者线程new Thread(() -> {try {consumer.consumeDlxQueue();} catch (Exception e) {log.error("死信队列消费者异常", e);}}).start();// 创建线程池和计数器ExecutorService executorService = Executors.newFixedThreadPool(THREAD_COUNT);CountDownLatch latch = new CountDownLatch(THREAD_COUNT);// 提交任务到线程池for (int i = 0; i < THREAD_COUNT; i++) {final int threadId = i;executorService.submit(() -> {try {// 每个线程发送MESSAGE_COUNT条消息for (int j = 0; j < MESSAGE_COUNT; j++) {// 随机生成消息TTL(1-30秒)int ttl = (int) (Math.random() * 30000) + 1000;String message = String.format("消息-线程%d-第%d条 (消息TTL: %dms)", threadId + 1, j + 1, ttl);producer.sendMessageWithTTL(message, ttl);// 随机延迟0-100ms,模拟真实场景Thread.sleep((long) (Math.random() * 100));}} catch (Exception e) {log.error("发送消息异常", e);} finally {latch.countDown();}});}// 等待所有消息发送完成latch.await();log.info("所有消息已发送完成");// 关闭线程池executorService.shutdown();executorService.awaitTermination(1, TimeUnit.MINUTES);// 保持程序运行,等待消息处理完成Thread.sleep(60000);log.info("测试完成");}
} 

从结果可以看出,第一条消息 ttl 为 28301ms,被普通消费者进行消费,而产生的第二条消息得到 ttl 为 4332ms,由于第一条消息在消费时耗时较久,在此期间 第二条消息已经过期,不得不进入死信队列,由死信消费者进行处理,从前面的日志时间也可以看出,刚好间隔 4s 左右。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/diannao/82256.shtml
繁体地址,请注明出处:http://hk.pswp.cn/diannao/82256.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

9. 从《蜀道难》学CSS基础:三种选择器的实战解析

引言&#xff1a;当古诗遇上现代网页设计 今天我们通过李白的经典诗作《蜀道难》来学习CSS的三种核心选择器。这种古今结合的学习方式&#xff0c;既能感受中华诗词的魅力&#xff0c;又能掌握实用的网页设计技能。让我们开始这场穿越时空的技术之旅吧&#xff01; 一、HTML骨架…

三角网格减面算法及其代表的算法库都有哪些?

以下是三角网格减面算法及其代表库/工具的详细分类&#xff0c;涵盖经典算法和现代实现&#xff1a; ​​1. 顶点聚类&#xff08;Vertex Clustering&#xff09;​​ ​​原理​​&#xff1a;将网格空间划分为体素栅格&#xff0c;合并每个栅格内的顶点。​​特点​​&#…

URP - 屏幕图像(_CameraOpaqueTexture)

首先需要在unity中开启屏幕图像开关才可以使用该纹理 同样只有不透明对象才能被渲染到屏幕图像中 若想要该对象不被渲染到屏幕图像中&#xff0c;可以将其Shader的渲染队列改为 "Queue" "Transparent" 如何在Shader中使用_CameraOpaqueTexture&#xf…

vue 和 html 的区别

使用 Vue.js 和原生 HTML 开发 Web 应用有显著的区别&#xff0c;主要体现在开发模式、功能扩展、性能优化和维护性等方面。以下是两者的对比分析&#xff1a; &#x1f9f1; 原生 HTML&#xff08;HTML CSS JavaScript&#xff09; 特点&#xff1a; 静态结构&#xff1a;H…

LeetCode[226] 翻转二叉树

思路&#xff1a; 使用递归&#xff0c;归根结底还是左右节点互相倒&#xff0c;那么肯定需要一个temp节点在中间传递&#xff0c;最后就是递归&#xff0c;没什么说的 代码&#xff1a; /*** Definition for a binary tree node.* public class TreeNode {* int …

幂等的几种解决方案以及实践

目录 什么是幂等&#xff1f; 解决幂等的常见解决方案&#xff1a; 唯一标识符案例 数据库唯一约束 案例 乐观锁案例 分布式锁&#xff08;Distributed Locking&#xff09; 实践精选方案 首先 为什么不直接使用分布式锁呢&#xff1f; 自定义实现幂等组件&#xff01…

PowerShell中的Json处理

1.定义JSON字符串变量 PS C:\WINDOWS\system32> $body {"Method": "POST","Body": {"model": "deepseek-r1","messages": [{"content": "why is the sky blue?","role"…

奥威BI:AI+BI深度融合,重塑智能AI数据分析新标杆

在数字化浪潮席卷全球的今天&#xff0c;企业正面临着前所未有的数据挑战与机遇。如何高效、精准地挖掘数据价值&#xff0c;已成为推动业务增长、提升竞争力的核心议题。奥威BI&#xff0c;作为智能AI数据分析领域的领军者&#xff0c;凭借其创新的AIBI融合模式&#xff0c;正…

【Linux网络】网络协议基础

网络基础 计算机网络背景 独立模式:计算机之间相互独立 网络互联:多台计算机连接在一起,完成数据共享 局域网LAN:计算机数量更多了,通过交换机和路由器连接在一起 广域网WAN:将远隔千里的计算机都连在一起 所谓"局域网"和"广域网"只是一个相对的概念.比…

LabVIEW表面粗糙度测量及算法解析

在制造业和科研领域&#xff0c;表面粗糙度测量对保障产品质量、推动材料研究意义重大。表面粗糙度作为衡量工件表面加工质量的关键指标&#xff0c;直接影响着工件诸如磨损、密封、疲劳等机械性能。随着技术的发展&#xff0c;LabVIEW 在表面粗糙度测量及数据处理中发挥着不可…

深入探索 JavaScript 中的模块对象

引言 在现代 JavaScript 开发中&#xff0c;模块化编程是一项至关重要的技术。它允许开发者将代码拆分成多个独立的模块&#xff0c;每个模块专注于单一功能&#xff0c;从而提高代码的可维护性、可测试性和复用性。而模块对象则是模块化编程中的核心概念之一&#xff0c;它为…

Linux——Mysql数据库

目录 一&#xff0c;数据库简介 二&#xff0c;数据库的基本概念 1&#xff0c;数据 2&#xff0c;数据库和数据库表 3&#xff0c;数据库管理系统和数据库系统 三&#xff0c;主流数据库介绍 四&#xff0c;数据库的两大类型 1&#xff0c;关系型数据库 主键 外键 2…

73页最佳实践PPT《DeepSeek自学手册-从理论模型训练到实践模型应用》

这份文档是一份关于 DeepSeek 自学手册的详细指南&#xff0c;涵盖了 DeepSeek V3 和 R1 模型的架构、训练方法、性能表现以及使用技巧等内容。它介绍了 DeepSeek V3 作为强大的 MoE 语言模型在数学、代码等任务上的出色表现以及其训练过程中的创新架构如多头潜在注意力和多 To…

LabVIEW 2019 与 NI VISA 20.0 安装及报错处理

在使用 Windows 11 操作系统的电脑上&#xff0c;同时安装了 LabVIEW 2019 32 位和 64 位版本的软件。此前安装的 NI VISA 2024 Q1 版&#xff0c;该版本与 LabVIEW 2019 32 位和 64 位不兼容&#xff0c;之后重新安装了 NI VISA 20.0。从说明书来看&#xff0c;NI VISA 20.0 …

基于Centos7的DHCP服务器搭建

一、准备实验环境&#xff1a; 克隆两台虚拟机 一台作服务器&#xff1a;DHCP Server 一台作客户端&#xff1a;DHCP Clinet 二、部署服务器 在网络模式为NAT下使用yum下载DHCP 需要管理员用户权限才能下载&#xff0c;下载好后关闭客户端&#xff0c;改NAT模式为仅主机模式…

最全盘点,赶紧收藏:2025 年全网最全的 Java 技术栈内容梳理(持续更新中)

大家好&#xff0c;我是栗筝i&#xff0c;是一个拥有 5 年经验的 Java 开发工程师和技术博主&#xff0c;曾有多年在国内某大厂工作的经历。从 2022 年 10 月份开始&#xff0c;我将持续梳理出全面的 Java 技术栈内容&#xff0c;一方面是对自己学习内容进行整合梳理&#xff0…

【项目实践】boost 搜索引擎

1. 项目展示 boost搜索引擎具体讲解视频 2. 项目背景 对于boost库&#xff0c;官方是没有提供搜索功能的&#xff0c;我们这个项目就是来为它添加一个站内搜索的功能。 3. 项目环境与技术栈 • 项目环境&#xff1a; ubuntu22.04、vscode • 技术栈&#xff1a; C/C、C11、S…

一个简单的MCP测试与debug

最近MCP挺火&#xff0c;我也跟着网上教程试试&#xff0c;参考如下&#xff0c;感谢原博主分享&#xff1a; https://zhuanlan.zhihu.com/p/1891227835722606201https://zhuanlan.zhihu.com/p/1891227835722606201 MCP是啥&#xff1f;技术原理是什么&#xff1f;一个视频搞…

深度学习系统学习系列【7】之卷积神经网络(CNN)

文章目录 说明卷积神经网络概述(Convolutional Neural Network,CNN)卷积神经网络的应用图像分类与识别图像着色自然语言处理NLP卷积神经网络的结构卷积神经网络中的数据流动 CNN与ANN的关系 卷积操作Padding 操作滑动窗口卷积操作网络卷积层操作矩阵快速卷积Im2col算法GEMM算法…

事务隔离(MySQL)

事务隔离 —— 为什么你改了我还看不见&#xff1f; 在数据库中&#xff0c;事务&#xff08;Transaction&#xff09; 用于保证一组数据库操作要么全部成功&#xff0c;要么全部失败&#xff0c;这是一种原子性的操作机制。在 MySQL 中&#xff0c;事务的支持由存储引擎层实现…