消息队列 2.RabbitMQ的基本概念与使用

RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol)协议的开源消息中间件,主要用于实现分布式系统中的消息传递,支持异步通信、系统解耦、流量削峰等场景。在 Java 生态中,RabbitMQ 被广泛应用,其 Java 客户端提供了简洁的 API,方便开发者快速集成。

AMQP 协议

核心概念

1. 消息模型
AMQP 采用生产者 - 消费者模型,但引入了更复杂的路由机制:
  • 生产者(Producer):发送消息的应用
  • 消费者(Consumer):接收消息的应用
  • 消息中间件(Broker):负责接收、存储和转发消息
2. 核心组件

AMQP(Advanced Message Queuing Protocol)是一种开放标准的应用层协议,专为消息队列设计。它定义了客户端与消息中间件之间的通信规范,确保不同厂商的实现可以互操作。

+----------+    +---------+    +----------+
| Producer | -> | Exchange| -> | Queue    | -> Consumer
+----------+    +---------+    +----------+|v+---------+| Binding |+---------+
  • Exchange(交换器)

接收生产者的消息

根据规则(Binding)将消息路由到队列

类型包括:Direct、Topic、Fanout、Headers

  • Queue(队列)

存储消息直到被消费

支持多个消费者竞争消费

消息可持久化存储

  • Binding(绑定)

定义 Exchange 与 Queue 之间的关联

通过 Binding Key(绑定键)和 Routing Key(路由键)匹配

工作流程

1.生产者发送消息

        指定消息的 Routing Key

        将消息发送到特定的 Exchange

