目录
- 1. 什么是消息队列?
- 2. 消息队列的优点
- 3. RabbitMQ 消息队列概述
- 4. RabbitMQ 安装
- 5. Exchange 四种类型
- 5.1 direct 精准匹配
- 5.2 fanout 广播
- 5.3 topic 正则匹配
- 6. RabbitMQ 队列模式
- 6.1 简单队列模式
- 6.2 工作队列模式
- 6.3 发布/订阅模式
- 6.4 路由模式
- 6.5 主题模式
- 6.6 RPC模式
- 6.7 发布者确认模式
- 7. 安装C++客户端库使用 RabbitMQ
- 8. AMQP-CPP 库的简单使用
- 9. RabbitMq 总结
1. 什么是消息队列?
(1)消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。
- 消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
- 消息队列,一般我们会简称他为MQ(Message Queue),消息队列可以简单的理解为:把要传输的数据放在队列中。
-
消息队列说明:
- Producer:消息生产者,负责产生和发送消息到 Broker;
- Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
- Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
-
RabbitMQ:是由erlang语言开发,基于AMQP(高级消息队列协议)协议实现的一种消息队列。市面上还有很多消息队列,比如Kafka、RocketMQ、Redis等。
-
官方文档:https://www.rabbitmq.com/tutorials
2. 消息队列的优点
(1)应用解偶:
- 比如在我们现在公司的业务常见中:
- 给客户打完电话之后我们需要根据通话录音进行打标签;
- 给客户打完电话之后我们需要给他发送短信;
- 给客户打完电话之后我们需要发送他的通话给机器人,让机器人自学习;
- 简单架构图如下:
- 如果没有消息队列,在A服务里面要写上3个API接口分别对应后面三个服务,突然有一天这个客户说我不需要发短信功能了,如果按照上面这种方式,我们就需要联系开发开始删代码,然后升级,刚升级好没几天,客户说我有要这个功能,那开发又要升级代码,这个时候开发集体离职了,(这每天干的完全是无用功)但是如果有消息队列那就完全不一样了,就会变成下面这个样子:
- 所以通过引入消息队列,A只需要写一个接口对接MQ了。不同的应用程序之间可以通过消息队列进行通信,而无需直接调用对方的接口或方法。这样可以降低系统中各个应用程序之间的耦合度,使得它们可以独立演化和扩展,而不会因为对方的变化而受到影响。
(2)异步处理:
- 还是上面那个场景来简述这个:A是公司的主要业务,打电话业务,BCD为非主要业务。
- 假设A调用BCD 接口需要50ms,那等A把所有接口调用完成之后需要150ms,对主业务消耗特别大,如果我们不用搭理BCD的处理,A直接把他交给消息队列,让消息队列去处理BCD,A只要把数据给消息队列就行了,那么A的压力就会很小,也不会影响主要业务流程。
(3)流量削峰:
- 打个比方,我们目前有A B两个服务,A服务的OPS峰值为100W,但是B服务的OPS峰值只有10w,这个时候来了90w个请求,A服务能处理过来没问题,但是这个时候B服务直接就崩溃了
- 如果这个时候我们在A和B之间加一个rabbitmq,我们让B每次去取9w,这样B服务就不会挂了。
3. RabbitMQ 消息队列概述
(1)RabbitMQ 优点如下:
- 可靠性:RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。
- 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。
- 集群:在相同局域网内的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用
- 联合:对于服务器来说,他比集群需要更多的松散和非可靠链接,为此RabbitMQ提供了联合模型
- 高可用的队列:在同一个集群里,队列可以被镜像到多个机器中,以确保当前某些硬件出现故障后,你的消息仍然可以被使用
- 多协议:RabbitMQ支持多种消息协议的消息传递
- 广泛的客户端:只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。
- 可视化管理工具:RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。
- 追踪:如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。
- 插件系统:RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。
(2)RabbitMQ 的概念模型:
- 所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。
(3)RabbitMQ 基本流程图:
- Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
- Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
- Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
- Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
- Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- Connection:网络连接,比如一个TCP连接。
- Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
- Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
- Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
- Broker:表示消息队列服务器实体。
(4)RabbitMQ使用中的一些概念:
- 在上面的RabbitMQ的基本流程图里面我们可以看到,RabbitMQ的整体工作流程是,生产者产生数据交给RabbitMQ,然后RabbitMQ通过Exchange更具规则来选择绑定到那个队列(Queues)中,然后消费者在到对应的队列里面去取数据。所以我们先安装 RabbitMQ 后下面就来讲解下Exchange的四种类型。
4. RabbitMQ 安装
(1)安装RabbitMq:
sudo apt install rabbitmq-server# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个administrator 用户,才可以做为远程登录和发表订阅消息。
# 添加用户
sudo rabbitmqctl add_user root 123456
# 设置用户 tag
sudo rabbitmqctl set_user_tags root administrator
# 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management
(2)查看 rabbitmq-server 的状态:
(3)访问 webUI 界面,默认端口为 15672:
(4)浏览器访问管理界面(用户名和密码是上述添加的用户):
5. Exchange 四种类型
5.1 direct 精准匹配
(1)direct 交换机如下图:
- 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。
5.2 fanout 广播
(1)fanout 交换机如下图:
- 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。
5.3 topic 正则匹配
(1)topic 交换机如下图:
- topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。
(2)还有一个是headers 交换器,它是根据头部匹配。几乎是不使用的,这里就不介绍了。
6. RabbitMQ 队列模式
(1)基于Exchange交换机,RabbitMQ截至目前有七种队列模式:
- 简单队列模式。
- 工作队列模式。
- 发布/订阅模式。
- 路由模式。
- 主题模式。
- RPC模式。
- 发布者确认模式。
6.1 简单队列模式
(1)一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。
- 图中P代表生产者,C代表消费者,Queue是队列名称。
- 我们看到是没有Exchange的,但是RabbitMQ也会有一个默认的交换机。这个默认的交换机通常被称为"amq.default"或者""(空字符串),是RabbitMQ自动创建的,用于在没有指定交换机的情况下将消息发送到队列。
//生产者
var factory = new ConnectionFactory { HostName = "localhost"}; //初始化连接信息
using var connection = factory.CreateConnection(); //创建连接
using var channel = connection.CreateModel(); //创建信道//声明一个队列,并将信道与队列绑定
channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
//发送消息的内容
string message = $"Hello World!";
var body = Encoding.UTF8.GetBytes(message);//信道绑定交换机
channel.BasicPublish(exchange: string.Empty,routingKey: string.Empty,basicProperties: null,body: body);Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//消费者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");
};channel.BasicConsume(queue: "hello",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
- 此时就会生产者发送一条消息,消费者就会接收一条消息。
6.2 工作队列模式
(1)工作队列又叫做任务队列,正常会按顺序把消息发送给每一个订阅的消费者,平均而言,每个消费者将获得相同数量的消息。(不是P发送一条消息,C1和C2都会收到,而是第一条C1消费,第二条C2消费。每个消息只会被一个消费者接收和处理)。
- 这样的好处是可以提高吞吐量,因为生产者发送了很多消息,但是消费者只有一个,消费者处理很慢,就会造成消息积压。
6.3 发布/订阅模式
(1)发布/订阅模式是一种消息传递模式,它允许发送者(发布者)将消息发布到多个接收者(订阅者)。消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。
- 所以消息传递模式,发布者不需要指定队列。发布/订阅模式交换机类型为Fanout。
//发布者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//声明一个交换机,叫做logs,并且交换机的类型是Fanout
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var message = "publish_subscribe";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",routingKey: string.Empty,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//接收者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] {message}");
};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
(2)注意:如果发布者已经发布消息到交换机,但还没有队列绑定到交换机,消息将会丢失。
6.4 路由模式
(1)路由模式也是一种消息传递模式,是基于消息的路由键(routing key)来将消息从交换机(exchange)发送到一个或多个队列中。相比较于发布/订阅模式,路由模式多了一个routing key的概念。
- 路由模式交换机类型为Direct。
//生产者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//定义交换机名称以及类型为Direct
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//定义路由键
string routingKey = "direct_test";//发送消息体
string message = "direct_message";
var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "direct_logs",routingKey: routingKey,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");Console.WriteLine(" Press [enter] to exit.");//消费者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName = channel.QueueDeclare().QueueName;//路由键集合
var routeKeyArr = new string[] { "direct_test", "direct_test2" };foreach (var routeKey in routeKeyArr)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: routeKey);
}Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
- 路由模式,消费者可以监听多个路由键。
6.5 主题模式
(1)基于路由模式,仍然有局限性——它不能基于多个标准进行路由。也就是一个消费者只能接收完全与routing key相匹配的交换机。主题模式主要解决路由模式的不足,可以模糊匹配routing key。
- 路由模式交换机类型为Topic。
- 在生产者方面,基于 . 作为分隔符,用于routing key。比如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。可以是任何单词,但最多只有255 个字节。在消费者方面,绑定routing key有两种重要的情况:
(2)*(星号):匹配一个单词。具体语法:
var routeing_key = "info.debug.error";//匹配 info
"info.*.*"
//匹配debug
"*.debug.*"
//匹配error
"*.*.error"
(3)#(散列):匹配零个或多个单词。具体语法:
var routeing_key = "info.debug.error";//匹配 info
"info.#"
//匹配debug
"#.debug.#"
//匹配error
"*.*.error"
6.6 RPC模式
(1)RPC模式又叫"请求/回复模式"。
- RPC(Remote Procedure Call,远程过程调用)是一种用于在分布式系统中进行通信的技术。它允许一个进程(或线程)调用另一个进程(或线程)的过程(函数或方法),就像调用本地函数一样,而不需要开发者显式处理底层通信细节。
- 就是生产者发送一条消息,消费者端执行某个方法,获取值的同时,并返回到生产者。
6.7 发布者确认模式
(1)发布者确认模式(Publisher Confirmation)是 RabbitMQ 提供的一种机制,用于确保消息被成功发送到交换机(exchange)并被接收到,以及确保消息被正确地路由到队列中。
- 在传统的消息发布过程中,发布者发送消息到交换机后,并不知道消息是否已经被正确地处理。为了解决这个问题,RabbitMQ 提供了发布者确认模式,允许发布者确认消息是否已经被成功接收到。
7. 安装C++客户端库使用 RabbitMQ
(1)安装 RabbitMQ 的 C++客户端库:
- C 语言库:https://github.com/alanxz/rabbitmq-c
- C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master
(2)我们这里使用 AMQP-CPP 库来编写客户端程序。安装 AMQP-CPP:
sudo apt install libev-dev # libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make
sudo make install
8. AMQP-CPP 库的简单使用
(1)概述:
- AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq 服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
- 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件。
- AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
- 注意:它需要 c++17 的支持。
(2)具体使用:
- AMQP-CPP 的使用有两种模式:
- 使用默认的 TCP 模块进行网络通信。
- 使用扩展的 libevent、libev、libuv、asio 异步通信组件进行通信。
- TCP 模式:
- 该模式下需要实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接,重写相关函数, 其中必须重写 monitor 函数。在 monitor 函数中需要实现的是将 fd 放入 eventloop(select 、 epoll) 中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPP 的 connection->process(fd, flags) 方法。
- TCP 模式使用较为麻烦,不过提供了灵活的网络层集成能力,可以根据项目需求选择合适的网络库进行集成。在实际应用中,建议结合事件循环库(如libuv、Boost.Asio等)使用以获得最佳性能。
- 扩展模式
- 以 libev 为例, 我们不必要自己实现 monitor 函数,可以直接使用 AMQP::LibEvHandler。
(3)常用类与接口介绍:
- Channel:
- channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel 。
- 因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。
namespace AMQP
{using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;using QueueCallback = std::function<void(const std::string &name,uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag, bool redelivered)>;// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallbackusing AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();// 声明交换机,如果提供了一个空名称,则服务器将分配一个名称。// @param name 交换机的名称// @param type 交换类型// enum ExchangeType// {// fanout, 广播交换,绑定的队列都能拿到消息// direct, 直接交换,只将消息交给 routingkey 一致的队列// topic, 主题交换,将消息交给符合 bindingkey 规则的队列// headers,// consistent_hash,// message_deduplication// };// @param flags 交换机标志// 以下 flags 可用于交换机:// *-durable 持久化,重启后交换机依然有效// *-autodelete 删除所有连接的队列后,自动删除交换// *-passive 仅被动检查交换机是否存在// *-internal 创建内部交换// @param arguments 其他参数Deferred &declareExchange(const std::string_view &name,ExchangeType type, int flags, const Table &arguments);// 声明队列,如果不提供名称,服务器将分配一个名称。// @param name 队列的名称// @param flags 标志组合// flags 可以是以下值的组合:// -durable 持久队列在代理重新启动后仍然有效// -autodelete 当所有连接的使用者都离开时,自动删除队列// -passive 仅被动检查队列是否存在*-exclusive 队列仅存在于此连接,并且在连接断开时自动删除// @param arguments 可选参数DeferredQueue &declareQueue(const std::string_view &name,int flags, const Table &arguments);// 将队列绑定到交换机// @param exchange 源交换机// @param queue 目标队列// @param routingkey 路由密钥// @param arguments 其他绑定参数Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue,const std::string_view &routingkey, const Table &arguments);// 将消息发布到 exchange,必须提供交换机的名称和路由密钥。然后RabbitMQ 将尝试将消息发送到一个或多个队列。// 使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生的情况。// @param exchange 要发布到的交易所// @param routingkey 路由密钥// @param envelope 要发送的完整信封// @param message 要发送的消息// @param size 消息的大小// @param flags 可选标志// 可以提供以下 flags:// -mandatory 如果设置,服务器将返回未发送到队列的消息// -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。bool publish(const std::string_view &exchange, const std::string_view &routingKey,const std::string &message, int flags = 0);// 告诉 RabbitMQ 服务器我们已准备好使用消息-也就是订阅队列消息,调用此方法后,RabbitMQ 开始向客户端应用程序传递消息。// @param queue 您要使用的队列// @param tag 将与此消费操作关联的消费者标记// consumer tag 是一个字符串标识符,如果以后想通过 channel::cancel()调用停止它,可以使用它来标识使用者。// 如果您没有指定使用者 tag,服务器将为您分配一个。// @param flags 其他标记// @param arguments 其他参数// 支持以下 flags:// -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息// -noack 如果设置了,则不必对已消费的消息进行确认// -exclusive 请求独占访问,只有此使用者可以访问队列DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag, int flags, const Table &arguments);// 确认接收到的消息,当在 DeferredConsumer::onReceived()方法中接收到消息时,必须确认该消息,// 以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。// @param deliveryTag 消息的唯一 delivery 标签// @param flags 可选标志bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{// 注册一个回调函数,该函数在消费者启动时被调用。DeferredConsumer &onSuccess(const ConsumeCallback &callback);// 注册回调函数,用于接收到一个完整消息的时候被调用void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered);DeferredConsumer &onReceived(const MessageCallback &callback);DeferredConsumer &onMessage(const MessageCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}
- ev:
typedef struct ev_async
{EV_WATCHER(ev_async);EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1, /* unloop once */EVBREAK_ALL = 2 /* unloop all loops */
};
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)
int ev_run(struct ev_loop *loop);
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);
(4)使用案例:
- publish.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main()
{//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5. 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct); deferred.onError([](const char *message){std::cout << "声明交换机失败:" << message << std::endl;exit(0);});deferred.onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 声明队列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "声明队列失败:" << message << std::endl;exit(0);});deferredQueue.onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;exit(0);});binding_deferred.onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 向交换机发布消息for(int i = 0; i < 10; i++) {std::string msg = "Hello World-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if(ret == false) {std::cout << "publish 失败!\n";}}//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
- consume.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>// 消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{std::string msg;msg.assign(message.body(), message.bodySize());std::cout << msg << std::endl;channel->ack(deliveryTag); // 对消息进行确认
}int main()
{// 1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;// 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);// 2.5. 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 3. 实例化信道对象AMQP::TcpChannel channel(&connection);// 4. 声明交换机AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct);deferred.onError([](const char *message){std::cout << "声明交换机失败:" << message << std::endl;exit(0); });deferred.onSuccess([](){ std::cout << "test-exchange 交换机创建成功!" << std::endl; });// 5. 声明队列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "声明队列失败:" << message << std::endl;exit(0); });deferredQueue.onSuccess([]() { std::cout << "test-queue 队列创建成功!" << std::endl; });// 6. 针对交换机和队列进行绑定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message){std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;exit(0); });binding_deferred.onSuccess([](){ std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; });// 7. 订阅队列消息 -- 设置消息处理回调函数auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag") // 返回值 DeferredConsumer.onReceived(callback).onError([](const char *message){std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;exit(0); }); // 返回值是 AMQP::Deferred// 8. 启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
- Makefile:
all:publish consume
publish:publish.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags
consume:consume.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags.PHONY:clean
clean:rm -f publish consume
- 运行结果:
9. RabbitMq 总结
(1)核心架构与组件:
- 核心组件:
(2)Exchange 类型详解:
(3)核心特性与机制:
- 消息可靠性:
- 高级功能:
(4)使用场景:
(5)消息生命周期:
(6)与其他消息中间件对比:
(7)总结:
- RabbitMQ 凭借其灵活的的路由机制、可靠的消息传递和丰富的生态系统,成为企业级异步通信的首选解决方案。掌握其核心原理和最佳实践,能有效构建高可靠、可扩展的分布式系统。