技术栈RabbitMq的介绍和使用

目录

  • 1. 什么是消息队列?
  • 2. 消息队列的优点
  • 3. RabbitMQ 消息队列概述
  • 4. RabbitMQ 安装
  • 5. Exchange 四种类型
    • 5.1 direct 精准匹配
    • 5.2 fanout 广播
    • 5.3 topic 正则匹配
  • 6. RabbitMQ 队列模式
    • 6.1 简单队列模式
    • 6.2 工作队列模式
    • 6.3 发布/订阅模式
    • 6.4 路由模式
    • 6.5 主题模式
    • 6.6 RPC模式
    • 6.7 发布者确认模式
  • 7. 安装C++客户端库使用 RabbitMQ
  • 8. AMQP-CPP 库的简单使用
  • 9. RabbitMq 总结

1. 什么是消息队列?

(1)消息队列(Message queue)是一种进程间通信或同一进程的不同线程间的通信方式,软件的贮列用来处理一系列的输入,通常是来自用户。

  • 消息队列提供了异步的通信协议,每一个贮列中的纪录包含详细说明的数据,包含发生的时间,输入设备的种类,以及特定的输入参数,也就是说:消息的发送者和接收者不需要同时与消息队列互交。消息会保存在队列中,直到接收者取回它。
  • 消息队列,一般我们会简称他为MQ(Message Queue),消息队列可以简单的理解为:把要传输的数据放在队列中。

  • 消息队列说明:

    • Producer:消息生产者,负责产生和发送消息到 Broker;
    • Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue;
    • Consumer:消息消费者,负责从 Broker 中获取消息,并进行相应处理;
  • RabbitMQ:是由erlang语言开发,基于AMQP(高级消息队列协议)协议实现的一种消息队列。市面上还有很多消息队列,比如Kafka、RocketMQ、Redis等。

  • 官方文档:https://www.rabbitmq.com/tutorials

2. 消息队列的优点

(1)应用解偶:

  • 比如在我们现在公司的业务常见中:
    • 给客户打完电话之后我们需要根据通话录音进行打标签;
    • 给客户打完电话之后我们需要给他发送短信;
    • 给客户打完电话之后我们需要发送他的通话给机器人,让机器人自学习;
  • 简单架构图如下:

  • 如果没有消息队列,在A服务里面要写上3个API接口分别对应后面三个服务,突然有一天这个客户说我不需要发短信功能了,如果按照上面这种方式,我们就需要联系开发开始删代码,然后升级,刚升级好没几天,客户说我有要这个功能,那开发又要升级代码,这个时候开发集体离职了,(这每天干的完全是无用功)但是如果有消息队列那就完全不一样了,就会变成下面这个样子:

  • 所以通过引入消息队列,A只需要写一个接口对接MQ了。不同的应用程序之间可以通过消息队列进行通信,而无需直接调用对方的接口或方法。这样可以降低系统中各个应用程序之间的耦合度,使得它们可以独立演化和扩展,而不会因为对方的变化而受到影响。

(2)异步处理:

  • 还是上面那个场景来简述这个:A是公司的主要业务,打电话业务,BCD为非主要业务。
  • 假设A调用BCD 接口需要50ms,那等A把所有接口调用完成之后需要150ms,对主业务消耗特别大,如果我们不用搭理BCD的处理,A直接把他交给消息队列,让消息队列去处理BCD,A只要把数据给消息队列就行了,那么A的压力就会很小,也不会影响主要业务流程。

(3)流量削峰:

  • 打个比方,我们目前有A B两个服务,A服务的OPS峰值为100W,但是B服务的OPS峰值只有10w,这个时候来了90w个请求,A服务能处理过来没问题,但是这个时候B服务直接就崩溃了

  • 如果这个时候我们在A和B之间加一个rabbitmq,我们让B每次去取9w,这样B服务就不会挂了。

3. RabbitMQ 消息队列概述

(1)RabbitMQ 优点如下:

  • 可靠性:RabbitMQ提供了多种技术可以让你在性能和可靠性之间进行权衡。这些技术包括持久性机制、投递确认、发布者证实和高可用性机制。
  • 灵活的路由:消息在到达队列前是通过交换机进行路由的。RabbitMQ为典型的路由逻辑提供了多种内置交换机类型。如果你有更复杂的路由需求,可以将这些交换机组合起来使用,你甚至可以实现自己的交换机类型,并且当做RabbitMQ的插件来使用。
  • 集群:在相同局域网内的多个RabbitMQ服务器可以聚合在一起,作为一个独立的逻辑代理来使用
  • 联合:对于服务器来说,他比集群需要更多的松散和非可靠链接,为此RabbitMQ提供了联合模型
  • 高可用的队列:在同一个集群里,队列可以被镜像到多个机器中,以确保当前某些硬件出现故障后,你的消息仍然可以被使用
  • 多协议:RabbitMQ支持多种消息协议的消息传递
  • 广泛的客户端:只要是你能想到的编程语言几乎都有与其相适配的RabbitMQ客户端。
  • 可视化管理工具:RabbitMQ附带了一个易于使用的可视化管理工具,它可以帮助你监控消息代理的每一个环节。
  • 追踪:如果你的消息系统有异常行为,RabbitMQ还提供了追踪的支持,让你能够发现问题所在。
  • 插件系统:RabbitMQ附带了各种各样的插件来对自己进行扩展。你甚至也可以写自己的插件来使用。

