《消息队列学习指南:从 MQ 基础到 SpringAMQP 实践》

初识MQ

同步调用

目前我们采用的是基于OpenFeign的同步调用,也就是说业务执行流程是这样的:

  • 支付服务需要先调用用户服务完成余额扣减

  • 然后支付服务自己要更新支付流水单的状态

  • 然后支付服务调用交易服务,更新业务订单状态为已支付

三个步骤依次执行。

这其中就存在3个问题:

第一拓展性差

但是随着业务规模扩大,产品的功能也在不断完善,最终支付业务会越来越臃肿。

第二性能下降 

采用了同步调用,调用者需要等待服务提供者执行完返回结果后,才能继续向下执行,也就是说每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和:

第三,级联失败 

由于我们是基于OpenFeign调用交易服务、通知服务。当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

这其实就是同步调用的级联失败问题。

异步调用

异步调用方式其实就是基于消息通知的方式,一般包含三个角色:

  • 消息发送者:投递消息的人,就是原来的调用方

  • 消息Broker:管理、暂存、转发消息,你可以把它理解成微信服务器

  • 消息接收者:接收和处理消息的人,就是原来的服务提供方

在异步调用中,发送者不再直接同步调用接收者的业务接口,而是发送一条消息投递给消息Broker。然后接收者根据自己的需求从消息Broker那里订阅消息。每当发送方发送消息后,接受者都能获取消息并处理。

这样,发送消息的人和接收消息的人就完全解耦了。

异步调用的优势包括:

  • 耦合度更低

  • 性能更好

  • 业务拓展性强

  • 故障隔离,避免级联失败

当然,异步通信也并非完美无缺,它存在下列缺点:

  • 完全依赖于Broker的可靠性、安全性和性能

  • 架构复杂,后期维护和调试麻烦

常见的消息队列(MessageQueue)

目比较常见的MQ实现:

  • ActiveMQ

  • RabbitMQ

  • RocketMQ

  • Kafka

    RabbitMQActiveMQRocketMQKafka
    公司/社区RabbitApache阿里Apache
    开发语言ErlangJavaJavaScala&Java
    协议支持AMQP,XMPP,SMTP,STOMPOpenWire,STOMP,REST,XMPP,AMQP自定义协议自定义协议
    可用性一般
    单机吞吐量一般非常高
    消息延迟微秒级毫秒级毫秒级毫秒以内
    消息可靠性一般一般

追求可用性:Kafka、 RocketMQ 、RabbitMQ

追求可靠性:RabbitMQ、RocketMQ

追求吞吐能力:RocketMQ、Kafka

追求消息低延迟:RabbitMQ、Kafka

RabbitMQ

RabbitMQ是基于Erlang语言开发的开源消息通信中间件,官网地址:

RabbitMQ: One broker to queue them all | RabbitMQ

RabbitMQ对应的架构如图:

其中包含几个概念:

  • publisher:生产者,也就是发送消息的一方

  • consumer:消费者,也就是消费消息的一方

  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理

  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。

  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue

交换机

首先展示交换机项下的创建交换机:

我们点击任意交换机,即可进入交换机详情页面。仍然会利用控制台中的publish message 发送一条消息:

队列

我们打开Queues选项卡,新建一个队列:

数据隔离

用户管理

点击Admin选项卡,首先会看到RabbitMQ控制台的用户管理界面:

virtual host

SpringAMQP

   RabbitMQ采用了AMQP协议,因此它具备跨语言的特性。任何语言只要遵循AMQP协议收发消息,都可以与RabbitMQ交互。并且RabbitMQ官方也提供了各种不同语言的客户端。

SpringAmqp的官方地址:

Spring AMQP

SpringAMQP提供了三个功能:

  • 自动声明队列、交换机及其绑定关系

  • 基于注解的监听器模式,异步接收消息

  • 封装了RabbitTemplate工具,用于发送消息

快速入门

  • publisher直接发送消息到队列

  • 消费者监听并处理队列中的消息

导入依赖
<!--AMQP依赖,包含RabbitMQ--><dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId></dependency>

工程中就可以直接使用SpringAMQP了。

消息发送

首先配置MQ地址,在publisher服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.100.128 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在publisher服务中编写测试类SpringAmqpTest,并利用RabbitTemplate实现消息发送:

消息接收

首先配置MQ地址,在consumer服务的application.yml中添加配置:

spring:rabbitmq:host: 192.168.150.101 # 你的虚拟机IPport: 5672 # 端口virtual-host: /hmall # 虚拟主机username: hmall # 用户名password: 123 # 密码

然后在consumer服务的com.itheima.consumer.listener包中新建一个类SpringRabbitListener,代码如下:

测试

启动consumer服务,然后在publisher服务中运行测试代码,发送MQ消息。最终consumer收到消息:

WorkQueues模型

