#RabbitMQ# 消息队列入门

目录

一 MQ技术选型

1 运行rabbitmq

2 基本介绍

3 快速入门

1 交换机负责路由消息给队列

2 数据隔离

二 Java客户端

1 快速入门

2 WorkQueue

3 FanOut交换机

4 Direct交换机

5 Topic交换机

*6 声明队列交换机

1 在配置类当中声明

2 使用注解的方式指定

7 消息转换器


*前景引入

维度异步通讯同步通讯RabbitMQ 的定位
交互方式通过中间件间接通信,无阻塞等待直接通信,需实时响应作为异步通讯的核心载体,支持消息缓存与路由
耦合度低(生产者和消费者解耦)高(调用方依赖被调用方可用性)通过队列解耦系统,提升容错性
适用场景高并发、耗时任务、事件驱动架构实时性要求高的简单交互天然适合异步场景,也可通过 RPC 支持同步需求
性能与扩展性高吞吐,支持水平扩展受限于实时响应能力通过集群、负载均衡优化异步性能

一 MQ技术选型

MQ(message Queue)消息队列,字面来看就是存放消息的队列。也就是异步调用中的Broke。

1 运行rabbitmq

在虚拟机上安装Docker_虚拟机安装docker-CSDN博客

拉取镜像

  • docker pull rabbitmq:3-management

在容器当中运行

  • docker run ...

借助端口访问

2 基本介绍

核心概念总结

角色作用类比
Publisher发送消息的程序寄信人
Exchange按规则将消息分发到队列邮局分拣员
Queue存储消息的容器邮箱
Consumer从队列取消息并处理的程序收信人
Virtual Host隔离不同业务的消息环境(如测试、生产)邮局内的独立部门

3 快速入门

1 交换机负责路由消息给队列

添加成功

找到一台交换机

需要添加绑定队列从而实现路由给队列

消息路由成功

2 数据隔离

RabbitMQ 中的 虚拟主机(vhost) 可以用一个简单的比喻来理解:它就像一台大型服务器中的“独立房间”,每个房间都有自己的门禁系统、家具和规则,互不干扰。以下是它的核心作用:

实现:

先添加一个用户

现在这个用户还没有虚拟主机,这里其是无法访问之前创建的队列,是与之前的虚拟主机隔离开的

现在退出原先的用户,以刚刚创建的用户信息登录,然后添加一个虚拟主机

现在就可以在现在的用户之下的虚拟主机上创建新的队列

二 Java客户端

1 快速入门

实现:

1 导入spring-amqp依赖

        <dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

2 添加队列

3 配置MQ地址

4 发送消息

    @Autowiredprivate RabbitTemplate rabbitTemplate;@Testvoid testSendMessage2Queue() {String queueName = "simple.queue1";String msg = "hello, amqp!";rabbitTemplate.convertAndSend(queueName, msg);}

5 队列

6 在消费者的相关方法中定义

    @RabbitListener(queues = "simple.queue1")public void listenSimpleQueue(String msg) {System.out.println("消费者收到了simple.queue的消息:【" + msg + "】");}

7 然后将项目启动,再在测试类中发送消息,控制台会实时监控到发送的消息

8 队列当中的消息拿出来在控制台里面就没有消息了

2 WorkQueue

任务模型:简单来说就是让多个消费者绑定到一个队列,共同消费队列当中的消息。

一个队列多个消费者,可以缓解消息堆积问题。

1 配置项

2 不写的话(默认一人一半,处理不完在队列里等待)

3 新增一个队列

4 两个消费者(消费能力不同,消费能力相同应该是轮询消费)

    @RabbitListener(queues = "work.queue")public void listenWorkQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 work.queue的消息:【" + msg + "】");Thread.sleep(20);}@RabbitListener(queues = "work.queue")public void listenWorkQueue2(String msg) throws InterruptedException {System.err.println("消费者2 收到了 work.queue的消息...... :【" + msg + "】");Thread.sleep(200);}

5 生产者

    @Testvoid testWorkQueue() throws InterruptedException {String queueName = "work.queue";for (int i = 1; i <= 50; i++) {String msg = "hello, worker, message_" + i;rabbitTemplate.convertAndSend(queueName, msg);Thread.sleep(20);}}