(2)RabbitMQ 的概念模型:

  • 所有 MQ 产品从模型抽象上来说都是一样的过程:消费者(consumer)订阅某个队列。生产者(producer)创建消息,然后发布到队列(queue)中,最后将消息发送到监听的消费者。

(3)RabbitMQ 基本流程图:

  • Message:消息,消息是不具名的,它由消息头和消息体组成。消息体是不透明的,而消息头则由一系列的可选属性组成,这些属性包括routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储)等。
  • Publisher:消息的生产者,也是一个向交换器发布消息的客户端应用程序。
  • Exchange:交换器,用来接收生产者发送的消息并将这些消息路由给服务器中的队列。
  • Binding:绑定,用于消息队列和交换器之间的关联。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。
  • Queue:消息队列,用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
  • Connection:网络连接,比如一个TCP连接。
  • Channel:信道,多路复用连接中的一条独立的双向数据流通道。信道是建立在真实的TCP连接内地虚拟连接,AMQP 命令都是通过信道发出去的,不管是发布消息、订阅队列还是接收消息,这些动作都是通过信道完成。因为对于操作系统来说建立和销毁 TCP 都是非常昂贵的开销,所以引入了信道的概念,以复用一条 TCP 连接。
  • Consumer:消息的消费者,表示一个从消息队列中取得消息的客户端应用程序。
  • Virtual Host:虚拟主机,表示一批交换器、消息队列和相关对象。虚拟主机是共享相同的身份认证和加密环境的独立服务器域。每个 vhost 本质上就是一个 mini 版的 RabbitMQ 服务器,拥有自己的队列、交换器、绑定和权限机制。vhost 是 AMQP 概念的基础,必须在连接时指定,RabbitMQ 默认的 vhost 是 / 。
  • Broker:表示消息队列服务器实体。

(4)RabbitMQ使用中的一些概念:

  • 在上面的RabbitMQ的基本流程图里面我们可以看到,RabbitMQ的整体工作流程是,生产者产生数据交给RabbitMQ,然后RabbitMQ通过Exchange更具规则来选择绑定到那个队列(Queues)中,然后消费者在到对应的队列里面去取数据。所以我们先安装 RabbitMQ 后下面就来讲解下Exchange的四种类型。

4. RabbitMQ 安装

(1)安装RabbitMq:

sudo apt install rabbitmq-server# 启动服务
sudo systemctl start rabbitmq-server.service
# 查看服务状态
sudo systemctl status rabbitmq-server.service
# 安装完成的时候默认有个用户 guest ,但是权限不够,要创建一个administrator 用户,才可以做为远程登录和发表订阅消息。
# 添加用户
sudo rabbitmqctl add_user root 123456
# 设置用户 tag
sudo rabbitmqctl set_user_tags root administrator
# 设置用户权限
sudo rabbitmqctl set_permissions -p / root "." "." ".*"
# RabbitMQ 自带了 web 管理界面,执行下面命令开启
sudo rabbitmq-plugins enable rabbitmq_management

(2)查看 rabbitmq-server 的状态:

(3)访问 webUI 界面,默认端口为 15672:


(4)浏览器访问管理界面(用户名和密码是上述添加的用户):

5. Exchange 四种类型

5.1 direct 精准匹配

(1)direct 交换机如下图:

  • 消息中的路由键(routing key)如果和 Binding 中的 binding key 一致, 交换器就将消息发到对应的队列中。路由键与队列名完全匹配,如果一个队列绑定到交换机要求路由键为“dog”,则只转发 routing key 标记为“dog”的消息,不会转发“dog.puppy”,也不会转发“dog.guard”等等。它是完全匹配、单播的模式。

5.2 fanout 广播

(1)fanout 交换机如下图:

  • 每个发到 fanout 类型交换器的消息都会分到所有绑定的队列上去。fanout 交换器不处理路由键,只是简单的将队列绑定到交换器上,每个发送到交换器的消息都会被转发到与该交换器绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。fanout 类型转发消息是最快的。

5.3 topic 正则匹配