2.Exchange 路由逻辑

        Direct Exchange:按 Routing Key 精确匹配

        Topic Exchange:按 Routing Key 的模式匹配(支持*#通配符)

        Fanout Exchange:将消息广播到所有绑定的队列

        Headers Exchange:按消息头部属性匹配

3.消费者接收消息

        从队列中拉取或订阅消息

        处理完成后发送确认(ACK)

RabbitMQ 核心概念

在使用 RabbitMQ 前,需先理解其核心组件和消息流转逻辑:

组件

作用

生产者(Producer)

消息的发送方,负责创建并发送消息到 RabbitMQ 服务器。

消费者(Consumer)

消息的接收方,监听队列并处理接收到的消息。

队列(Queue)

消息的存储容器,位于 RabbitMQ 服务器中,消息最终会被投递到队列中等待消费。

交换机(Exchange)

接收生产者发送的消息,并根据绑定规则(Binding)将消息路由到对应的队列。

绑定(Binding)

定义交换机与队列之间的关联关系,包含路由键(Routing Key)和匹配规则。

路由键(Routing Key)

生产者发送消息时指定的键,交换机根据该键和绑定规则路由消息。

RabbitMQ 消息流转流程

消息从生产者到消费者的完整路径为:
生产者 → 交换机(根据 Routing Key 和绑定规则)→ 队列 → 消费者
  • 生产者发送消息时,需指定交换机名称路由键
  • 交换机根据自身类型(如 Direct、Topic 等)和绑定规则,将消息转发到匹配的队列;
  • 消费者监听队列,获取并处理消息。

Java 操作 RabbitMQ 基础示例

1. 连接 RabbitMQ 服务器

所有操作的前提是建立与 RabbitMQ 的连接,需指定服务器地址、端口、账号密码(默认账号guest仅允许本地连接,远程连接需配置新用户)。

import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;public class RabbitMQConnection {// RabbitMQ连接配置private static final String HOST = "localhost"; // 服务器地址private static final int PORT = 5672; // 默认端口private static final String USERNAME = "guest";private static final String PASSWORD = "guest";// 获取连接public static Connection getConnection() throws Exception {ConnectionFactory factory = new ConnectionFactory();factory.setHost(HOST);factory.setPort(PORT);factory.setUsername(USERNAME);factory.setPassword(PASSWORD);return factory.newConnection();}
}

2. 生产者发送消息

生产者需完成以下步骤:
  1. 创建连接和通道(Channel);
  2. 声明交换机(可选,若使用默认交换机则无需声明);
  3. 声明队列(指定队列名称、是否持久化等);
  4. 绑定交换机与队列(若使用自定义交换机);
  5. 发送消息(指定交换机、路由键、消息内容)。
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;public class Producer {// 队列名称(需与消费者一致)private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道(RabbitMQ的操作大多通过通道完成)Channel channel = connection.createChannel();// 3. 声明队列(参数:队列名、是否持久化、是否排他、是否自动删除、附加参数)channel.queueDeclare(QUEUE_NAME, false, false, false, null);// 4. 消息内容String message = "Hello, RabbitMQ from Java!";// 5. 发送消息(参数:交换机名、路由键、消息属性、消息字节数组)// 此处使用默认交换机(""),路由键需与队列名一致channel.basicPublish("", QUEUE_NAME, null, message.getBytes());System.out.println("生产者发送消息:" + message);// 6. 关闭资源channel.close();connection.close();}
}

3. 消费者接收消息

消费者需持续监听队列,步骤如下:
  1. 创建连接和通道;
  2. 声明队列(需与生产者队列名一致);
  3. 定义消息处理逻辑(通过DefaultConsumer回调);
  4. 开启消费(指定队列、是否自动确认消息)。
import com.rabbitmq.client.*;
import java.io.IOException;public class Consumer {private static final String QUEUE_NAME = "java_rabbitmq_queue";public static void main(String[] args) throws Exception {// 1. 获取连接Connection connection = RabbitMQConnection.getConnection();// 2. 创建通道Channel channel = connection.createChannel();// 3. 声明队列(需与生产者一致,重复声明不会报错)channel.queueDeclare(QUEUE_NAME, false, false, false, null);System.out.println("消费者已启动,等待接收消息...");// 4. 定义消息处理回调DeliverCallback deliverCallback = (consumerTag, delivery) -> {String message = new String(delivery.getBody(), "UTF-8");System.out.println("消费者接收消息:" + message);};// 5. 开启消费(参数:队列名、是否自动确认、消息接收回调、取消消费回调)// 自动确认(autoAck=true):消息被接收后自动从队列删除;false则需手动确认channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> {});}
}

Spring AMQP简化 RabbitMQ

在 Spring Boot 项目中,可通过Spring AMQP简化 RabbitMQ 的使用,其封装了底层 API,提供注解驱动开发:

1.引入依赖:

<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

2.配置application.yml

spring:rabbitmq:host: localhostport: 5673username: guestpassword: guest

3. 生产者(使用RabbitTemplate):

@Autowired
private RabbitTemplate rabbitTemplate;public void sendMessage(String message) {rabbitTemplate.convertAndSend("queue_name", message);
}

4.消费者(使用@RabbitListener注解):

@RabbitListener(queues = "queue_name")
public void receiveMessage(String message) {System.out.println("接收消息:" + message);
}

交换机类型及 Java 实现

RabbitMQ 的交换机负责消息路由,不同类型的交换机路由规则不同,需根据场景选择:
1. Direct 交换机(精确匹配)
  • 路由规则:消息的路由键与绑定的路由键完全一致时,消息被路由到对应队列。
  • 适用场景:一对一通信(如订单通知)。
// 生产者声明Direct交换机并绑定队列
String EXCHANGE_NAME = "direct_exchange";
String ROUTING_KEY = "order.notify";
// 声明Direct交换机(参数:交换机名、类型、是否持久化)
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false);
// 绑定交换机与队列(参数:队列名、交换机名、路由键)
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTING_KEY);
// 发送消息(指定交换机和路由键)
channel.basicPublish(EXCHANGE_NAME, ROUTING_KEY, null, message.getBytes());
2. Topic 交换机(模糊匹配)
  • 路由规则:路由键支持通配符(*匹配一个单词,#匹配多个单词,单词以.分隔)。
  • 适用场景:多规则匹配(如日志分类:log.errorlog.warn)。
    // 生产者声明Topic交换机
    String EXCHANGE_NAME = "topic_exchange";
    // 路由键为"log.error"(匹配绑定键"log.*"或"log.#")
    String ROUTING_KEY = "log.error";
    channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC, false);
    // 绑定队列到交换机,绑定键为"log.#"(匹配所有以log.开头的路由键)
    channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "log.#");
3. Fanout 交换机(广播)
  • 路由规则:忽略路由键,将消息路由到所有绑定的队列。
  • 适用场景:一对多通信(如广播通知)。
// 生产者声明Fanout交换机
String EXCHANGE_NAME = "fanout_exchange";
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.FANOUT, false);
// 绑定多个队列到交换机(无需指定路由键)
channel.queueBind(QUEUE1, EXCHANGE_NAME, "");
channel.queueBind(QUEUE2, EXCHANGE_NAME, "");
// 发送消息(路由键无效,可设为空)
channel.basicPublish(EXCHANGE_NAME, "", null, message.getBytes());