Work queues,任务模型。简单来说就是多个消费者绑定到一个队列,共同消费队列中的消息

当消息处理比较耗时的时候,可能生产消息的速度会远远大于消息的消费速度。长此以往,消息就会堆积越来越多,无法及时处理。

此时就可以使用work 模型,

多个消费者共同处理消息处理,消息处理的速度就能大大提高了。

但消息是平均分配给每个消费者,并没有考虑到消费者的处理能力。

导致1个消费者空闲,另一个消费者忙的不可开交。没有充分利用每一个消费者的能力

能者多劳

在spring中有一个简单的配置,可以解决这个问题。我们修改consumer服务的application.yml文件,添加配置:

spring:rabbitmq:listener:simple:prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

 这样充分利用了每一个消费者的处理能力,可以有效避免消息积压问题。

总结

Work模型的使用:

  • 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理

  • 通过设置prefetch来控制消费者预取的消息数量

交换机类型

  • Publisher:生产者,不再发送消息到队列中,而是发给交换机

  • Exchange:交换机,一方面,接收生产者发送的消息。另一方面,知道如何处理消息,例如递交给某个特别队列、递交给所有队列、或是将消息丢弃。到底如何操作,取决于Exchange的类型。

  • Queue:消息队列也与以前一样,接收消息、缓存消息。不过队列一定要与交换机绑定。

  • Consumer:消费者,与以前一样,订阅队列,没有变化

Exchange(交换机只负责转发消息,不具备存储消息的能力,因此如果没有任何队列与Exchange绑定,或者没有符合路由规则的队列,那么消息会丢失!

交换机的类型有四种:

  • Fanout:广播,将消息交给所有绑定到交换机的队列。我们最早在控制台使用的正是Fanout交换机

  • Direct:订阅,基于RoutingKey(路由key)发送给订阅了消息的队列

  • Topic:通配符订阅,与Direct类似,只不过RoutingKey可以使用通配符

  • Headers:头匹配,基于MQ的消息头匹配,用的较少。

Fanout交换机

Fanout,英文翻译是扇出,我们学过的广播,发出消息任何绑定的队列都可以收到。

  • 1) 可以有多个队列

  • 2) 每个队列都要绑定到Exchange(交换机)

  • 3) 生产者发送的消息,只能发送到交换机

  • 4) 交换机把消息发送给绑定过的所有队列

  • 5) 订阅队列的消费者都能拿到消息

消息发送

在有交换机参与时,发送方调用的参数时要注意参数类型

rabbitTemplate.convertAndSend( exchangeName,  "",  message );

第一个参数:交换机名称

第二个参数:交换机与队列绑定的RoutingKey值

第三个参数:消息对象

总结

交换机的作用:

  • 接收publisher发送的消息

  • 将消息按照规则路由到与之绑定的队列

  • 不能缓存消息,路由失败,消息丢失

  • FanoutExchange的会将消息路由到每个绑定的队列

Direct交换机

在Fanout模式中,一条消息,会被所有订阅的队列都消费。但是,在某些场景下,我们希望不同的消息被不同的队列消费。这时就要用到Direct类型的Exchange。

在Direct模型下:

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)

  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey

  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

总结

描述下Direct交换机与Fanout交换机的差异

  • Fanout交换机将消息路由给每一个与之绑定的队列

  • Direct交换机根据RoutingKey判断路由给哪个队列

  • 如果多个队列具有相同的RoutingKey,则与Fanout功能类似

Topic交换机

说明

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符!

BindingKey 一般都是有一个或多个单词组成,多个单词之间以.分割,例如: item.insert

通配符规则:

  • #:匹配一个或多个词

  • *:匹配不多不少恰好1个词

总结

描述下Direct交换机与Topic交换机的差异?

  • Topic交换机接收的消息RoutingKey必须是多个单词,以 . 分割

  • Topic交换机与队列绑定时的bindingKey可以指定通配符

  • #:代表0个或多个词

  • *:代表1个词

声明队列和交换机

         通过编写代码的方式来声明创建交换机和队列

         程序启动时检查队列和交换机是否存在,如果不存在自动创建。

基本API

SpringAMQP提供了一个Queue类,用来创建队列:

SpringAMQP还提供了一个Exchange接口,来表示所有不同类型的交换机:

我们可以自己创建队列和交换机,不过SpringAMQP还提供了ExchangeBuilder来简化这个过程:

而在绑定队列和交换机时,则需要使用BindingBuilder来创建Binding对象:

示例:

基于注解声明

基于@Bean的方式声明队列和交换机比较麻烦,Spring还提供了基于注解方式来声明。

@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue1"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "direct.queue2"),exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

消息转换器

Spring的消息发送代码接收的消息体是一个Object:

而在数据传输时,它会把你发送的消息序列化为字节发送给MQ,接收消息的时候,还会把字节反序列化为Java对象。