(1)topic 交换机如下图:

  • topic 交换器通过模式匹配分配消息的路由键属性,将路由键和某个模式进行匹配,此时队列需要绑定到一个模式上。它将路由键和绑定键的字符串切分成单词,这些单词之间用点隔开。它同样也会识别两个通配符:符号“#”和符号“”。#匹配0个或多个单词,匹配不多不少一个单词。

(2)还有一个是headers 交换器,它是根据头部匹配。几乎是不使用的,这里就不介绍了。

6. RabbitMQ 队列模式

(1)基于Exchange交换机,RabbitMQ截至目前有七种队列模式:

  • 简单队列模式。
  • 工作队列模式。
  • 发布/订阅模式。
  • 路由模式。
  • 主题模式。
  • RPC模式。
  • 发布者确认模式。

6.1 简单队列模式

(1)一个消息生产者,一个消息消费者,一个队列。也称为点对点模式。

  • 图中P代表生产者,C代表消费者,Queue是队列名称。
  • 我们看到是没有Exchange的,但是RabbitMQ也会有一个默认的交换机。这个默认的交换机通常被称为"amq.default"或者""(空字符串),是RabbitMQ自动创建的,用于在没有指定交换机的情况下将消息发送到队列。
//生产者
var factory = new ConnectionFactory { HostName = "localhost"}; //初始化连接信息
using var connection = factory.CreateConnection();  //创建连接
using var channel = connection.CreateModel(); //创建信道//声明一个队列,并将信道与队列绑定
channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);
//发送消息的内容
string message = $"Hello World!";
var body = Encoding.UTF8.GetBytes(message);//信道绑定交换机
channel.BasicPublish(exchange: string.Empty,routingKey: string.Empty,basicProperties: null,body: body);Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//消费者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.QueueDeclare(queue: "hello",durable: false,exclusive: false,autoDelete: false,arguments: null);Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] Received {message}");
};channel.BasicConsume(queue: "hello",autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
  • 此时就会生产者发送一条消息,消费者就会接收一条消息。

6.2 工作队列模式

(1)工作队列又叫做任务队列,正常会按顺序把消息发送给每一个订阅的消费者,平均而言,每个消费者将获得相同数量的消息。(不是P发送一条消息,C1和C2都会收到,而是第一条C1消费,第二条C2消费。每个消息只会被一个消费者接收和处理)。

  • 这样的好处是可以提高吞吐量,因为生产者发送了很多消息,但是消费者只有一个,消费者处理很慢,就会造成消息积压。

6.3 发布/订阅模式

(1)发布/订阅模式是一种消息传递模式,它允许发送者(发布者)将消息发布到多个接收者(订阅者)。消息传递模型的核心思想是生产者从不直接向队列发送任何消息。实际上,生产者通常根本不知道消息是否会被传递到任何队列。

  • 所以消息传递模式,发布者不需要指定队列。发布/订阅模式交换机类型为Fanout。

//发布者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//声明一个交换机,叫做logs,并且交换机的类型是Fanout
channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);var message = "publish_subscribe";
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: "logs",routingKey: string.Empty,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent {message}");Console.WriteLine(" Press [enter] to exit.");//接收者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "logs", type: ExchangeType.Fanout);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName = channel.QueueDeclare().QueueName;
channel.QueueBind(queue: queueName,exchange: "logs",routingKey: string.Empty);Console.WriteLine(" [*] Waiting for logs.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{byte[] body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);Console.WriteLine($" [x] {message}");
};channel.BasicConsume(queue: queueName,autoAck: false,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");

(2)注意:如果发布者已经发布消息到交换机,但还没有队列绑定到交换机,消息将会丢失。

6.4 路由模式

(1)路由模式也是一种消息传递模式,是基于消息的路由键(routing key)来将消息从交换机(exchange)发送到一个或多个队列中。相比较于发布/订阅模式,路由模式多了一个routing key的概念。

  • 路由模式交换机类型为Direct。

//生产者
var factory = new ConnectionFactory { HostName = "localhost"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();//定义交换机名称以及类型为Direct
channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//定义路由键
string routingKey = "direct_test";//发送消息体
string message  = "direct_message";
var body = Encoding.UTF8.GetBytes(message);channel.BasicPublish(exchange: "direct_logs",routingKey: routingKey,basicProperties: null,body: body);
Console.WriteLine($" [x] Sent '{routingKey}':'{message}'");Console.WriteLine(" Press [enter] to exit.");//消费者
var factory = new ConnectionFactory { HostName = "localhost" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();channel.ExchangeDeclare(exchange: "direct_logs", type: ExchangeType.Direct);//创建一个具有生成名称的非持久、独占、自动删除队列
var queueName = channel.QueueDeclare().QueueName;//路由键集合
var routeKeyArr = new string[] { "direct_test", "direct_test2" };foreach (var routeKey in routeKeyArr)
{channel.QueueBind(queue: queueName,exchange: "direct_logs",routingKey: routeKey);
}Console.WriteLine(" [*] Waiting for messages.");var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{var body = ea.Body.ToArray();var message = Encoding.UTF8.GetString(body);var routingKey = ea.RoutingKey;Console.WriteLine($" [x] Received '{routingKey}':'{message}'");
};channel.BasicConsume(queue: queueName,autoAck: true,consumer: consumer);Console.WriteLine(" Press [enter] to exit.");
  • 路由模式,消费者可以监听多个路由键。

6.5 主题模式

(1)基于路由模式,仍然有局限性——它不能基于多个标准进行路由。也就是一个消费者只能接收完全与routing key相匹配的交换机。主题模式主要解决路由模式的不足,可以模糊匹配routing key。

  • 路由模式交换机类型为Topic。

  • 在生产者方面,基于 . 作为分隔符,用于routing key。比如“stock.usd.nyse”、“nyse.vmw”、“quick.orange.rabbit”。可以是任何单词,但最多只有255 个字节。在消费者方面,绑定routing key有两种重要的情况:

(2)*(星号):匹配一个单词。具体语法:

var routeing_key = "info.debug.error";//匹配 info
"info.*.*"
//匹配debug
"*.debug.*"
//匹配error
"*.*.error"

(3)#(散列):匹配零个或多个单词。具体语法:

var routeing_key = "info.debug.error";//匹配 info
"info.#"
//匹配debug
"#.debug.#"
//匹配error
"*.*.error"

6.6 RPC模式

(1)RPC模式又叫"请求/回复模式"。

  • RPC(Remote Procedure Call,远程过程调用)是一种用于在分布式系统中进行通信的技术。它允许一个进程(或线程)调用另一个进程(或线程)的过程(函数或方法),就像调用本地函数一样,而不需要开发者显式处理底层通信细节。
  • 就是生产者发送一条消息,消费者端执行某个方法,获取值的同时,并返回到生产者。

6.7 发布者确认模式

(1)发布者确认模式(Publisher Confirmation)是 RabbitMQ 提供的一种机制,用于确保消息被成功发送到交换机(exchange)并被接收到,以及确保消息被正确地路由到队列中。

  • 在传统的消息发布过程中,发布者发送消息到交换机后,并不知道消息是否已经被正确地处理。为了解决这个问题,RabbitMQ 提供了发布者确认模式,允许发布者确认消息是否已经被成功接收到。

7. 安装C++客户端库使用 RabbitMQ

(1)安装 RabbitMQ 的 C++客户端库:

  • C 语言库:https://github.com/alanxz/rabbitmq-c
  • C++库: https://github.com/CopernicaMarketingSoftware/AMQP-CPP/tree/master

(2)我们这里使用 AMQP-CPP 库来编写客户端程序。安装 AMQP-CPP:

sudo apt install libev-dev # libev 网络库组件
git clone https://github.com/CopernicaMarketingSoftware/AMQP-CPP.git
cd AMQP-CPP/
make 
sudo make install

8. AMQP-CPP 库的简单使用

(1)概述:

  • AMQP-CPP 是用于与 RabbitMq 消息中间件通信的 c++库。它能解析从 RabbitMq 服务发送来的数据,也可以生成发向 RabbitMq 的数据包。AMQP-CPP 库不会向 RabbitMq 建立网络连接,所有的网络 io 由用户完成。
  • 当然,AMQP-CPP 提供了可选的网络层接口,它预定义了 TCP 模块,用户就不用自己实现网络 io,我们也可以选择 libevent、libev、libuv、asio 等异步通信组件,需要手动安装对应的组件。
  • AMQP-CPP 完全异步,没有阻塞式的系统调用,不使用线程就能够应用在高性能应用中。
  • 注意:它需要 c++17 的支持。

(2)具体使用:

  • AMQP-CPP 的使用有两种模式:
    • 使用默认的 TCP 模块进行网络通信。
    • 使用扩展的 libevent、libev、libuv、asio 异步通信组件进行通信。
  • TCP 模式:
    • 该模式下需要实现一个类继承自 AMQP::TcpHandler 类, 它负责网络层的 TCP 连接,重写相关函数, 其中必须重写 monitor 函数。在 monitor 函数中需要实现的是将 fd 放入 eventloop(select 、 epoll) 中监控, 当 fd 可写可读就绪之后, 调用 AMQP-CPP 的 connection->process(fd, flags) 方法。
    • TCP 模式使用较为麻烦,不过提供了灵活的网络层集成能力,可以根据项目需求选择合适的网络库进行集成。在实际应用中,建议结合事件循环库(如libuv、Boost.Asio等)使用以获得最佳性能。
  • 扩展模式
    • 以 libev 为例, 我们不必要自己实现 monitor 函数,可以直接使用 AMQP::LibEvHandler。

(3)常用类与接口介绍:

  1. Channel:
    • channel 是一个虚拟连接,一个连接上可以建立多个通道。并且所有的 RabbitMq 指令都是通过 channel 传输,所以连接建立后的第一步,就是建立 channel 。
    • 因为所有操作是异步的,所以在 channel 上执行指令的返回值并不能作为操作执行结果,实际上它返回的是 Deferred 类,可以使用它安装处理函数。
namespace AMQP
{using SuccessCallback = std::function<void()>;using ErrorCallback = std::function<void(const char *message)>;using FinalizeCallback = std::function<void()>;using QueueCallback = std::function<void(const std::string &name,uint32_t messagecount, uint32_t consumercount)>;using DeleteCallback = std::function<void(uint32_t deletedmessages)>;using MessageCallback = std::function<void(const Message &message,uint64_t deliveryTag, bool redelivered)>;// 当使用发布者确认时,当服务器确认消息已被接收和处理时,将调用AckCallbackusing AckCallback = std::function<void(uint64_t deliveryTag, bool multiple)>;// 使用确认包裹通道时,当消息被 ack/nacked 时,会调用这些回调using PublishAckCallback = std::function<void()>;using PublishNackCallback = std::function<void()>;using PublishLostCallback = std::function<void()>;class Channel{Channel(Connection *connection);bool connected();// 声明交换机,如果提供了一个空名称,则服务器将分配一个名称。// @param name 交换机的名称// @param type 交换类型//     enum ExchangeType//     {//         fanout, 广播交换,绑定的队列都能拿到消息//         direct, 直接交换,只将消息交给 routingkey 一致的队列//         topic, 主题交换,将消息交给符合 bindingkey 规则的队列//         headers,//         consistent_hash,//         message_deduplication//     };// @param flags 交换机标志//     以下 flags 可用于交换机://     *-durable 持久化,重启后交换机依然有效//     *-autodelete 删除所有连接的队列后,自动删除交换//     *-passive 仅被动检查交换机是否存在//     *-internal 创建内部交换// @param arguments 其他参数Deferred &declareExchange(const std::string_view &name,ExchangeType type, int flags, const Table &arguments);// 声明队列,如果不提供名称,服务器将分配一个名称。// @param name 队列的名称// @param flags 标志组合//     flags 可以是以下值的组合://     -durable 持久队列在代理重新启动后仍然有效//     -autodelete 当所有连接的使用者都离开时,自动删除队列//     -passive 仅被动检查队列是否存在*-exclusive 队列仅存在于此连接,并且在连接断开时自动删除// @param arguments 可选参数DeferredQueue &declareQueue(const std::string_view &name,int flags, const Table &arguments);// 将队列绑定到交换机// @param exchange 源交换机// @param queue 目标队列// @param routingkey 路由密钥// @param arguments 其他绑定参数Deferred &bindQueue(const std::string_view &exchange, const std::string_view &queue,const std::string_view &routingkey, const Table &arguments);//    将消息发布到 exchange,必须提供交换机的名称和路由密钥。然后RabbitMQ 将尝试将消息发送到一个或多个队列。//    使用可选的 flags 参数,可以指定如果消息无法路由到队列时应该发生的情况。//    @param exchange 要发布到的交易所//    @param routingkey 路由密钥//    @param envelope 要发送的完整信封//    @param message 要发送的消息//    @param size 消息的大小//    @param flags 可选标志//         可以提供以下 flags://         -mandatory 如果设置,服务器将返回未发送到队列的消息//         -immediate 如果设置,服务器将返回无法立即转发给使用者的消息。bool publish(const std::string_view &exchange, const std::string_view &routingKey,const std::string &message, int flags = 0);//    告诉 RabbitMQ 服务器我们已准备好使用消息-也就是订阅队列消息,调用此方法后,RabbitMQ 开始向客户端应用程序传递消息。//    @param queue 您要使用的队列//    @param tag 将与此消费操作关联的消费者标记//     consumer tag 是一个字符串标识符,如果以后想通过 channel::cancel()调用停止它,可以使用它来标识使用者。//     如果您没有指定使用者 tag,服务器将为您分配一个。//    @param flags 其他标记//    @param arguments 其他参数//     支持以下 flags://     -nolocal 如果设置了,则不会同时消耗在此通道上发布的消息//     -noack 如果设置了,则不必对已消费的消息进行确认//     -exclusive 请求独占访问,只有此使用者可以访问队列DeferredConsumer &consume(const std::string_view &queue,const std::string_view &tag, int flags, const Table &arguments);// 确认接收到的消息,当在 DeferredConsumer::onReceived()方法中接收到消息时,必须确认该消息,// 以便 RabbitMQ 将其从队列中删除(除非使用 noack 选项消费)。// @param deliveryTag 消息的唯一 delivery 标签// @param flags 可选标志bool ack(uint64_t deliveryTag, int flags = 0);};class DeferredConsumer{// 注册一个回调函数,该函数在消费者启动时被调用。DeferredConsumer &onSuccess(const ConsumeCallback &callback);// 注册回调函数,用于接收到一个完整消息的时候被调用void MessageCallback(const AMQP::Message &message, uint64_t deliveryTag, bool redelivered);DeferredConsumer &onReceived(const MessageCallback &callback);DeferredConsumer &onMessage(const MessageCallback &callback);};class Message : public Envelope{const std::string &exchange();const std::string &routingkey();};class Envelope : public MetaData{const char *body();uint64_t bodySize();};
}
  1. ev:
typedef struct ev_async
{EV_WATCHER(ev_async);EV_ATOMIC_T sent; /* private */
} ev_async;
// break type
enum
{EVBREAK_CANCEL = 0, /* undo unloop */EVBREAK_ONE = 1,    /* unloop once */EVBREAK_ALL = 2     /* unloop all loops */
};
struct ev_loop *ev_default_loop(unsigned int flags EV_CPP(= 0));
#define EV_DEFAULT ev_default_loop(0)
int ev_run(struct ev_loop *loop);
void ev_break(struct ev_loop *loop, int32_t break_type);
void (*callback)(struct ev_loop *loop, ev_async *watcher, int32_t revents);
void ev_async_init(ev_async *w, callback cb);
void ev_async_start(struct ev_loop *loop, ev_async *w);
void ev_async_send(struct ev_loop *loop, ev_async *w);

(4)使用案例:

  • publish.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>int main()
{//1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;//2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);//2.5. 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);//3. 实例化信道对象AMQP::TcpChannel channel(&connection);//4. 声明交换机AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct); deferred.onError([](const char *message){std::cout << "声明交换机失败:" << message << std::endl;exit(0);});deferred.onSuccess([](){std::cout << "test-exchange 交换机创建成功!" << std::endl;});//5. 声明队列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "声明队列失败:" << message << std::endl;exit(0);});deferredQueue.onSuccess([](){std::cout << "test-queue 队列创建成功!" << std::endl;});//6. 针对交换机和队列进行绑定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message) {std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;exit(0);});binding_deferred.onSuccess([](){std::cout << "test-exchange - test-queue 绑定成功!" << std::endl;});//7. 向交换机发布消息for(int i = 0; i < 10; i++) {std::string msg = "Hello World-" + std::to_string(i);bool ret = channel.publish("test-exchange", "test-queue-key", msg);if(ret == false) {std::cout << "publish 失败!\n";}}//启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
  • consume.cpp:
#include <ev.h>
#include <amqpcpp.h>
#include <amqpcpp/libev.h>
#include <openssl/ssl.h>
#include <openssl/opensslv.h>// 消息回调处理函数的实现
void MessageCb(AMQP::TcpChannel *channel, const AMQP::Message &message, uint64_t deliveryTag, bool redelivered)
{std::string msg;msg.assign(message.body(), message.bodySize());std::cout << msg << std::endl;channel->ack(deliveryTag); // 对消息进行确认
}int main()
{// 1. 实例化底层网络通信框架的I/O事件监控句柄auto *loop = EV_DEFAULT;// 2. 实例化libEvHandler句柄 --- 将AMQP框架与事件监控关联起来AMQP::LibEvHandler handler(loop);// 2.5. 实例化连接对象AMQP::Address address("amqp://root:123456@127.0.0.1:5672/");AMQP::TcpConnection connection(&handler, address);// 3. 实例化信道对象AMQP::TcpChannel channel(&connection);// 4. 声明交换机AMQP::Deferred &deferred = channel.declareExchange("test-exchange", AMQP::ExchangeType::direct);deferred.onError([](const char *message){std::cout << "声明交换机失败:" << message << std::endl;exit(0); });deferred.onSuccess([](){ std::cout << "test-exchange 交换机创建成功!" << std::endl; });// 5. 声明队列AMQP::DeferredQueue &deferredQueue = channel.declareQueue("test-queue");deferredQueue.onError([](const char *message){std::cout << "声明队列失败:" << message << std::endl;exit(0); });deferredQueue.onSuccess([]() { std::cout << "test-queue 队列创建成功!" << std::endl; });// 6. 针对交换机和队列进行绑定auto &binding_deferred = channel.bindQueue("test-exchange", "test-queue", "test-queue-key");binding_deferred.onError([](const char *message){std::cout << "test-exchange - test-queue 绑定失败:" << message << std::endl;exit(0); });binding_deferred.onSuccess([](){ std::cout << "test-exchange - test-queue 绑定成功!" << std::endl; });// 7. 订阅队列消息 -- 设置消息处理回调函数auto callback = std::bind(MessageCb, &channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel.consume("test-queue", "consume-tag") // 返回值 DeferredConsumer.onReceived(callback).onError([](const char *message){std::cout << "订阅 test-queue 队列消息失败:" << message << std::endl;exit(0); }); // 返回值是 AMQP::Deferred// 8. 启动底层网络通信框架--开启I/Oev_run(loop, 0);return 0;
}
  • Makefile:
all:publish consume
publish:publish.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags
consume:consume.ccg++ -g -o $@ $^ -std=c++17 -lamqpcpp -lev -lspdlog -lfmt -lgflags.PHONY:clean
clean:rm -f publish consume
  • 运行结果:

9. RabbitMq 总结

(1)核心架构与组件:

  • 核心组件:

(2)Exchange 类型详解:

(3)核心特性与机制:

  • 消息可靠性:

  • 高级功能:

(4)使用场景:

(5)消息生命周期:

(6)与其他消息中间件对比:

(7)总结:

  • RabbitMQ 凭借其灵活的的路由机制、可靠的消息传递和丰富的生态系统,成为企业级异步通信的首选解决方案。掌握其核心原理和最佳实践,能有效构建高可靠、可扩展的分布式系统。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/pingmian/84149.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

项目部署到Linux上时遇到的错误(Redis,MySQL,无法正确连接,地址占用问题)

Redis无法正确连接 在运行jar包时出现了这样的错误 查询得知问题核心在于Redis连接失败&#xff0c;具体原因是客户端发送了密码认证请求&#xff0c;但Redis服务器未设置密码 1.为Redis设置密码&#xff08;匹配客户端配置&#xff09; 步骤&#xff1a; 1&#xff09;.修…

Linux边缘智能:物联网的终极进化

Linux边缘智能&#xff1a;物联网的终极进化 从数据中心到万物终端的智能革命 引言&#xff1a;边缘计算的范式转变 随着物联网设备的爆炸式增长&#xff0c;传统的云计算架构已无法满足实时性、隐私保护和带宽效率的需求。边缘智能应运而生&#xff0c;将计算能力从云端下沉到…

Linux Shell 中的 dash 符号 “-”

Shell中的-&#xff1a;小符号的大智慧 在Unix/Linux系统中&#xff0c;-符号是一个约定俗成的特殊标记&#xff0c;它表示命令应该使用标准输入或标准输出而非文件。这个看似简单的短横线&#xff0c;完美诠释了Unix"一切皆文件"的设计哲学。 作为标准输入/输出的…

JMeter 实现 MQTT 协议压力测试 !

想象一下&#xff0c;你的智能家居系统连接了上千个设备&#xff0c;传感器数据通过 MQTT 协议飞速传输&#xff0c;但突然服务器崩溃&#xff0c;灯光、空调全失控&#xff01;如何确保你的 MQTT 经纪人能承受高负载&#xff1f;答案是 JMeter&#xff01;通过安装 MQTT 插件&…

CKA考试知识点分享(6)---PriorityClass

CKA 版本&#xff1a;1.32 第六套题是涉及PriorityClass相关。 注意&#xff1a;本文不是题目&#xff0c;只是为了学习相关知识点做的实验。仅供参考 实验目的 创建一套PriorityClass &#xff0c;验证PriorityClass的运作策略。 1 环境准备 创建2个pc&#xff0c;一个为高…

暴力破解篇补充-字典

在皮卡丘靶场的第二期&#xff0c;暴力破解模块中&#xff0c;我相信大家短暂的接触了字典这个概念&#xff0c;字典事实上就是包含了大量弱口令的txt文本文件 所以这篇文章用于分享一些字典&#xff1a;https://wwhc.lanzoue.com/ihdl12ybhbhi&#xff08;弱口令字典&#xff…

关于VS2022中C++导入第三方库的方式

首先&#xff0c;新建一个Cpp项目(控制台项目即可&#xff0c;其他项目也无所谓)&#xff0c;右键点击项目名称(Test1)选择属性或者在VS2022工具栏选择调试标签->属性按钮打开属性页。 注意点: 在开始其他操作前请注意先进行 配置和平台选项框的选择。配置选框选定了是配置…

C++中vector类型的介绍和使用

文章目录 一、vector 类型的简介1.1 基本介绍1.2 常见用法示例1.3 常见成员函数简表 二、vector 数据的插入2.1 push_back() —— 在尾部插入一个元素2.2 emplace_back() —— 在尾部“就地”构造对象2.3 insert() —— 在任意位置插入一个或多个元素2.4 emplace() —— 在任意…

在Vue或React项目中使用Tailwind CSS实现暗黑模式切换:从系统适配到手动控制

在现代Web开发中&#xff0c;暗黑模式(Dark Mode)已成为提升用户体验的重要功能。本文将带你使用Tailwind CSS在React项目(Vue项目类似)中实现两种暗黑模式控制方式&#xff1a; 系统自动适配 - 根据用户设备偏好自动切换手动切换 - 通过按钮让用户自由选择 一、项目准备 使…

Linux C语言网络编程详细入门教程:如何一步步实现TCP服务端与客户端通信

文章目录 Linux C语言网络编程详细入门教程&#xff1a;如何一步步实现TCP服务端与客户端通信前言一、网络通信基础概念二、服务端与客户端的完整流程图解三、每一步的详细讲解和代码示例1. 创建Socket&#xff08;服务端和客户端都要&#xff09;2. 绑定本地地址和端口&#x…

Tomcat 安装和配置

一、Tomcat官网 Apache Tomcat - Welcome! 选择解压到任意一个盘&#xff01;&#xff01; 二、Tomcat配置 1&#xff09;在系统变量处新建一个变量CATALINA_HOME。CATALINA_HOME环境变量的值&#xff0c;设置为Tomcat的解压安装目录 2&#xff09;找到系统变量Path&#xff0…

动态规划 熟悉30题 ---上

本来是要写那个二维动态规划嘛&#xff0c;但是我今天在问题时候&#xff0c;一个大佬就把他初一时候教练让他练dp的30题发出来了&#xff08;初一&#xff0c;啊虽然知道计算机这一专业&#xff0c;很多人从小就学了&#xff0c;但是我每次看到一些大佬从小学还是会很羡慕吧或…

基于stm32F10x 系列微控制器的智能电子琴(附完整项目源码、详细接线及讲解视频)

注&#xff1a;成品使用演示、项目源码、项目文档在文章末尾网盘链接中自取 所用硬件&#xff1a;STM32F103C8T6、无源蜂鸣器、44矩阵键盘、flash存储模块、OLED显示屏、RGB三色灯、面包板、杜邦线、usb转ttl串口 stm32f103c8t6 面包板 …

时间同步技术在电力系统中的应用

随着电力自动化技术的发展&#xff0c;时间同步不仅可以为电力系统的事后故障分析提供支持&#xff0c;而且已经参与到电力系统的实时控制中来&#xff0c;其可靠性对电力系统的稳定运行影响越来越大。在电力系统中&#xff0c;时间同步技术广泛应用于调度控制中心、发电厂、变…

XMLGregorianCalendar跟Date、localDateTime以及String有什么区别

1. java.util.Date&#xff08;已过时&#xff0c;不推荐新代码使用&#xff09; 特点 表示时间戳&#xff1a;存储自 1970-01-01 00:00:00 UTC&#xff08;Unix 纪元&#xff09; 以来的毫秒数。 问题&#xff1a; 不区分日期和时间&#xff0c;也没有时区支持&#xff08;依…

Python网页自动化Selenium中文文档

1. 安装 1.1. 安装 Selenium Python bindings 提供了一个简单的API&#xff0c;让你使用Selenium WebDriver来编写功能/校验测试。 通过Selenium Python的API&#xff0c;你可以非常直观的使用Selenium WebDriver的所有功能。 Selenium Python bindings 使用非常简洁方便的A…

玩转抖音矩阵:核心玩法与高效运营规则

一、 抖音矩阵&#xff1a;流量协同的生态网络 抖音矩阵&#xff0c;本质是运营一个相互关联、互相支持的抖音账号群。核心目标在于通过账号间的深度协同&#xff08;内容、流量、粉丝&#xff09;&#xff0c;打破单个账号的流量天花板&#xff0c;实现11>2的效果。它不仅…

C++11 constexpr和字面类型:从入门到精通

文章目录 引言一、constexpr的基本概念与使用1.1 constexpr的定义与作用1.2 constexpr变量1.3 constexpr函数1.4 constexpr在类构造函数中的应用1.5 constexpr的优势 二、字面类型的基本概念与使用2.1 字面类型的定义与作用2.2 字面类型的应用场景2.2.1 常量定义2.2.2 模板参数…

用电脑通过USB总线连接控制keysight示波器

通过USB总线控制示波器的优势 在上篇文章我介绍了如何通过网线远程连接keysight示波器&#xff0c;如果连接的距离不是很远&#xff0c;也可以通过USB线将示波器与电脑连接起来&#xff0c;实现对示波器的控制和截图。 在KEYSIGHT示波器DSOX1204A的后端&#xff0c;除了有网口…

StarRocks 全面向量化执行引擎深度解析

StarRocks 全面向量化执行引擎深度解析 StarRocks 的向量化执行引擎是其高性能的核心设计&#xff0c;相比传统行式处理引擎&#xff08;如MySQL&#xff09;&#xff0c;性能可提升 5-10倍。以下是分层拆解&#xff1a; 1. 向量化 vs 传统行式处理 维度行式处理向量化处理数…