6 测试

3 FanOut交换机

真正生产环境都会经过exchange来发送消息,而不是直接发送到队列,交换机的类型有以下三种

Fanout模式会将接受到的消息广播到跟其绑定的每一个队列,广播模式。

例子

1 先将队列声明好

2 再声明交换机同时与队列绑定

3 消费者

    @RabbitListener(queues = "fanout.queue1")public void listenFanoutQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 fanout.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "fanout.queue2")public void listenFanoutQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 fanout.queue2的消息:【" + msg + "】");}

4 生产者

    @Testvoid testSendFanout() {String exchangeName = "hmall.fanout";String msg = "hello, everyone!";rabbitTemplate.convertAndSend(exchangeName, null, msg);}

测试结果:

为什么第二个参数是 null

在你的代码中,第二个参数是 null,这是为了配合 Fanout 交换机 的特性。以下是关键点:

Fanout 交换机的特性

  • Fanout 交换机(也称为广播交换机)会将消息 无条件广播到所有绑定到该交换机的队列完全忽略路由键
  • 因此,在使用 Fanout 交换机时,路由键(routingKey)可以设为 null,因为交换机不会使用它来决定消息的路由规则。

4 Direct交换机

这种交换机可以实现与Fanout交换机相同的效果同时也可以实现定向的效果。

需求

1 创建队列与交换机

(交换机需要给routingKey值)

2 消费者

@RabbitListener(queues = "direct.queue1") // 直接监听名为 direct.queue1 的队列
public void listenDirectQueue1(String msg) {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");
}@RabbitListener(queues = "direct.queue2") // 直接监听名为 direct.queue2 的队列
public void listenDirectQueue2(String msg) {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");
}

3 生产者

    @Testvoid testSendDirect() {String exchangeName = "hmall.direct";String msg = "蓝色通知,警报解除,哥斯拉是放的气球";rabbitTemplate.convertAndSend(exchangeName, "blue", msg);}

测试:

发送的路由键接收队列触发的消费者
reddirect.queue1, direct.queue2消费者1 + 消费者2
bluedirect.queue1消费者1
yellowdirect.queue2消费者2

可以根据需求更改生产者的代码逻辑:

5 Topic交换机

Topic 交换机是 RabbitMQ 中基于模式匹配的路由机制,允许通过通配符(* 和 #)实现灵活的路由规则。

需求

实现:

声明队列和交换机

消费者

    @RabbitListener(queues = "topic.queue1")public void listenTopicQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 topic.queue1的消息:【" + msg + "】");}@RabbitListener(queues = "topic.queue2")public void listenTopicQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 topic.queue2的消息:【" + msg + "】");}

生产者

    @Testvoid testSendTopic() {String exchangeName = "hmall.topic";String msg = "今天天气挺不错,我的心情的挺好的";rabbitTemplate.convertAndSend(exchangeName, "china.weather", msg);}

测试:可以根据需求修改发送的RoutingKey

Direct交换机与Topic的差异