只不过,默认情况下Spring采用的序列化方式是JDK序列化。众所周知,JDK序列化存在下列问题:

  • 数据体积过大

  • 有安全漏洞

  • 可读性差

配置JSON转换器

显然,JDK序列化方式并不合适。我们希望消息体的体积更小、可读性更高,因此可以使用JSON方式来做序列化和反序列化。

publisherconsumer两个服务中都引入依赖:

<dependency><groupId>com.fasterxml.jackson.dataformat</groupId><artifactId>jackson-dataformat-xml</artifactId><version>2.9.10</version>
</dependency>

注意,如果项目中引入了spring-boot-starter-web依赖,则无需再次引入Jackson依赖。

配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){// 1.定义消息转换器Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息jackson2JsonMessageConverter.setCreateMessageIds(true);return jackson2JsonMessageConverter;
}

注意: publisher用什么类型的消息传递,接收者也要用什么类型来接收

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/916885.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/916885.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

深度学习 --- 过拟合与欠拟合

深度学习 — 过拟合与欠拟合 文章目录深度学习 --- 过拟合与欠拟合一.概念1.1 过拟合1.2 欠拟合1.3 判断方式二&#xff0c;解决欠拟合三&#xff0c;解决过拟合3.1 L2正则化3.1.1 定义以及作用3.1.2 代码3.2 L1正则化3.3 L1与L2对比3.4 Dropout示例3.5 数据增强3.5.1 图片缩放…

Python 之抽象方法 @abstractmethod 的理解

如果你熟悉 Java 的话&#xff0c;Java 里有一个抽象接口的概念&#xff0c;Python 里的抽象方法基本上与其类似。在 Python 中&#xff0c;abstractmethod 是一个装饰器&#xff0c;用于定义抽象方法。它是实现抽象基类&#xff08;Abstract Base Class, ABC&#xff09;的核心…

深度学习·pytorch

广播机制 从末尾开始逐个维度遍历两个矩阵的shape&#xff0c;如果维度不相同&#xff0c;则考虑广播&#xff1a;任一方的维度为1或者维度不存在(小矩阵广播为大矩阵)&#xff0c;这样的运算可以广播 可以广播的例子 xtorch.empty(5,3,4,1) ytorch.empty(3,1,1) (x.add_(y)).s…

SpringBoot集成deepseek

pom文件&#xff1a;<?xml version"1.0" encoding"UTF-8"?> <project xmlns"http://maven.apache.org/POM/4.0.0" xmlns:xsi"http://www.w3.org/2001/XMLSchema-instance"xsi:schemaLocation"http://maven.apache.org…

JetBrains Annotations:从入门到落地,彻底告别 NullPointerException

本文基于三篇高质量博客&#xff08;JetBrains Annotations官方文档、Jakarta Validation 规范、《Effective Java》第3版&#xff09;的原文内容&#xff0c;结合作者在一线研发团队落地 JetBrains Annotations 的实战经验&#xff0c;系统梳理了该注解库的核心能力、使用姿势…

基于Rust与HDFS、YARN、Hue、ZooKeeper、MySQL

基于Rust与HDFS、YARN、Hue、ZooKeeper、MySQL集合 以下是基于Rust与HDFS、YARN、Hue、ZooKeeper、MySQL等技术栈结合的实例,涵盖不同场景和应用方向: 数据处理与分析 使用Rust编写MapReduce作业,通过YARN提交到HDFS处理大规模数据集。Rust的高性能特性适合处理密集型计算…

芯片上市公司正在放弃射频业务

转载自--钟林谈芯射频芯片赛道本来不卷的&#xff0c;投资人多了也就卷了。本周&#xff0c;多家媒体报道某芯片上市公司终止射频业务&#xff0c;终止射频业务的何止一家芯片上市公司&#xff0c;从去年开始就逐渐有上市公司终止射频业务&#xff0c;开启清货模式。如人饮水&a…

Jmeter 性能测试监控之ServerAgent

使用 Jmeter 对 Linux 服务器的进行压测时&#xff0c;想要监控服务器的 CPU 、内存&#xff0c;可以通过添加插件 【ServerAgent】来观察,可以实时监控性能指标 一、ServerAgent-2.2.3下载 下载地址&#xff1a; GitCode - 全球开发者的开源社区,开源代码托管平台 二、通过插…

5.苹果ios逆向-过ssl证书检测和安装ssh和获取root权限

免责声明&#xff1a;内容仅供学习参考&#xff0c;请合法利用知识&#xff0c;禁止进行违法犯罪活动&#xff01; 内容参考于&#xff1a;图灵Python学院 工具下载&#xff1a; 链接&#xff1a;https://pan.baidu.com/s/1bb8NhJc9eTuLzQr39lF55Q?pwdzy89 提取码&#xff1…

