高性能分布式消息队列系统(四)

八、客户端模块的实现

客户端实现的总体框架

在 RabbitMQ 中,应用层提供消息服务的核心实体是 信道(Channel)
用户想要与消息队列服务器交互时,通常不会直接操作底层的 TCP 连接,而是通过信道来进行各种消息的发布、订阅、确认等操作。

信道可以看作是在单个 TCP 连接之上的轻量级虚拟连接,它负责封装具体的协议细节,屏蔽了网络通信的复杂性。
用户只需要调用信道提供的接口,发送消息或接收消息,无需关心底层数据如何传输、协议如何实现。

简单来说,用户面向的是信道服务接口,而信道背后处理了连接管理、数据编解码、协议交互等工作,实现了业务与网络通信的解耦。

客户端设计视角和服务器视角的对比

方面客户端视角(信道)服务端视角(连接+信道)
主要关注点 调用信道接口完成消息操作管理连接和信道,执行底层协议与业务逻辑
抽象层级   抽象出具体网络和协议细节解析信道请求,实现消息路由和持久化
资源管理    不关心连接具体实现,只用信道管理物理连接、信道状态及相关资源
多路复用    多信道复用同一连接,简化调用维护多信道,保证隔离与并发性能
用户责任     只需调用信道接口                          处理协议解析、消息调度、资源分配

  • 连接模块

客户端和服务端进行网络连接的基础。

一个直接面向用户的模块,内部包含多个对外提供服务的接口,用户需要什么服务进行调用对应的接口即可,其中包含交换机的声明/删除,队列的声明与删除,队列的绑定与解绑,消息的发布与订阅,订阅和解除订阅。

表达了客户端与服务器之间在消息队列系统中协作的流程

在仿 RabbitMQ 的消息队列系统中,客户端首先通过订阅者模块注册自身的消费者身份,并指定对应的消息处理回调函数;随后通过信道模块在单一的 TCP 连接上实现多路复用,创建多个逻辑信道以并行处理不同的消息服务(如发布、订阅、队列管理等)。客户端通过连接模块建立与服务器的连接,并在信道中发起具体的请求服务。

服务器接收到连接请求后,由服务器端的连接管理器创建连接上下文,并根据信道中携带的请求类型,路由到对应的处理模块(如交换机、队列或消息模块),执行相应的业务逻辑。处理结果再由服务器的异步线程池将数据封装好并通过网络返回给客户端,客户端的异步事件机制或线程池再触发对应的回调函数完成消息消费流程。

基于以上模块实现客户端的思路就非常清晰了

1、实例化异步线程对象

2、实例化连接对象

3、通过连接对象进行创建信道

4、根据信道进行获取自己的所需服务

5、关闭信道

6、关闭连接

8.1、订阅者模块

订阅者对象的设计

一个不向用户进行直接展示的模块,在客户端中进行体现的作用就是对角色的描述,表示这就是一个消费者,用户通常不直接操作订阅逻辑,而是通过定义“消费者”的方式进行消息处理逻辑的注册。

一个信道只有一个订阅者,所以说不需要进行订阅者的管理。订阅者这个模块很简单,没有涉及到一些业务模块的内容,业务模块的服务都在信道模块进行提供。

订阅者模块(消费者)这个类中成员变量的设计

  • 首先需要定义消费者ID,描述了收到该条消息后该如何进行对这条消息进行处理。
  • 其次是要进行订阅的哪个队列的ID和自动删除标志,描述了收到消息后是否需要对消息进行回复,是否要进行自动删除。
  • 最后是回调函数,描述了从队列中进行获取消息后应该如何进行处理,这部分由用户进行决定。

订阅者模块的实现

using ConsumerCallback = std::function<void(const std::string, const ys::BasicProperties *bp, const std::string)>;struct Consumer{using ptr=std::shared_ptr<Consumer>;std::string _cid;std::string _qname;bool _auto_ack;ConsumerCallback _callback;Consumer(){DLOG("new Consumer:%p",this);}Consumer(const std::string &cid, const std::string &qname, bool auto_ack, const ConsumerCallback &cb): _cid(cid), _qname(qname), _auto_ack(auto_ack), _callback(std::move(cb)){DLOG("new Consumer:%p",this);}~Consumer(){DLOG("del Consumer:%p",this);}};