RabbitMQ 应用场景

  • 异步通信:如用户注册后异步发送邮件 / 短信通知;
  • 系统解耦:订单系统与库存系统通过消息通信,避免直接依赖;
  • 流量削峰:秒杀场景中,通过队列缓冲请求,避免服务器过载;
  • 日志收集:多服务日志通过 Fanout 交换机广播到日志处理服务。

总结

RabbitMQ 凭借其灵活的路由机制、可靠的消息传递和丰富的特性,成为 Java 分布式系统中消息中间件的首选之一。通过 Java 客户端或 Spring AMQP,开发者可快速实现消息的生产、消费及高级功能,有效提升系统的可扩展性和稳定性。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。
如若转载,请注明出处:http://www.pswp.cn/news/915332.shtml
繁体地址,请注明出处:http://hk.pswp.cn/news/915332.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

【web安全】SQL注入与认证绕过

目录 一、SQL注入漏洞 1.1 基础注入原理 1.2 实用注入Payload分类 逻辑绕过型 注释截断型 联合查询型 常见的万能密码-CSDN博客 二、登录绕过实战技巧 2.1 基础绕过手法 2.2 高级绕过技巧 编码绕过 多重注释 参数污染 三、密码重置漏洞利用 3.1 常见漏洞模式 3…

Python适配器模式详解:让不兼容的接口协同工作

一、模式定义与核心思想 适配器模式&#xff08;Adapter Pattern&#xff09; 是一种结构型设计模式&#xff0c;它通过创建一个中间层&#xff08;适配器&#xff09;&#xff0c;将不兼容的接口转换为客户端期望的接口。就像现实中的电源适配器&#xff0c;让不同国家的插头…

微信小程序列表数据上拉加载,下拉刷新

1.上拉加载数据&#xff0c;数据 下一页数据 前面的数据&#xff08;[...this.data.list, ...data.records&#xff09;2.当用户上拉加载过快时&#xff0c;会不停的调用接口&#xff0c;需要节流阀isLoading3.上拉加载到最后一页的判断&#xff0c;isFinish// pages/list.js…

【树上倍增 LCA DFS 前缀和】P10391 [蓝桥杯 2024 省 A] 零食采购|普及+

本文涉及知识点 C算法&#xff1a;前缀和、前缀乘积、前缀异或的原理、源码及测试用例 包括课程视频 CDFS 树上倍增 LCA P10391 [蓝桥杯 2024 省 A] 零食采购 题目描述 小蓝准备去星际旅行&#xff0c;出发前想在本星系采购一些零食&#xff0c;星系内有 nnn 颗星球&#x…

PDF发票批量打印工具哪个好?高效打印发票的实用工具推荐

开小超市这几年&#xff0c;每月要打几十张进货发票做账&#xff0c;以前打印时总犯愁&#xff1a;有的发票 PDF 太大&#xff0c;打出来字小得看不清&#xff1b;有的又太窄&#xff0c;白白浪费半张纸。试过手动调整&#xff0c;每张都要改缩放比例&#xff0c;累不说&#x…

4G模块 A7680通过MQTT协议连接到华为云

命令说明 基础AT指令 ATi显示产品的标志信息 ATCIMI查询IMSI ATCICCID从SIM卡读取ICCID ATCGSN查询产品序列号 ATCPIN查询卡状态 ATCSQ查询信号强度 ATCGATT查询当前PS域状态 ATCREG查询GPRS注册状态 ATCEREG查询4G注册状态 ATCGPADDR查询PDP地址 ATCMGF选择短信格式 ATCMGS发…

大模型词表设计与作用解析

几乎所有大型语言模型&#xff08;LLM&#xff09;都有自己独立的词表&#xff08;Vocabulary&#xff09;。这是模型设计和训练过程中的核心组件之一。以下是关于词表的关键点&#xff1a; 1. 词表的作用 分词基础&#xff1a;词表定义了模型如何将输入文本拆分成基本单元&…

(一)Eshop(异常处理中间件/grpc)

文章目录项目地址一、异常处理1.1 自定异常1.2 自定义异常处理中间件1.3 注册中间件二、grpc服务2.1 创建protos1. 打折的protos2. 设置grpc server3. program配置服务4. docker-compose2.2 CRUD1. 查询2.3 测试1. 发起查询请求三、grpc服务消费3.1 创建client1. 添加服务2. 选…

BLIP、InternVL Series(下)

目录 一、InternVL1.5 1、改进 二、InternVL2 1、渐进式扩展 2、多模态扩展 三、InternVL2.5 1、方法 2、数据优化 四、InternVL3 2、方法 3、训练后处理 4、测试时扩展 五、BLIP-3o 一、InternVL1.5 1、改进 InternVL1.5在InternVL基础上&#xff0c;优化了QLLa…