特性Direct 交换机Topic 交换机
路由键匹配方式精确匹配(完全一致)模式匹配(支持通配符 * 和 #
灵活性低(适合简单路由)高(适合复杂路由场景)
典型场景订单状态变更、任务分发日志分类、多维度消息分发

*6 声明队列交换机

为了改善在控制台创建队列交换机的笨重,可以使用相关接口

声明队列和交换机

实现:

1 在配置类当中声明

Fanout的

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;@Configuration
public class FanoutConfiguration {// fanoutExchange 定义交换机@Beanpublic FanoutExchange fanoutExchange(){// ExchangeBuilder.fanoutExchange("").build();return new FanoutExchange("hmall.fanout2");}//  queue 创建队列@Beanpublic Queue fanoutQueue3(){// QueueBuilder.durable("ff").build();//持久化return new Queue("fanout.queue3");}// 绑定队列和交换机@Beanpublic Binding fanoutBinding3(Queue fanoutQueue3, FanoutExchange fanoutExchange){return BindingBuilder.bind(fanoutQueue3).to(fanoutExchange);}// 创建队列@Beanpublic Queue fanoutQueue4(){return new Queue("fanout.queue4");}// 绑定队列和交换机@Beanpublic Binding fanoutBinding4(){return BindingBuilder.bind(fanoutQueue4()).to(fanoutExchange());}
}

Direct的

package com.itheima.consumer.config;import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;// @Configuration
public class DirectConfiguration {//   定义交换机@Beanpublic DirectExchange directExchange() {return new DirectExchange("hmall.direct");}//   创建队列@Beanpublic Queue directQueue1() {return new Queue("direct.queue1");}// 队列与交换机进行绑定@Beanpublic Binding directQueue1BindingRed(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("red");}//  队列与交换机进行绑定@Beanpublic Binding directQueue1BindingBlue(Queue directQueue1, DirectExchange directExchange) {return BindingBuilder.bind(directQueue1).to(directExchange).with("blue");}//   创建队列@Beanpublic Queue directQueue2() {return new Queue("direct.queue2");}//  队列与交换机进行绑定@Beanpublic Binding directQueue2BindingRed(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("red");}//  队列与交换机进行绑定@Beanpublic Binding directQueue2BindingBlue(Queue directQueue2, DirectExchange directExchange) {return BindingBuilder.bind(directQueue2).to(directExchange).with("yellow");}}

2 使用注解的方式指定

    @RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}))public void listenDirectQueue1(String msg) throws InterruptedException {System.out.println("消费者1 收到了 direct.queue1的消息:【" + msg + "】");}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2", durable = "true"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}))public void listenDirectQueue2(String msg) throws InterruptedException {System.out.println("消费者2 收到了 direct.queue2的消息:【" + msg + "】");}

