RabbitMQ 知识详解(Java版)

RabbitMQ 知识详解(Java版)

RabbitMQ 是一个开源的消息代理,实现了高级消息队列协议(AMQP)。它用于在分布式系统中实现应用解耦、异步通信和流量削峰。


核心概念
  1. 生产者(Producer):发送消息的应用
  2. 消费者(Consumer):接收消息的应用
  3. 队列(Queue):消息存储的缓冲区
  4. 交换机(Exchange):接收消息并路由到队列
  5. 绑定(Binding):连接交换机和队列的规则
  6. 路由键(Routing Key):消息的路由标识

交换机类型
类型路由规则典型用途
Direct精确匹配Routing Key点对点通信
Topic模式匹配(支持通配符)多条件路由
Fanout广播到所有绑定队列发布/订阅
Headers消息头键值对匹配复杂路由

Java 示例(使用官方客户端)

依赖:

<dependency><groupId>com.rabbitmq</groupId><artifactId>amqp-client</artifactId><version>5.9.0</version>
</dependency>
<dependency><groupId>org.slf4j</groupId><artifactId>slf4j-nop</artifactId><version>1.7.30</version>
</dependency>

1. 直连交换机(Direct Exchange)

// Producer
public class DirectExchangeProducer {private static final String EXCHANGE_NAME = "direct_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "direct");// 绑定不同路由键String routingKey1 = "red";String message1 = "重要消息";channel.basicPublish(EXCHANGE_NAME, routingKey1, null, message1.getBytes());System.out.println("发送消息: " + message1);String routingKey2 = "blue";String message2 = "普通消息";channel.basicPublish(EXCHANGE_NAME, routingKey2, null, message2.getBytes());System.out.println("发送消息: " + message2);}}
}// Consumer (红色队列)
public class DirectConsumerRed {private static final String EXCHANGE_NAME = "direct_exchange";private static final String QUEUE_NAME = "red_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "direct");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "red");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("红色队列收到消息: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

2. 扇出交换机(Fanout Exchange)

// Producer
public class FanoutExchangeProducer {private static final String EXCHANGE_NAME = "fanout_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "fanout");String message = "广播消息";channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());System.out.println("广播消息已发送");}}
}// Consumer (邮件队列)
public class FanoutConsumerEmail {private static final String EXCHANGE_NAME = "fanout_exchange";private static final String QUEUE_NAME = "email_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "fanout");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("邮件服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

3. 主题交换机(Topic Exchange)

// Producer
public class TopicExchangeProducer {private static final String EXCHANGE_NAME = "topic_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "topic");// 发送不同主题的消息String routingKey = "order.create";String message = "订单创建通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("发送订单创建消息");routingKey = "user.login";message = "用户登录通知";channel.basicPublish(EXCHANGE_NAME, routingKey, null, message.getBytes());System.out.println("发送用户登录消息");}}
}// Consumer (订单服务)
public class TopicConsumerOrder {private static final String EXCHANGE_NAME = "topic_exchange";private static final String QUEUE_NAME = "order_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "topic");channel.queueDeclare(QUEUE_NAME, false, false, false, null);channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "order.*");DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("订单服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

4. 头交换机(Headers Exchange)

// Producer
public class HeadersExchangeProducer {private static final String EXCHANGE_NAME = "headers_exchange";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");try (Connection connection = factory.newConnection();Channel channel = connection.createChannel()) {channel.exchangeDeclare(EXCHANGE_NAME, "headers");// 设置消息头Map<String, Object> headers = new HashMap<>();headers.put("type", "log");headers.put("level", "error");AMQP.BasicProperties props = new AMQP.BasicProperties.Builder().headers(headers).build();String message = "系统错误日志";channel.basicPublish(EXCHANGE_NAME, "", props, message.getBytes());System.out.println("发送错误日志消息");}}
}// Consumer (日志服务)
public class HeadersConsumerLog {private static final String EXCHANGE_NAME = "headers_exchange";private static final String QUEUE_NAME = "log_queue";public static void main(String[] args) throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost("localhost");Connection connection = factory.newConnection();Channel channel = connection.createChannel();channel.exchangeDeclare(EXCHANGE_NAME, "headers");channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 设置匹配规则 (必须包含type=log且level=error)Map<String, Object> bindingArgs = new HashMap<>();bindingArgs.put("x-match", "all"); // 全部匹配bindingArgs.put("type", "log");bindingArgs.put("level", "error");channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "", bindingArgs);DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("日志服务收到: " + message);};channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

通用配置说明

  1. 交换机类型总结:

    • 直连交换机:路由键精确匹配
    • 扇出交换机:忽略路由键,广播所有绑定队列
    • 主题交换机:使用通配符匹配路由键
    • 头交换机:通过消息头属性匹配(非路由键)
  2. 重要参数:

    • channel.queueDeclare() 参数说明:
      • durable: 是否持久化
      • exclusive: 是否排他
      • autoDelete: 是否自动删除
    • x-match参数在头交换机中有两种模式:
      • “all”: 需匹配所有指定头
      • “any”: 匹配任意指定头

关键特性(Java实现)

1. 消息持久化
// 声明持久化队列
boolean durable = true;
channel.queueDeclare("task_queue", durable, false, false, null);// 发送持久化消息
channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN,message.getBytes());
2. 公平分发(Prefetch)
// 每次只分发一条消息
int prefetchCount = 1;
channel.basicQos(prefetchCount);
3. 消息确认(ACK)
// 消费者关闭自动ACK
boolean autoAck = false;
channel.basicConsume(queueName, autoAck, deliverCallback, consumerTag -> {});// 处理完成后手动ACK
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
4. 持久化消费者
// 重启后自动恢复的消费者
Map<String, Object> args = new HashMap<>();
args.put("x-queue-type", "quorum");
channel.queueDeclare("persistent_queue", true, false, false, args);

使用场景

  1. 服务解耦:订单系统与库存系统分离
  2. 异步处理:耗时操作(如邮件发送)
  3. 流量削峰:突发请求缓冲(秒杀系统)
  4. 分布式事务:最终一致性实现
  5. 日志收集:多系统日志聚合

最佳实践

  1. 连接管理:使用连接池(如Spring AMQP的CachingConnectionFactory)
  2. 异常处理:实现Consumer和Connection的监听器
  3. 死信队列:处理失败消息
  4. 集群部署:保证高可用性
  5. 监控管理:使用RabbitMQ Management Plugin

提示:生产环境推荐使用Spring AMQP简化开发,它提供了RabbitTemplate和@RabbitListener等便捷工具。

建议运行测试时:

  1. 先启动所有消费者
  2. 再运行生产者发送消息
  3. 观察各消费者接收到的消息是否符合路由规则

以上示例展示了RabbitMQ的核心路由机制,在实际生产环境中需添加异常处理、连接恢复、消息确认等机制。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/bicheng/84968.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

Flink task、Operator 和 UDF 之间的关系

要真正驾驭 Flink 并构建出高效、稳定、可扩展的流处理应用&#xff0c;仅仅停留在 API 的表面使用是远远不够的。深入理解其内部的运行机制&#xff0c;洞悉数据从代码到分布式执行的完整生命周期&#xff0c;以及明晰各个核心组件之间错综复杂而又协同工作的关系&#xff0c;…

Veeam Backup Replication系统的安装与使用

Veeam Backup & Replication系统安装与使用 系统简介 核心功能 备份与恢复&#xff1a;专注于虚拟化环境&#xff08;如VMware和Hyper-V&#xff09;的备份与恢复&#xff0c;支持物理服务器和云环境。快速恢复&#xff1a;提供即时恢复功能&#xff0c;可在几分钟内恢复…

十四、【ESP32全栈开发指南:搭建轻量级HTTP服务器】

一、HTTP协议基础 HTTP&#xff08;Hyper Text Transfer Protocol&#xff09;作为互联网基础协议&#xff0c;采用请求-响应模型工作&#xff1a; 1.1 HTTP请求组成 GET /uri?query1value1 HTTP/1.1 // 请求行&#xff08;方法URI协议版本&#xff09; Host: example…

java中LinkedList和ArrayList的区别和联系?

我们被要求解释Java中LinkedList和ArrayList的区别和联系。下面将分别从实现原理、性能特点、使用场景等方面进行详细说明&#xff0c;并总结它们的相同点和不同点。 # 一、联系&#xff08;共同点&#xff09; 1. 都实现了List接口&#xff0c;因此具有List接口的所有方法&…

明远智睿SD2351核心板:边缘计算时代的工业级核心引擎深度解析

在工业4.0与物联网深度融合的背景下&#xff0c;边缘计算设备正从单一功能模块向高集成度、智能化平台演进。明远智睿推出的SD2351核心板&#xff0c;凭借其异构计算架构、工业级接口资源和全栈技术生态&#xff0c;重新定义了边缘计算设备的性能边界。本文将从技术架构、场景适…

Flask 动态模块注册

目录 1. 项目概述2. 项目结构3. 核心组件解析3.1 动态模块注册系统 (api/__init__.py)3.2 应用程序入口 (setup_demo.py) 4. 模块开发指南4.1 标准模块 (*_app.py)4.2 SDK模块 (sdk/*.py) 5. URL路径规则6. 如何使用6.1 启动应用6.2 添加新模块 7. 工作原理 1. 项目概述 这个项…

JVM 内存、JMM内存与集群机器节点内存的联系

目录 1、JVM 内存 1.1、分配机制 1.2、jvm模型位置 1.3、字节码内存块 2、JMM内存 2.1、JMM模型 2.2、工作流程图 1、工作内存与主内存的交互 2. 多线程下的主内存与堆内存交互 2.3、 主内存与工作内存的同步方案 1、volatile 2、synchronized 3、final 3、内存使…

学习昇腾开发的第一天--环境配置

1、昇腾社区官网&#xff1a;昇腾社区官网-昇腾万里 让智能无所不及 2、产品-->选择开发者套件-->点击制卡工具的下载&#xff1a;资源-Atlas 200I DK A2-昇腾社区 3、如果制卡工具不能使用在线制卡&#xff0c;可以下载镜像到本地使用本地制卡&#xff1a;Linux系统制…

Android WebView 深色模式适配方案总结

Android WebView 深色模式适配方案总结 在 Android WebView 中适配深色模式&#xff08;Dark Mode&#xff09;是一个常见的需求&#xff0c;尤其是当加载的网页没有原生支持 prefers-color-scheme 时。本文将介绍 3 种主流方案&#xff0c;并分析它们的优缺点&#xff0c;帮助…

项目练习:使用mybatis的foreach标签,实现union all的拼接语句

文章目录 一、需求说明二、需求分析三、代码实现四、报表效果 一、需求说明 在sql查询数据后&#xff0c;对数据分组统计。并最后进行总计。 二、需求分析 最终&#xff0c;我想用sql来实现这个统计和查询的功能。 那么&#xff0c;怎么又查询&#xff0c;又统计了&#xf…

7.7 Extracting and saving responses

Chapter 7-Fine-tuning to follow instructions 7.7 Extracting and saving responses 在本节中&#xff0c;我们保存测试集响应以便在下一节中评分&#xff0c;除此之外保存模型的副本以供将来使用。 ​ 首先&#xff0c;让我们简单看看finetuned模型生成的响应 torch.manu…

计算机网络第3章(上):数据链路层全解析——组帧、差错控制与信道效率

目录 一、数据链路层的功能二、组帧2.1 字符计数法&#xff08;Character Count&#xff09;2.2 字符填充法&#xff08;Character Stuffing&#xff09;2.3 零比特填充法2.4 违规编码法 三、差错控制3.1 检错编码&#xff08;奇偶校验码&#xff09;3.2 循环冗余校验&#xff…

铸铁试验平台的重要性及应用前景

铸铁作为一种重要的金属材料&#xff0c;在工业生产中扮演着举足轻重的角色。为了确保铸铁制品的质量和性能&#xff0c;铸铁材料的试验是必不可少的环节。而铸铁试验平台则是进行铸铁试验的关键设备之一&#xff0c;它为铸铁材料的研究和开发提供了重要的技术支持。本文将探讨…

std::shared_ptr引起内存泄漏的例子

目录 一、循环引用&#xff08;最常见场景&#xff09; 示例代码 内存泄漏原因 二、共享指针管理的对象包含自身的 shared_ptr 示例代码 内存泄漏&#xff08;或双重释放&#xff09;原因 三、解决方案 1. 循环引用&#xff1a;使用 std::weak_ptr 2. 对象获取自身的 …

AI 知识数据库搭建方案:从需求分析到落地实施

AI 知识数据库的搭建需结合业务场景、数据特性与技术架构&#xff0c;形成系统化解决方案。以下是一套完整的搭建框架&#xff0c;涵盖规划、设计、实施及优化全流程&#xff1a; 一、前期规划&#xff1a;需求分析与目标定义 1. 明确业务场景与知识需求 场景导向&#xff1a…

Tensorflow 基础知识:变量、常量、占位符、Session 详解

在深度学习领域,TensorFlow 是一个广泛使用的开源机器学习框架。想要熟练使用 TensorFlow 进行模型开发,掌握变量、常量、占位符和 Session 这些基础知识是必不可少的。接下来,我们就深入了解一下它们的概念、用处,并通过代码示例进行演示。 一、常量(Constant) 常量,顾…

linux 常见问题之如何清除大文件的内容

linux 常见问题之如何清除大文件的内容 在 Linux 系统中&#xff0c;我们有时会遇到文件随着时间增长变得巨大&#xff0c;最常见的就是服务器的日志文件&#xff0c;随着时间的推移占用大量的磁盘空间&#xff0c;下面介绍如何清楚大文件的内容&#xff0c;当然避免文件内容过…

薛定谔的猫思想实验如何推演到量子计算

前言 这是我的选修课作业&#xff0c;但是我并不喜欢小论文方式的写法&#xff0c;死板又老套。先在这打一份底稿。 薛定谔的猫 可能一说到量子这个关键词&#xff0c;大家第一时间都会想到的是“薛定谔的猫”。 实验介绍 薛定谔的猫是一个著名的思想实验&#xff0c;由奥…

嵌入式开发中fmacro-prefix-map选项解析

在嵌入式开发中&#xff0c;-fmacro-prefix-map 是 GCC 和 Clang 等编译器提供的一个路径映射选项&#xff0c;主要用于在预处理阶段重写宏定义中出现的绝对路径。它的核心目的是解决以下问题&#xff1a; 核心作用 构建可重现性 消除编译输出&#xff08;如 .o、.d 文件&…

Javaweb学习——day3(Servlet 中处理表单数据)

文章目录 一、概念学习1. GET vs POST 请求方式的区别2. HttpServletRequest 获取表单数据 二、代码讲解与练习第 1 步&#xff1a;在 webapp 下创建 login.html第 2 步&#xff1a;在 com.example 包下创建 LoginServlet第 3 步&#xff1a;修改 web.xml 注册 LoginServlet第 …