【数据结构】二维差分数组

题目链接 【模板】二维差分_牛客题霸_牛客网 牛客网 - 找工作神器|笔试题库|面试经验|实习招聘内推&#xff0c;求职就业一站解决_牛客网 描述 给定一个 nmnm 的整数矩阵 bb&#xff0c;矩阵的下标从 11 开始记作 bi,jbi,j​。现在需要支持 qq 次操作&#xff0c;第 tt 次…

【JDK内置工具】常用工具和实战指令

作者&#xff1a;唐叔在学习 专栏&#xff1a;唐叔的Java实践 关键词: #JDK工具 #Java性能调优 #JVM调优 #内存泄漏排查 #线程死锁分析 #Java开发工具 #线上问题排查 #Java诊断工具 Hello&#xff0c;大家好&#xff0c;我是爱学习的唐叔。作为Java开发者&#xff0c;JDK内置工…

一站式PDF转Markdown解决方案PDF3MD

简介 什么是 PDF3MD &#xff1f; PDF3MD 是一个现代化、用户友好的网络应用程序&#xff0c;旨在将 PDF 文档转换为干净、格式化的 Markdown 文本。它提供了高效的转换工具&#xff0c;支持多种文件格式之间的转换。 主要特点 PDF 转 Markdown&#xff1a;能够将 PDF 文档转…

RocketMQ学习系列之——MQ入门概念

一、什么是MQMQ&#xff08;Message Queue&#xff0c;消息队列&#xff09;是一种能够实现跨进程消息传输&#xff0c;并且消息缓存符合队列特性的组件。二、MQ的作用异步&#xff1a;消息发送方无需等待消息接收方收到消息&#xff0c;发送方将消息成功发送到 MQ 之后即可无阻…

血条识别功能实现及原理

从零开始学Python图像处理 - 血条识别 从实际问题中能快速的学习特定技能&#xff0c;通过完成一个能自动刷怪的工具&#xff0c;达成快速学习python图像处理和识别。 自动刷怪需要先识别怪物&#xff0c;在游戏中怪物类型很多&#xff0c;同时在移动中形态会一直发生变化&…

网络地址和主机地址之间进行转换的类

#pragma once #include "Common.hpp" // 网络地址和主机地址之间进行转换的类class InetAddr { public:InetAddr(){}InetAddr(struct sockaddr_in &addr) : _addr(addr){// 网络转主机_port ntohs(_addr.sin_port); // 从网络中拿到的&#xff01;网络序列// _i…

《Python 项目 CI/CD 实战指南:从零构建自动化部署流水线》

🛠《Python 项目 CI/CD 实战指南:从零构建自动化部署流水线》 一、引言:为什么 Python 项目需要 CI/CD? 在现代软件开发中,CI/CD(持续集成 / 持续部署)已成为不可或缺的工程实践。它不仅提升了开发效率,还显著降低了部署风险。对于 Python 项目而言,CI/CD 的价值尤…

AJAX 技术

AJAX全称是 Asynchronous JavaScript and XML ( 异步的JavaScript 和 XML )&#xff0c;使用该技术后&#xff0c;可以实现不刷新整个网页&#xff0c;与服务器进行异步通信并更新部分网页。一&#xff09;为什么需要AJAX?传统网页在与服务器通信时&#xff0c;需要刷新整个页…

Python爬虫实战:研究NLTK库相关技术

1. 引言 1.1 研究背景与意义 随着互联网的快速发展,网络新闻已成为人们获取信息的主要来源之一。每天产生的海量新闻文本蕴含着丰富的信息和知识,但也给信息获取和分析带来了挑战。如何从大量非结构化的新闻文本中自动提取有价值的信息,识别热点话题和趋势,成为当前自然语…

ARM 学习笔记(二)

参考文献&#xff1a;《ARM ArchitectureReference Manual ARMv7-A and ARMv7-R edition》1、MMU 1.1 背景早期的内存是比较小的&#xff0c;一般是几十k&#xff0c;不过相应的程序也是比较小的&#xff0c;这时程序可以直接加载到内存中运行。后来为了支持多个程序的并行&…

Github 贪吃蛇 主页设置

自动化脚本顶部元信息触发条件&#xff08;on:&#xff09;作业&#xff08;jobs:&#xff09;步骤&#xff08;steps:&#xff09;1. 生成 SVG2. 推送到 output 分支Commit & Push在 README 里引用参考&#xff1a;https://github.com/Platane/Platane/tree/master 首先写…