在构造函数中,ConsumerCallback(是一个 std::function)内部可能有复杂对象(比如 Lambda、绑定的资源等),如果直接写 _callback(cb),会调用拷贝构造,可能涉及较多内存分配、资源拷贝,用 std::move(cb),可以让 ConsumerCallback 的内部资源被移动到 _callback,更高效。

 8.2、异步工作线程模块

用户虽然是通过信道进行网络通信的,但是网络通信的本质还是需要进行IO事件的监控的,这就要通过IO监控线程来进行整,不能在当前线程进行IO事件的监控,这样的话就会在当前线程进行阻塞住了.下面通过表格的方式进行说明

模块loopthreadpool
订阅客户端负责监听服务器消息推送(socket 读事件)- 接收到消息后,异步将业务处理放到 pool 中去执行,因为收到的消息可能需要进行处理会耗时,防止主线程阻塞,无法进行监听服务器消息的推送
发布客户端负责监听向服务器发送消息后的 socket 可写事件方便继续向服务器进行发送数据(或者响应 ack 等)- 应用层调用发布接口时,耗时操作(如序列化、日志记录)在 pool 中处理
class AsyncWorker{public:using ptr=std::shared_ptr<AsyncWorker>;muduo::net::EventLoopThread loopthread;threadpool pool;};

8.3、连接模块

这其实就是我们在Demo模块利用muduo库进行搭建的客户端,这里其实就是换了一层皮,称为连接模块。