通过使用 @RabbitListener 的 bindings + @QueueBinding 注解的方式,不需要手动创建队列、交换机或绑定关系

  1. 检查资源是否存在
    Spring 会通过 RabbitAdmin 组件向 RabbitMQ 服务器发起检查,确认队列、交换机是否已存在。

  2. 自动创建缺失的资源

    • 若队列 direct.queue1 或 direct.queue2 不存在,会根据 @Queue 注解的配置(如 namedurable自动创建队列

    • 若交换机 hmall.direct 不存在,会根据 @Exchange 注解的配置(如 nametype自动创建交换机

  3. 自动绑定队列到交换机
    根据 key 指定的路由键,将队列与交换机绑定(如 direct.queue1 绑定 red 和 blue 路由键)。

7 消息转换器

使用

1. SimpleMessageConverter(默认)

  • 行为

    • 支持 Stringbyte[]Serializable 对象。

    • 若消息是 Serializable 对象,使用 Java 原生序列化。

  • 问题

    • 强耦合:发送方和接收方必须有相同的类路径(否则反序列化失败)。

    • 安全性差:Java 原生序列化易受攻击(如反序列化漏洞)。

2. Jackson2JsonMessageConverter(推荐)

  • 行为

    • 将对象转换为 JSON 字符串,再转为 byte[]

    • 反序列化时,将 JSON 还原为对象(需指定目标类型)。

  • 优势

    • 跨语言兼容:JSON 是通用格式,非 Java 客户端也可解析。

    • 松耦合:不强制要求发送方和接收方的类路径一致。

    • 安全性高:避免 Java 原生序列化漏洞

1 依赖引入

        <!--Jackson--><dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId></dependency>

2 Bean的创建

    // 消息转换器@Beanpublic MessageConverter jacksonMessageConvertor(){return new Jackson2JsonMessageConverter();}

3 消费者

    @RabbitListener(queues = "object.queue")public void listenObject(Map<String, Object> msg) throws InterruptedException {System.out.println("消费者 收到了 object.queue的消息:【" + msg + "】");}

4 生产者

    @Testvoid testSendObject() {Map<String, Object> msg = new HashMap<>(2);msg.put("name", "jack");msg.put("age", 21);rabbitTemplate.convertAndSend("object.queue", msg);}

5 在实际业务当中的使用

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/81303.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【深度学习】多目标融合算法(六):渐进式分层提取模型PLE(Progressive Layered Extraction)

目录 一、引言 二、PLE&#xff08;Progressive Layered Extraction&#xff0c;渐进式分层提取模型&#xff09; 2.1 技术原理 2.2 技术优缺点 2.3 业务代码实践 2.3.1 业务场景与建模 2.3.2 模型代码实现 2.3.3 模型训练与推理测试 2.3.4 打印模型结构 三、总结 一…

【Java开发日记】如何使用Java开发在线生成 pdf 文档

一、介绍 在实际的业务开发的时候&#xff0c;研发人员往往会碰到很多这样的一些场景&#xff0c;需要提供相关的电子凭证信息给用户&#xff0c;例如网银&#xff0f;支付宝&#xff0f;微信购物支付的电子发票、订单的库存打印单、各种电子签署合同等等&#xff0c;以方便用…

Oracle 11g 单实例使用+asm修改主机名导致ORA-29701 故障分析

解决 把服务器名修改为原来的&#xff0c;重启服务器。 故障 建表空间失败。 分析 查看告警日志 ORA-1119 signalled during: create tablespace splex datafile ‘DATA’ size 2000M… Tue May 20 18:04:28 2025 create tablespace splex datafile ‘DATA/option/dataf…

消息队列的使用

使用内存队列来处理基于内存的【生产者-消费者】场景 思考和使用Disruptor Disruptor可以实现单个或多个生产者生产消息&#xff0c;单个或多个消费者消息&#xff0c;且消费者之间可以存在消费消息的依赖关系 使用Disruptor需要结合业务特性&#xff0c;设计要灵活 什么业务…

《帝国时代1》游戏秘籍

资源类 PEPPERONI PIZZA&#xff1a;获得 1000 食物。COINAGE&#xff1a;获得 1000 金。WOODSTOCK&#xff1a;获得 1000 木头。QUARRY&#xff1a;获得 1000 石头。 建筑与生产类 STEROIDS&#xff1a;快速建筑。 地图类 REVEAL MAP&#xff1a;显示所有地图。NO FOG&#xf…

使用JSP踩过的坑

虽然说jsp已经过时了&#xff0c;但是有时维护比较老的项目还是需要的。 下面说下&#xff0c;我使用jsp踩过的坑&#xff1a; 1.关于打印输出 在jsp中输出使用 out.println("hello");而不是 System.out.println("hello");如果在定义函数部分需要打印…

redis集群创建时手动指定主从关系的方法

适用场景&#xff1a; 创建主从关系时默认参数 --cluster-replicas 1 会自动分配从节点。 为了能精确控制 Redis Cluster 的主从拓扑结构&#xff0c;我们通过 Redis Cluster 的手动分片功能来实现 一、手动指定主从关系的方法 使用 redis-cli --cluster-replicas 0 先创建纯…

ROS合集(七)SVIn2声呐模块分析

文章目录 一、整体思想二、具体误差建模流程三、总结明确&#xff08;预测值与观测值&#xff09;四、选点逻辑五、Sonar 数据处理流水线1. ROS Launch 配置&#xff08;imagenex831l.launch&#xff09;2. SonarNode 节点&#xff08;sonar_node.py&#xff09;3. Subscriber …

Python爬虫实战:研究PySpider框架相关技术

1. 引言 1.1 研究背景与意义 网络爬虫作为互联网数据采集的重要工具,在信息检索、舆情分析、市场调研等领域发挥着重要作用。随着互联网信息的爆炸式增长,如何高效、稳定地获取所需数据成为了一个关键挑战。PySpider 作为一款功能强大的 Python 爬虫框架,提供了丰富的功能…

《大模型开源与闭源的深度博弈:科技新生态下的权衡与抉择》

开源智能体大模型的核心魅力&#xff0c;在于它构建起了一个全球开发者共同参与的超级协作网络。想象一下&#xff0c;来自世界各个角落的开发者、研究者&#xff0c;无论身处繁华都市还是偏远小镇&#xff0c;只要心怀对技术的热爱与追求&#xff0c;就能加入到这场技术狂欢中…

大数据模型对陌生场景图像的识别能力研究 —— 以 DEEPSEEK 私有化部署模型为例

摘要 本研究聚焦于已训练的大数据模型能否识别未包含在样本数据集中的陌生场景图像这一问题&#xff0c;以 DEEPSEEK 私有化部署模型为研究对象&#xff0c;结合机器学习理论&#xff0c;分析模型识别陌生场景图像的影响因素&#xff0c;并通过理论探讨与实际应用场景分析&…

STM32——从点灯到传感器控制

STM32基础外设开发&#xff1a;从点灯到传感器控制 一、前言 本篇文章总结STM32F10x系列基础外设开发实例&#xff0c;涵盖GPIO控制、按键检测、传感器应用等。所有代码基于标准库开发&#xff0c;适合STM32初学者参考。 二、硬件准备 STM32F10x系列开发板LED模块有源蜂鸣器…

[特殊字符] 使用增量同步+MQ机制将用户数据同步到Elasticsearch

在开发用户搜索功能时&#xff0c;我们通常会将用户信息存储到 Elasticsearch&#xff08;简称 ES&#xff09; 中&#xff0c;以提高搜索效率。本篇文章将详细介绍我们是如何实现 MySQL 到 Elasticsearch 的增量同步&#xff0c;以及如何通过 MQ 消息队列实现用户信息实时更新…

MyBatis缓存机制全解析

在MyBatis中&#xff0c;缓存分为一级缓存和二级缓存&#xff0c;它们的主要目的是减少数据库的访问次数&#xff0c;提高查询效率。下面简述这两种缓存的工作原理&#xff1a; 一、 一级缓存&#xff08;SqlSession级别的缓存&#xff09; 一级缓存是MyBatis默认开启的缓存机…

【短距离通信】【WiFi】WiFi7关键技术之4096-QAM、MRU

目录 3. 4096-QAM 3.1 4096-QAM 3.2 QAM 的阶数越高越好吗&#xff1f; 4. MRU 4.1 OFDMA 和 RU 4.2 MRU 资源分配 3. 4096-QAM 摘要 本章主要介绍了Wi-Fi 7引入的4096-QAM对数据传输速率的提升。 3.1 4096-QAM 对速率的提升 Wi-Fi 标准一直致力于提升数据传输速率&a…

【二刷力扣】【力扣热题100】今天的题目是:283.移动零

题目&#xff1a; 给定一个数组 nums&#xff0c;编写一个函数将所有 0 移动到数组的末尾&#xff0c;同时保持非零元素的相对顺序。 请注意 &#xff0c;必须在不复制数组的情况下原地对数组进行操作。 示例 1: 输入: nums [0,1,0,3,12] 输出: [1,3,12,0,0] 示例 2: 输…

机器学习中的多GPU训练模式

文章目录 一、数据并行&#xff08;Data Parallelism&#xff09;二、模型并行&#xff08;Model Parallelism&#xff09;1. 模型并行2. 张量并行&#xff08;Tensor Parallelism&#xff09; 三、流水线并行&#xff08;Pipeline Parallelism&#xff09;四、混合并行&#x…

《JavaScript 性能优化:从原理到实战的全面指南》

《JavaScript 性能优化&#xff1a;从原理到实战的全面指南》 一、JavaScript 性能优化基础理论 在深入探讨 JavaScript 性能优化技术之前&#xff0c;我们需要明白JavaScript 的执行机制和性能瓶颈产生的根本原因。JavaScript 是一种单线程、非阻塞的脚本语言&#xff0c;其…

选择合适的Azure数据库监控工具

Azure云为组织提供了众多服务&#xff0c;使其能够无缝运行应用程序、Web服务和服务器部署&#xff0c;其中包括云端数据库部署。Azure数据库能够与云应用程序实现无缝集成&#xff0c;具备可靠、易扩展和易管理的特性&#xff0c;不仅能提升数据库可用性与性能&#xff0c;同时…

9.4在 VS Code 中配置 Maven

在 VS Code 中配置 Maven 需要完成 Maven 环境安装 一、安装 Maven&#xff08;如果未安装&#xff09; 下载 Maven 访问 Apache Maven 官网&#xff0c;下载最新版本的 Maven&#xff08;如apache-maven-3.9.9-bin.zip&#xff09;。 解压文件 将下载的 ZIP 文件解压到本地目…