Navicat 17 教程:Windows 和 Mac 系统适用

一、引言 对于程序员们来说&#xff0c;Navicat是一款极为实用的数据库管理工具。Navicat 17更是带来了诸多新特性&#xff0c;能大大提升我们的工作效率。今天就为大家带来Navicat 17在Windows和Mac系统上的使用教程。 二、准备工作 &#xff08;一&#xff09;下载安装包 「…

Android 中 实现柱状图自定义控件

一、基本思路 创建自定义控件的数据模型; 创建一个自定义 View 类,继承自 View; 在初始化方法中获取自定义属性的值。 创建设置数据方法,将数据模型列表转换成自定义绘制时的数据; 重写 onDraw 方法,以实现自定义的绘制逻辑。 二、主要绘制方法 1、drawLine 绘制直线 p…

Netty 核心原理与实战:从 DiscardServer 看透 Reactor 模式与组件协作

目录 Netty 是什么&#xff1f; Netty 的目标 Netty 实战案例 DiscardServer 服务端程序 NettyDiscardServer 业务处理器 NettyDiscardHandler 配置类 NettyDemoConfig 回顾 Reactor 模式中的 IO 事件处理流程 Netty 中的 Channel Netty 中的 Reactor Netty 中的 Han…

关于“LoggerFactory is not a Logback LoggerContext but Logback is on ......“的解决方案

​ ✨重磅&#xff01;盹猫的个人小站正式上线啦&#xff5e;诚邀各位技术大佬前来探秘&#xff01;✨ 这里有&#xff1a; 硬核技术干货&#xff1a;编程技巧、开发经验、踩坑指南&#xff0c;带你解锁技术新姿势&#xff01;趣味开发日常&#xff1a;代码背后的脑洞故事、工具…

2025年6月电子学会青少年软件编程(C语言)等级考试试卷(三级)

答案和更多内容请查看网站&#xff1a;【试卷中心 -----> 电子学会 ----> C/C ---->三级】 网站链接 青少年软件编程历年真题模拟题实时更新 编程题 第 1 题 打印城门 题目描述 给定一个正整数 n&#xff0c;输出如下的星号城门。具体格式请见样例。 输入格…

跨平台直播美颜SDK开发指南:兼顾性能与美型效果的最佳实践

面对iOS、Android乃至Web等多端应用需求&#xff0c;如何开发一款真正跨平台、兼顾性能与美型效果的美颜SDK&#xff0c;成为众多开发团队和产品经理的一道必答题。 今天笔者这篇文章&#xff0c;就从架构设计、性能优化、视觉效果调校三个关键维度&#xff0c;带你深入解析跨平…

2025数字藏品安全保卫战:高防CDN如何成为NFT应用的“隐形护甲”?

副标题&#xff1a; 从DDoS防御到全球加速&#xff0c;拆解数字资产平台的生死防线&#x1f310; 引言&#xff1a;当数字藏品成为黑客的“头号靶场”2025年全球数字藏品市场突破$1000亿&#xff0c;但安全事件同步激增230%——某头部NFT平台因3.2Tbps DDoS攻击瘫痪&#xff0c…

linux 执行sh脚本,提示$‘\r‘: command not found

1、在Linux下执行某个脚本文件却提示$\r: command not found&#xff0c;如下图:2、错误原因:a、 Windows 风格的换行符&#xff1a;Windows 系统使用 \r\n 作为行结束符&#xff0c;而 Linux 和 Unix 系统使用 \n。当你从 Windows 环境中复制文本到 Linux 环境时&#xff0c;可…

使用HaiSnap做了一款取件码App(一键生成)

你是否怀揣着奇思妙想&#xff0c;却因不懂代码而对开发应用望而却步&#xff1f;现在&#xff0c;有一个神奇AI Agent&#xff08;响指HaiSnap&#xff09;&#xff0c;一个响指就能实现&#xff0c;你说神奇不&#xff1f;只需要一句话就可以生成你想要的应用&#xff01;让你…

容器与虚拟机的本质差异:从资源隔离到网络存储机制

目录 专栏介绍 作者与平台 您将学到什么&#xff1f; 学习特色 容器与虚拟机的本质差异&#xff1a;从资源隔离到网络存储机制 一、容器与虚拟机的本质区别 1.1 资源抽象层次差异 1.2 资源消耗与性能对比 1.3 隔离性深度差异 二、容器网络基础架构 2.1 Docker网络模型…

ros2 launch文件编写详解

一个完整的简单的launch文件配置过程1.编写launch文件2.配置package.xml3.配置setup.py&#xff08;python包&#xff09;4.配置CMakeList(C包)5.编译运行# 在 ROS 2 的 Python 启动文件中&#xff0c;这些导入语句用于引入各类启动模块&#xff0c;以构建和配置节点启动流程 f…