class Connection{public:using ptr=std::shared_ptr<Connection>;typedef std::shared_ptr<google::protobuf::Message> MessagePtr;Connection(const std::string &sip, int sport,AsyncWorker::ptr worker): _latch(1), _client(worker->loopthread.startLoop(), muduo::net::InetAddress(sip, sport), "Client"), _dispatcher(std::bind(&Connection::onUnknownMessage, this, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3)),_codec(std::make_shared<ProtobufCodec>(std::bind(&ProtobufDispatcher::onProtobufMessage, &_dispatcher,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3))),_worker(worker),_channel_manager(std::make_shared<ChannelManager>()){_dispatcher.registerMessageCallback<ys::basicConsumeResponse>(std::bind(&Connection::consumeResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_dispatcher.registerMessageCallback<ys::basicCommonResponse>(std::bind(&Connection::basicResponse, this,std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setMessageCallback(std::bind(&ProtobufCodec::onMessage, _codec.get(),std::placeholders::_1, std::placeholders::_2, std::placeholders::_3));_client.setConnectionCallback(std::bind(&Connection::onConnection, this, std::placeholders::_1));_client.connect();_latch.wait(); // 阻塞等待,直到连接建立成功}Channel::ptr openChannel() {Channel::ptr channel = _channel_manager->create(_conn,_codec);bool ret=channel->opneChannel();if(ret==false){DLOG("打开信道失败");return Channel::ptr();}return channel;}void closeChannel(const Channel::ptr& channel){channel->closeChannel();_channel_manager->remove(channel->cid());}private://收到基础响应void basicResponse(const muduo::net::TcpConnectionPtr &conn, const basicCommonResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、将得到的响应对象进行添加到信道的基础响应channel->putBasicResponse(message);}//收到消息推送void consumeResponse(const muduo::net::TcpConnectionPtr &conn, const basicConsumeResponsePtr &message, muduo::Timestamp){//1、找到信道Channel::ptr channel=_channel_manager->get(message->cid());if(channel.get()==nullptr){DLOG("未找到信道消息");return;}//2、封装异步任务(消息处理任务),抛入线程池_worker->pool.push([channel,message](){channel->consume(message);});}void onUnknownMessage(const muduo::net::TcpConnectionPtr &conn, const MessagePtr &message, muduo::Timestamp){LOG_INFO << "onUnknownMessage: " << message->GetTypeName();conn->shutdown();}void onConnection(const muduo::net::TcpConnectionPtr &conn){if (conn->connected()){_latch.countDown(); // 唤醒主线程中的阻塞_conn = conn;}else{// 连接关闭时的操作_conn.reset();}}private:muduo::CountDownLatch _latch; // 实现同步的//muduo::net::EventLoopThread _loopthread; // 异步循环处理线程AsyncWorker::ptr _worker;muduo::net::TcpConnectionPtr _conn; // 客户端对应的连接muduo::net::TcpClient _client;      // 客户端ProtobufDispatcher _dispatcher;     // 请求分发器ProtobufCodecPtr _codec;               // 协议处理器ChannelManager::ptr _channel_manager;};

在连接模块这里是有一个极易容易进行掉进坑里的陷阱 :当发布客户端进行向服务器进行发送建立连接请求的时候,由于TCP是有发送缓冲区和接收缓冲区的,当请求被发送到发送缓存区的时候,就会默认连接建立成功,但是此时的连接是没有被建立成功的,此时发布客户端误以为连接是建立成功的,就会执行后续操作,向服务器进行发送消息,此时就出现了问题。同样订阅客户端也类似。

因此在onConnection 函数中需要进行判断是否是真正的建立连接成功。

consumeResponse中

当连接收到消息推送后,需要_consumer 进行参与,因为只有consumer中有回调函数,知道进行收到消息推送时如何进行处理,这个接口到时候收到消息之后和消息一起进行封装成一个任务,把这个任务放到线程池中,并不在当前的主执行流中进行执行。

8.4、信道管理模块

信道模块的定位与主要职责

信道不仅仅是数据的通道,还承载着客户端的业务接口,因此这个模块不仅要进行信道结构的设计,还需要进行提供对应的业务逻辑。信道类可以理解为客户端在和消息服务器交互时的一条逻辑通道。它并不是单纯的数据结构,而是抽象出与服务器交互(各种请求/响应、状态维护等)的一套完整业务流程封装

换句话说,其实可以将信道模块进行理解成将订阅者模块、异步线程模块、和连接模块进行统一封装管理。

客户端信道模块和客户端其他模块之间的交互关系

模块交互内容
连接模块
muduo::net::TcpConnection
- Channel 直接持有 TCP 连接 _conn
- 通过 _conn 发送请求消息给服务器
- 服务器响应也通过该连接返回
订阅者模块
Consumer
- Channel 通过 basicConsume() 创建订阅者对象 _consumer
- 收到推送消息时,Channel::consume() 回调订阅者的处理逻辑
异步线程模块
(muduo 的 IO 线程)
- 服务器响应由 IO 线程收到
- 触发 Channel::putBasicResponse(),将响应加入 _basic_resp
- 触发 Channel::consume(),调用用户回调

8.2.1、信道管理的信息

  • 信道ID
  • 信道关联的网络通信连接的对象
  • protobuf 协议处理对象
  • 信道关联的消费者
  • 请求对应的响应信息队列(这里队列使用<请求ID,响应>hash表,一遍进行查找指定的响应)
  • 互斥锁&条件变量(大部分的请求都是阻塞操作,发送请求后需要进行等到响应才能继续,但是muduo库的通信是异步的,因此需要我们子啊收到响应后,通过判断是否是等待的响应来进行同步)。 

8.2.2、信道管理的操作

  • 创建信道的操作
  • 提供删除信道操作
  • 提供声明交换机操作(强断言-有则OK,没有则创建)
  • 提供删除交换机
  • 提供创建队列操作(强断言-有则OK,没有则创建)
  • 提供删除队列操作
  • 提供交换机-队列绑定操作
  • 提供交换机-队列解除绑定操作
  • 提供添加订阅操作
  • 提供取消订阅操作
  • 提供发布消息操作
  • 提供确认消息操作信道模块进行管理
using ProtobufCodecPtr=std::shared_ptr<ProtobufCodec>;using basicCommonResponsePtr=std::shared_ptr<ys::basicCommonResponse>;using basicConsumeResponsePtr=std::shared_ptr<ys::basicConsumeResponse>;class Channel{public:using ptr=std::shared_ptr<Channel>;Channel(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec):_cid(UUIDHelper::uuid()),_conn(conn),_codec(codec){}~Channel(){basicCancel();}bool opneChannel(){std::string rid=UUIDHelper::uuid();ys::openChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void closeChannel(){std::string rid=UUIDHelper::uuid();ys::closeChannelRequest req;req.set_rid(rid);req.set_cid(_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool declareExchange(const std::string& name,ys::ExchangeType type,bool durable,bool auto_delete,google::protobuf::Map<std::string,std::string>& args){//构造一个声明虚拟机的请求对象ys::declareExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);req.set_exchange_type(type);req.set_durable(durable);req.set_auto_delete(auto_delete);req.mutable_args()->swap(args);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteExchange(const std::string& name){ys::deleteExchangeRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(name);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);return;}bool declareQueue(const std::string& qname,bool qdurable,bool qexclusive,bool qauto_delete,google::protobuf::Map<std::string,std::string> qargs){ys::declareQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);req.set_durable(qdurable);req.set_exclusive(qexclusive);req.set_auto_delete(qauto_delete);req.mutable_args()->swap(qargs);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void deleteQueue(const std::string& qname){ys::deleteQueueRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(qname);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool queueBind(const std::string& ename,const std::string& qname,const std::string& key){ys::queueBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);req.set_binding_key(key);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void queueUnBind(const std::string& ename,const std::string& qname){ys::queueUnBindRequest req;std::string rid=UUIDHelper::uuid();req.set_rid(rid);req.set_cid(_cid);req.set_exchange_name(ename);req.set_queue_name(qname);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool basicPublish(const std::string &ename, ys::BasicProperties *bp,  const std::string &body){std::string rid=UUIDHelper::uuid();ys::basicPublishRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_body(body);req.set_exchange_name(ename);if(bp!=nullptr){req.mutable_properties()->set_id(bp->id());req.mutable_properties()->set_delivery_mode(bp->delivery_mode());req.mutable_properties()->set_routing_key(bp->routing_key());}//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);//返回return resp->ok();}void basicAck(const std::string &msgid)//删除参数qname,用户知道消费了哪个队列{if(_consumer.get()==nullptr){DLOG("消息确认时,找不到消费者的消息");return;}std::string rid=UUIDHelper::uuid();ys::basicAckRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_queue_name(_consumer->_qname);req.set_message_id(msgid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);//返回return;}bool basicConsume(const std::string consume_tag,const std::string queue_name,bool auto_ack,const ConsumerCallback &cb){if(_consumer.get()!=nullptr){DLOG("当前信道已订阅其他队列消息");return false;}std::string rid=UUIDHelper::uuid();ys::basicConsumeRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(consume_tag);req.set_queue_name(queue_name);req.set_auto_ack(auto_ack);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器basicCommonResponsePtr resp=waitResponse(rid);if(resp->ok()==false){DLOG("添加订阅失败!");return false;}_consumer=std::make_shared<Consumer>(consume_tag,queue_name,auto_ack,cb);//返回return resp->ok();}void basicCancel(){//不一定是消费者if(_consumer.get()==nullptr){DLOG("取消订阅时,找不到消费者信息");return;}std::string rid=UUIDHelper::uuid();ys::basicCancelRequest req;req.set_rid(rid);req.set_cid(_cid);req.set_consume_tag(_consumer->_cid);//向服务器进行发送请求_codec->send(_conn,req);//等待服务器waitResponse(rid);_consumer.reset();//返回return;}std::string cid(){return _cid;}public://连接收到基础响应后,向hash_map中进行添加响应void putBasicResponse(const basicCommonResponsePtr& resp){std::unique_lock<std::mutex> lock(_mutex);_basic_resp.insert(std::make_pair(resp->rid(),resp));_cv.notify_all();}//连接收到消息推送后,需要通过信道进行找到对应的消费对象,通过回调函数进行消息处理void consume(const basicConsumeResponsePtr& resp){if(_consumer.get()==nullptr){DLOG("消息处理时,未找到订阅者消息");return;}if(_consumer->_cid!=resp->consume_tag()){DLOG("收到的推送消息中的消费者标识,与当前信道的消费者标识不一致!");return;}_consumer->_callback(resp->consume_tag(),resp->mutable_properties(),resp->body());return;}private://等待请求的响应basicCommonResponsePtr waitResponse(const std::string& rid){std::unique_lock<std::mutex> lock(_mutex);//while(condition()) _cv.wait();_cv.wait(lock,[&rid,this](){return _basic_resp.find(rid)!=_basic_resp.end();});basicCommonResponsePtr basic_resp=_basic_resp[rid];_basic_resp.erase(rid);return basic_resp;}private:std::string _cid;muduo::net::TcpConnectionPtr _conn;ProtobufCodecPtr _codec;Consumer::ptr _consumer;std::mutex _mutex;std::condition_variable _cv;std::unordered_map<std::string,basicCommonResponsePtr> _basic_resp;};

该模块的注意事项

_consumer 这个成员是不需要在构造函数时进行初始化,当前这个信道要进行订阅某个消息的时候,才能确定这个角色是一个消费者角色,此时在进去构建,要是再构造函数的过程中就进行去构建,万一这个信道的角色是发布客户端,就造成了资源的浪费

在移除信道的时候要是消费者需要进行取消订阅一下,因此添加一个析构函数

8.2.4、对提供创建信道操作

信道的增删查

class ChannelManager{public:using ptr=std::shared_ptr<ChannelManager>;ChannelManager(){}Channel::ptr create(const muduo::net::TcpConnectionPtr& conn,const ProtobufCodecPtr& codec){std::unique_lock<std::mutex> lock(_mutex);auto channel =std::make_shared<Channel>(conn,codec);_channels.insert(std::make_pair(channel->cid(),channel));return channel;}void remove(const std::string& cid){//进行删除的时候还需要进行考虑是否为消费者(是需要进行取消订阅)std::unique_lock<std::mutex> lock(_mutex);_channels.erase(cid);}Channel::ptr get(const std::string& cid){auto pos=_channels.find(cid);if(pos==_channels.end()){return Channel::ptr();}return pos->second;}private:std::mutex _mutex;std::unordered_map<std::string,Channel::ptr> _channels;};

九、功能联调

9.1、联调的思想

  • 必须有一个生产者客户端
    • 声明一个交换机exchange1
    • 声明两个队列queue1(其中binding_key=queue1)、queue2(binding_key=new.music.#)
    • 将这两个队列和交换机进行绑定起来
  • 搭建两个消费者客户端,分别进行各自订阅一个队列的消息
    • 第一次,将交换机的类型进行定义为广播交换模式:理论结果是两个消费者客户端都能拿到消息
    • 第二次,将交换机的类型进行定义为直接交换模式:routing_key=queue1,理论是只有queue1能拿到消息
    • 第三次,将交换机的类型进行定义成主题交换模式:routing_key=news.music.pop,理论是只有queue2能拿到结果

9.2、搭建发布客户端

以广播模式下的测试为例子


int main()
{//广播交换下进行测试//直接交换下进行测试//主题交换下进行测试//1、实例化异步工作线程对象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、实例化连接对象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//3、通过连接进行创建信道brush::Channel::ptr channel=conn->openChannel();//4、通过信道提供的服务完成所需//4.1、声明一个交换机exchange1,交换机的类型为广播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、声明两个队列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、绑定queue1-exchange1,且binding_key设置成queue1//     绑定queue2-exchange1,且binding_key设置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、循环向交换机进行发布信息//广播交换// for(int i=1;i<10;i++)// {//     std::string msg="Hello world-"+std::to_string(i);//     channel->basicPublish("exchange1",nullptr,msg);//     DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());// }//直接交换// for(int i=0;i<10;i++)// {//     ys::BasicProperties bp;//     bp.set_id(brush::UUIDHelper::uuid());//     bp.set_delivery_mode(ys::DeliveryMode::DURABLE);//     bp.set_routing_key("queue1");//     std::string msg="Hello world-"+std::to_string(i);//     channel->basicPublish("exchange1",&bp,msg);//     DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());// }//主题交换for(int i=0;i<10;i++){ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.pop");std::string msg="Hello world-"+std::to_string(i);channel->basicPublish("exchange1",&bp,msg);DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());}ys::BasicProperties bp;bp.set_id(brush::UUIDHelper::uuid());bp.set_delivery_mode(ys::DeliveryMode::DURABLE);bp.set_routing_key("news.music.sport");std::string msg="Hello brush-";channel->basicPublish("exchange1",&bp,msg);DLOG("向交换机exchange进行发布了消息:%s",msg.c_str());//6、关闭信道//channel->closeChannel();conn->closeChannel(channel);
}

9.3、搭建消费者客户端

同样是以广播模式下的测试为例子

//需要进行增加传参
void cb(brush::Channel::ptr& channel, const std::string& consumer_tag, const ys::BasicProperties *bp, const std::string&body)
{DLOG("%s - 消费了消息: %s",consumer_tag.c_str(),body.c_str());std::cout<<"body:"<<body<<std::endl;channel->basicAck(bp->id());
}
int main(int argc,char*argv[])
{if(argc!=2){DLOG("usage: ./consumer_client queue1");return -1;}//1、实例化异步工作线程对象brush::AsyncWorker::ptr awp=std::make_shared<brush::AsyncWorker>();//2、实例化连接对象brush::Connection::ptr conn=std::make_shared<brush::Connection>("127.0.0.1",8085,awp);//DLOG("实例化连接成功");//3、通过连接进行创建信道brush::Channel::ptr channel=conn->openChannel();//DLOG("打开信道成功");//4、通过信道提供的服务完成所需//4.1、声明一个交换机exchange1,交换机的类型为广播模式google::protobuf::Map<std::string, std::string> temp_args;channel->declareExchange("exchange1",ys::TOPIC,true,false,temp_args);//4.2、声明两个队列queue1和queue2channel->declareQueue("queue1",true,false,false,temp_args);channel->declareQueue("queue2",true,false,false,temp_args);//4.3、绑定queue1-exchange1,且binding_key设置成queue1//     绑定queue2-exchange1,且binding_key设置成news.music.# channel->queueBind("exchange1","queue1","queue1");channel->queueBind("exchange1","queue2","news.music.#");//5、进行订阅队列的消息(回调函数对消息进行确认)//auto functor=std::bind(cb,channel,std::placeholders::_1,std::placeholders::_2,std::placeholders::_3);auto functor = std::bind(cb, channel, std::placeholders::_1, std::placeholders::_2, std::placeholders::_3);channel->basicConsume("consumer1",argv[1],false,functor);while(1) std::this_thread::sleep_for(std::chrono::seconds(3));//6、关闭信道conn->closeChannel(channel);return 0;
}

十、项目的扩展

  • 我们项目中只实现了一个虚拟机的版本,实际上是可以有多个虚拟机的
  • 我们是通过代码进行搭建客户端进行访问服务器的,可以进行拓展成管理接口,然后通过可视化的界面进行客户端的搭建
  • 交换机/队列的独占模式和自动删除
  • 发送方式的确认(broker 给生产者进行确认应答)功能也可以进行拓展实现

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.pswp.cn/web/82742.shtml

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈email:809451989@qq.com,一经查实,立即删除!

相关文章

QPair 类说明

QPair 类说明 QPair 是一个模板类&#xff0c;用于存储一对数据项。 头文件&#xff1a; cpp #include <QPair> qmake 配置&#xff1a; QT core 所有成员列表&#xff08;包括继承成员&#xff09; 公共类型 类型定义说明first_type第一个元素的类型&#xff…

4.大语言模型预备数学知识

大语言模型预备数学知识 复习一下在大语言模型中用到的矩阵和向量的运算&#xff0c;及概率统计和神经网络中常用概念。 矩阵的运算 矩阵 矩阵加减法 条件&#xff1a;行数列数相同的矩阵才能做矩阵加减法 数值与矩阵的乘除法 矩阵乘法 条件&#xff1a;矩阵A的列数 矩阵…

uniapp 设置手机不息屏

在使用 UniApp 开发应用时&#xff0c;有时需要在设备长时间未操作时实现息屏保护功能&#xff0c;以节省电量和保护屏幕。以下是如何在 UniApp 中实现这一功能的步骤。 示例一 // 保持屏幕常亮 uni.setKeepScreenOn({keepScreenOn: true });// 监听应用进入后台事件 uni.onH…

智能推荐系统:协同过滤与深度学习结合

智能推荐系统&#xff1a;协同过滤与深度学习结合 系统化学习人工智能网站&#xff08;收藏&#xff09;&#xff1a;https://www.captainbed.cn/flu 文章目录 智能推荐系统&#xff1a;协同过滤与深度学习结合摘要引言技术原理对比1. 协同过滤算法&#xff1a;基于相似性的推…

使用Python和OpenCV实现图像识别与目标检测

在计算机视觉领域&#xff0c;图像识别和目标检测是两个非常重要的任务。图像识别是指识别图像中的内容&#xff0c;例如判断一张图片中是否包含某个特定物体&#xff1b;目标检测则是在图像中定位并识别多个物体的位置和类别。OpenCV是一个功能强大的开源计算机视觉库&#xf…

《基于Apache Flink的流处理》笔记

思维导图 1-3 章 4-7章 8-11 章 参考资料 源码&#xff1a; https://github.com/streaming-with-flink 博客 https://flink.apache.org/bloghttps://www.ververica.com/blog 聚会及会议 https://flink-forward.orghttps://www.meetup.com/topics/apache-flink https://n…

LLaMA-Factory 微调 Qwen2-VL 进行人脸情感识别(二)

在上一篇文章中,我们详细介绍了如何使用LLaMA-Factory框架对Qwen2-VL大模型进行微调,以实现人脸情感识别的功能。本篇文章将聚焦于微调完成后,如何调用这个模型进行人脸情感识别的具体代码实现,包括详细的步骤和注释。 模型调用步骤 环境准备:确保安装了必要的Python库。…

Splash动态渲染技术全解析:从基础到企业级应用(2025最新版)

引言 在Web 3.0时代&#xff0c;87%的网站采用JavaScript动态渲染技术。传统爬虫难以应对Ajax加载、SPA应用等场景&#xff0c;Splash作为专业的JavaScript渲染服务&#xff0c;凭借​​Lua脚本控制​​和​​异步处理能力​​&#xff0c;已成为动态数据抓取的核心工具。本文…

【应用】Ghost Dance:利用惯性动捕构建虚拟舞伴

Ghost Dance是葡萄牙大学的一个研究项目&#xff0c;研究方向是探索人与人之间的联系&#xff0c;以及如何通过虚拟舞伴重现这种联系。项目负责人Cecilia和Rui利用惯性动捕创造出具有流畅动作的虚拟舞伴&#xff0c;让现实中的舞者也能与之共舞。 挑战&#xff1a;Ghost Danc…

广目软件GM DC Monitor

广目&#xff08;北京&#xff09;软件有限公司成立于2024年&#xff0c;技术和研发团队均来自于一家具有近10年监控系统研发的企业。广目的技术团队一共实施了9家政府单位、1家股份制银行、1家芯片制造企业的数据中心监控预警项目。这11家政企单位由2家正部级、1家副部级、6家…

12-Oracle 23ai Vector 使用ONNX模型生成向量嵌入

一、Oracle 23ai Vector Embeddings 核心概念​ 向量嵌入&#xff08;Vector Embeddings&#xff09;​​ -- 将非结构化数据&#xff08;文本/图像&#xff09;转换为数值向量 - - 捕获数据的语义含义而非原始内容 - 示例&#xff1a;"数据库" → [0.24, -0.78, 0.5…

用 NGINX 构建高效 POP3 代理`ngx_mail_pop3_module`

一、模块定位与作用 协议代理 ngx_mail_pop3_module 让 NGINX 能够充当 POP3 代理&#xff1a;客户端与后端 POP3 服务器之间的所有请求均转发到 NGINX&#xff0c;由 NGINX 负责与后端会话逻辑。认证方式控制 通过 pop3_auth 指令指定允许客户端使用的 POP3 认证方法&#xf…

每日算法 -【Swift 算法】三数之和

Swift&#xff5c;三数之和&#xff08;3Sum&#xff09;详细题解 注释 拓展&#xff08;LeetCode 15&#xff09; ✨题目描述 给你一个包含 n 个整数的数组 nums&#xff0c;判断 nums 中是否存在三个元素 a, b, c&#xff0c;使得 a b c 0。请你找出所有和为 0 且不重…

服务器磁盘空间被Docker容器日志占满处理方法

事发场景&#xff1a; 原本正常的服务停止运行了&#xff0c;查看时MQTT服务链接失败&#xff0c;查看对应的容器服务发现是EMQX镜像停止运行了&#xff0c;重启也是也报错无法正常运行&#xff0c;报错如下图&#xff1a; 报错日志中连续出现两个"no space left on devi…

令牌桶 滑动窗口->限流 分布式信号量->限并发的原理 lua脚本分析介绍

文章目录 前言限流限制并发的实际理解限流令牌桶代码实现结果分析令牌桶lua的模拟实现原理总结&#xff1a; 滑动窗口代码实现结果分析lua脚本原理解析 限并发分布式信号量代码实现结果分析lua脚本实现原理 双注解去实现限流 并发结果分析&#xff1a; 实际业务去理解体会统一注…

基于uniapp+WebSocket实现聊天对话、消息监听、消息推送、聊天室等功能,多端兼容

基于 ​UniApp + WebSocket​实现多端兼容的实时通讯系统,涵盖WebSocket连接建立、消息收发机制、多端兼容性配置、消息实时监听等功能,适配​微信小程序、H5、Android、iOS等终端 目录 技术选型分析WebSocket协议优势UniApp跨平台特性WebSocket 基础实现连接管理消息收发连接…

Linux中shell编程表达式和数组讲解

一、表达式 1.1 测试表达式 样式1: test 条件表达式 样式2: [ 条件表达式 ] 注意&#xff1a;以上两种方法的作用完全一样&#xff0c;后者为常用。但后者需要注意方括号[、]与条件表达式之间至少有一个空格。test跟 [] 的意思一样条件成立&#xff0c;状态返回值是0条件不成…

深入了解JavaScript当中如何确定值的类型

JavaScript是一种弱类型语言&#xff0c;当你给一个变量赋了一个值&#xff0c;该值是什么类型的&#xff0c;那么该变量就是什么类型的&#xff0c;并且你还可以给一个变量赋多种类型的值&#xff0c;也不会报错&#xff0c;这就是JavaScript的内部机制所决定的&#xff0c;那…

【p2p、分布式,区块链笔记 MESH】Bluetooth蓝牙通信拓扑与操作 BR/EDR(经典蓝牙)和 BLE

目录 1. BR/EDR&#xff08;经典蓝牙&#xff09;网络结构微微网&#xff08;Piconet&#xff09;散射网&#xff08;Scatternet&#xff09;蓝牙 BR/EDR 拓扑结构示意图 2. BLE&#xff08;低功耗蓝牙&#xff09;网络结构广播器与观察者&#xff08;Broadcaster and Observer…

C++虚函数表(虚表Virtual Table,简称vtable、VFT)(编译器为支持运行时多态(动态绑定)而自动生成的一种内部数据结构)虚函数指针vptr

文章目录 **1. 虚函数表的核心概念**- **虚函数表&#xff08;vtable&#xff09;**&#xff1a;- **虚函数指针&#xff08;vptr&#xff09;**&#xff1a; **2. 虚函数表的生成与工作流程****生成时机**- **当一个类中至少有一个虚函数时**&#xff0c;编译器会为该类生成一…