Requestor 类是一个请求-响应管理器,负责发送请求并处理响应,支持三种交互模式:同步、异步和回调。它跟踪所有发出的请求,当响应到达时将其匹配到对应的请求并进行处理。
newDescribe 函数解析
newDescribe 函数负责创建和注册一个新的请求描述对象,它是请求-响应匹配系统的核心部分。
RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}
功能详解
- 创建请求记录:为每个发出的请求创建一个跟踪记录
- 设置请求属性:记录请求类型和回调函数
- 注册到映射表:将请求描述添加到全局映射表中
- 线程安全处理:确保多线程环境下操作安全
参数说明
- req:请求消息对象
- rtype:请求类型(同步、异步或回调)
- cb:可选的回调函数,默认为空
执行流程
- 加锁保护共享数据 std::unique_lock<std::mutex> lock(_mutex)
- 创建请求描述对象 rd = std::make_shared<RequestDescribe>()
- 设置请求消息 rd->request = req
- 设置请求类型 rd->rtype = rtype
- 如果是回调类型且回调函数有效,则设置回调 rd->callback = cb
- 将请求描述插入映射表 _request_desc.insert(std::make_pair(req->rid(), rd))
- 返回创建的请求描述对象
生活类比
想象这个函数是邮局的"寄件登记处":
顾客到达:你带着一封信(请求消息)来到邮局
填写单据:工作人员拿出一张跟踪单(RequestDescribe)
记录信息:
- 工作人员记录你的信件信息(request)
- 标记你选择的服务类型(rtype)
- 如果你选择了通知服务,记录你的电话号码(callback)
系统登记:工作人员将跟踪单输入电脑系统(request_desc映射表)
单据存档:跟踪单一份交给你,一份存档(返回rd)
getDescribe 函数解析
getDescribe 函数负责根据请求ID查找对应的请求描述对象,是响应处理过程中的关键环节。
RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}
功能详解
- 查找请求记录:根据请求ID在映射表中查找对应的请求描述
- 返回查询结果:如果找到则返回请求描述对象,否则返回空指针
- 线程安全处理:确保多线程环境下操作安全
参数说明
- rid:请求ID,用于标识特定请求
执行流程
- 加锁保护共享数据 std::unique_lock<std::mutex> lock(_mutex)
- 在映射表中查找指定ID的请求描述 it = _request_desc.find(rid)
- 如果未找到(迭代器指向末尾),返回空指针
- 如果找到,返回对应的请求描述对象
生活类比
想象这个函数是邮局的"查询窗口":
- 顾客查询:当回信到达时,分拣员需要知道这封信是谁寄的
- 提供编号:分拣员拿着信封上的编号(rid)去查询系统
- 系统查询:工作人员在电脑系统中输入编号(request_desc.find)
- 查询结果:
- 如果找到记录,系统显示寄件人信息和服务类型(返回请求描述)
- 如果找不到记录,系统显示"未知寄件人"(返回空指针)
delDescribe 函数解析
delDescribe 函数负责从映射表中删除指定请求ID的请求描述对象,是请求-响应流程完成后的清理步骤。
void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
功能详解
- 删除请求记录:从全局映射表中移除已处理完成的请求描述
- 资源释放:释放不再需要的请求描述对象占用的内存
- 线程安全处理:确保多线程环境下操作安全
参数说明
- rid:要删除的请求ID
执行流程
- 加锁保护共享数据 std::unique_lock<std::mutex> lock(_mutex)
- 从映射表中删除指定ID的条目 _request_desc.erase(rid)
生活类比
想象这个函数是邮局的"归档清理"过程:
- 任务完成:当一封信的收发过程彻底完成后
- 记录清理:工作人员将这封信的跟踪单从活跃系统中移除
- 存档或销毁:跟踪单可能被归档或直接销毁(内存释放)
- 系统更新:更新电脑系统,移除该编号的记录(映射表删除)
void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}
删除的时候为什么要加锁呢,是因为如果删除之后再去删除会造成空指针吗?
加锁的原因并不是为了防止删除后再次删除导致的空指针问题,而是为了解决多线程并发访问带来的数据竞争和一致性问题。主要原因有:
数据一致性保护:
- 在多线程环境中,可能同时有多个线程访问和修改 _request_desc 映射表
- 如果一个线程正在读取映射表时,另一个线程删除了某个条目,可能导致第一个线程访问无效内存
容器迭代器失效:
- 在 C++ 标准容器中,当进行修改操作(如 erase)时,可能导致迭代器失效
- 如果一个线程持有映射表的迭代器,而另一个线程调用 erase,可能导致第一个线程的迭代器变为无效
原子性保证:
- 映射表的修改操作需要保证原子性,即操作要么完全执行,要么完全不执行
- 没有锁保护,可能导致修改操作被部分执行,使映射表处于不一致状态
与其他操作互斥:
- 同一个映射表上执行的增加、查找、删除操作需要互斥进行
- 锁确保在一个操作完成前,不会执行其他可能干扰的操作
生活类比:
想象邮局的记录系统是一本大账本,多个工作人员需要同时使用:
- 如果一个工作人员正在查看第5页记录
- 同时另一个工作人员撕掉了这一页(删除)
- 第一个工作人员会突然发现自己正在查看的页面不见了
为了防止这种情况,邮局规定:任何人要修改账本时,必须先取得"账本钥匙"(锁),使用完毕后归还钥匙。这样确保任何时候只有一个人能修改账本,保证账本内容的一致性和完整性。
所以,加锁不是为了防止删除后的空指针问题(erase对不存在的键是安全的),而是为了保护映射表在多线程环境下的一致性和完整性。
onResponse 函数解析
onResponse 函数是整个 Requestor 类的核心,它处理收到的响应消息并将其路由到对应的处理机制。
功能详解
- 响应匹配:根据响应ID找到对应的请求描述
- 类型分发:根据请求类型选择不同的处理方式
- 结果通知:通过不同机制通知等待者响应已到达
- 资源清理:处理完后删除请求描述
参数说明
- conn:收到响应的连接对象
- msg:响应消息
执行流程
- 从响应消息获取请求ID rid = msg->rid()
- 查找对应的请求描述 rdp = getDescribe(rid)
- 检查是否找到请求描述,未找到则记录错误并返回
- 根据请求类型进行处理:
- 异步请求:设置 promise 值 rdp->response.set_value(msg)
- 回调请求:检查回调函数是否存在,然后调用 rdp->callback(msg)
- 其他类型:记录未知类型错误
- 处理完成后删除请求描述 delDescribe(rid)
生活类比
想象这个函数是邮局的"回信分拣处":
回信到达:邮局收到一封回信(响应消息)
查找记录:工作人员查看回信编号(rid),在系统中查询(getDescribe)
检查记录:
- 如果找不到记录,记录异常情况("无人认领的回信")
- 如果找到记录,查看寄件人选择的服务类型(rtype)
分发处理:
- 如果是"跟踪号服务"(REQ_ASYNC),更新系统状态,标记包裹已到达(set_value)
- 如果是"通知服务"(REQ_CALLBACK),打电话通知寄件人(callback)
- 如果是未知服务,记录错误
清理记录:处理完成后,从活跃系统中删除该记录(delDescribe)
异步 send 函数
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}
功能详解
- 异步发送请求并返回 future 对象
- 允许调用者在未来某个时间点获取结果
执行流程
- 创建异步类型的请求描述 rdp = newDescribe(req, RType::REQ_ASYNC)
- 检查创建是否成功
- 发送请求 conn->send(req)
- 获取 future 对象 async_rsp = rdp->response.get_future()
生活类比
这就像快递跟踪服务:
- 你寄出包裹(发送请求)
- 快递公司给你一个跟踪号(future 对象)
- 你可以随时通过跟踪号查询状态
- 当你需要知道包裹是否送达时,查询跟踪号(调用 get() 或 wait())
同步 send 函数
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}
功能详解
- 同步发送请求并阻塞等待响应
- 简化接口,对调用者隐藏异步细节
执行流程
- 创建临时 future 对象 AsyncResponse rsp_future
- 调用异步版本的 send 函数
- 检查发送是否成功
- 阻塞等待响应 rsp = rsp_future.get()
生活类比
这就像窗口服务:
- 你去政务大厅办理业务(发送请求)
- 工作人员让你在窗口等着(阻塞等待)
- 你不能离开,直到业务办完(响应返回)
- 办完后工作人员直接把结果给你(设置 rsp)
回调式 send 函数
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}
功能详解
- 发送请求并注册回调函数
- 当响应到达时自动调用回调函数
- 适合事件驱动编程模型
执行流程
- 创建回调类型的请求描述 rdp = newDescribe(req, RType::REQ_CALLBACK, cb)
- 检查创建是否成功
- 发送请求 conn->send(req)
生活类比
这就像叫号服务:
- 你去餐厅点餐(发送请求)
- 服务员给你一个取餐号,告诉你"好了会叫你"(注册回调)
- 你可以去做其他事(不阻塞)
- 餐点准备好时,广播系统会叫你的号(回调函数被调用)
生活类比:邮政系统
想象 Requestor 是一个现代邮政系统,处理信件的发送和接收:
RequestDescribe(请求描述)= 邮件跟踪单
当你寄出一封信时,邮局会给你一个跟踪单,上面记录了:
- 你寄出的信件内容(request)
- 你希望如何收到回复(rtype)
- 你的联系方式(callback 或 response)
request_desc(请求映射表)= 邮局的跟踪系统数据库
邮局维护一个数据库,记录所有寄出的信件及其跟踪单。当回信到达时,可以查询这个数据库找到对应的寄件人信息。
三种 send 方法 = 三种不同的邮寄服务
同步 send(等待型服务)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp)
就像你去邮局寄信并在窗口等待回复。你不会离开,直到收到回信。
异步 send(跟踪号服务)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp)
你寄信后得到一个订单号,可以随时查询进度,但不必一直等待。
回调 send(通知服务)
bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb)
你寄信时留下电话号码,邮局承诺收到回信时会打电话通知你。
onResponse 方法 = 邮局的分拣员
void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg)
当回信到达邮局时,分拣员查看信封上的编号,找到对应的跟踪单,然后根据寄件人选择的服务类型进行处理:- 如果是等待型服务,通知窗口的工作人员- 如果是跟踪号服务,更新系统状态- 如果是通知服务,打电话给寄件人
线程安全机制 = 邮局的工作流程规范
std::unique_lock<std::mutex> lock(_mutex);
就像邮局有严格的工作流程,确保多个工作人员不会同时修改同一条记录,避免混乱。
完整流程类比
同步调用流程
1. 你(调用者)去邮局(Requestor)寄一封信(请求)
2. 邮局记录你的信件信息,并给你一个号码牌(请求ID)
3. 你坐在等候区,等待回信(阻塞等待)
4. 回信到达后,分拣员找到你的号码牌,通知你取信(设置结果)
5. 你收到回信,离开邮局(函数返回)
异步调用流程
1. 你去邮局寄信,但不想等待
2. 邮局给你一个跟踪号(future)
3. 你离开邮局,去做其他事情(非阻塞)
4. 当你需要查看结果时,使用跟踪号查询(future.get())
5. 如果回信已到,立即获得;如果未到,等待直到到达
回调调用流程
1. 你去邮局寄信,留下电话号码(回调函数)
2. 邮局记录你的联系方式,你离开去做其他事
3. 回信到达时,邮局根据记录的电话号码通知你(调用回调)
4. 你收到通知,了解回信内容(在回调中处理结果)这个系统确保了无论你选择哪种服务方式,你的信件和回复都能可靠地处理,而且多个用户的请求不会混淆。
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
#include <functional>namespace bitrpc {namespace client {class Requestor {public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("请求类型未知!!");}delDescribe(rid);}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};}
}
完整代码
#pragma once
#include "../common/net.hpp"
#include "../common/message.hpp"
#include <future>
#include <functional>namespace bitrpc {namespace client {class Requestor {public:using ptr = std::shared_ptr<Requestor>;using RequestCallback = std::function<void(const BaseMessage::ptr&)>;using AsyncResponse = std::future<BaseMessage::ptr>;struct RequestDescribe {using ptr = std::shared_ptr<RequestDescribe>;BaseMessage::ptr request;RType rtype;std::promise<BaseMessage::ptr> response;RequestCallback callback;};void onResponse(const BaseConnection::ptr &conn, BaseMessage::ptr &msg){std::string rid = msg->rid();RequestDescribe::ptr rdp = getDescribe(rid);if (rdp.get() == nullptr) {ELOG("收到响应 - %s,但是未找到对应的请求描述!", rid.c_str());return;}if (rdp->rtype == RType::REQ_ASYNC) {rdp->response.set_value(msg);}else if (rdp->rtype == RType::REQ_CALLBACK){if (rdp->callback) rdp->callback(msg);}else {ELOG("请求类型未知!!");}delDescribe(rid);}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, AsyncResponse &async_rsp) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_ASYNC);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);async_rsp = rdp->response.get_future();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, BaseMessage::ptr &rsp) {AsyncResponse rsp_future;bool ret = send(conn, req, rsp_future);if (ret == false) {return false;}rsp = rsp_future.get();return true;}bool send(const BaseConnection::ptr &conn, const BaseMessage::ptr &req, const RequestCallback &cb) {RequestDescribe::ptr rdp = newDescribe(req, RType::REQ_CALLBACK, cb);if (rdp.get() == nullptr) {ELOG("构造请求描述对象失败!");return false;}conn->send(req);return true;}private:RequestDescribe::ptr newDescribe(const BaseMessage::ptr &req, RType rtype, const RequestCallback &cb = RequestCallback()) {std::unique_lock<std::mutex> lock(_mutex);RequestDescribe::ptr rd = std::make_shared<RequestDescribe>();rd->request = req;rd->rtype = rtype;if (rtype == RType::REQ_CALLBACK && cb) {rd->callback = cb;}_request_desc.insert(std::make_pair(req->rid(), rd));return rd;}RequestDescribe::ptr getDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);auto it = _request_desc.find(rid);if (it == _request_desc.end()) {return RequestDescribe::ptr();}return it->second;}void delDescribe(const std::string &rid) {std::unique_lock<std::mutex> lock(_mutex);_request_desc.erase(rid);}private:std::mutex _mutex;std::unordered_map<std::string, RequestDescribe::ptr> _request_desc;};}
}