Reactor 是一种事件驱动的设计模式(Event-Driven Pattern),主要用于处理高并发 I/O,特别适合网络服务器场景。它通过一个多路复用机制监听多个事件源(如 socket 文件描述符),并在事件就绪时将事件分发给对应的处理器(回调函数)执行。
六、单reactor单线程模型
1、核心思想
利用一个统一的事件分发中心(
EventLoop
),在单个线程中通过 I/O 多路复用机制(如 epoll)高效监听和响应多个并发连接的 I/O 事件,将事件检测与事件处理解耦,从而实现结构简单、性能良好的并发 I/O 服务器。
2、核心组件
类名 | 主要职责 |
---|---|
Channel | 封装一个 fd 的事件及回调,是 fd 与 epoll 的桥梁 |
EventLoop | 管理 epoll 及事件分发,是每个线程中的核心循环 |
TcpServer | 管理监听 socket 和连接接入逻辑,创建 TcpConn |
TcpConn | 封装已建立的连接的读写处理、缓冲、生命周期等 |
1)Channel类-事件通道
Channel 是 一个 fd 与其事件处理逻辑之间的中介,负责:
事件注册:设置监听哪些事件(如
EPOLLIN
,EPOLLOUT
)事件触发:内核通知时,调用用户设置的回调函数
与 EventLoop 协作:将事件注册/修改到
epoll
中
.h
/* 负责在事件分发系统中起到“事件通道”的作用,连接底层的I/O多路复用机制(如epoll、select、poll)和具体的事件处理逻辑 */ class EventLoop;class Channel { public:Channel(EventLoop& loop, int fd);~Channel();// 设置 fd 对应的感兴趣事件(EPOLLIN/EPOLLOUT)void EnableReading();void EnableWriting();void DisableWriting();void DisableAll();// 用户提供的读写事件回调void SetReadCallback(std::function<void()> cb);void SetWriteCallback(std::function<void()> cb);// 实际处理事件触发,调用该函数判断是否调用 read_cb_ 或 write_cb_void HandleEvent(uint32_t events);// 获取 fdint GetFd() const;// 获取事件uint32_t GetEvents() const;// 将当前 Channel 注册到 epoll 或修改其状态。void Update();private:int fd_; // 监听的文件描述符bool added_ = false; // 是否已添加到 epollEventLoop& loop_; // 所属的事件循环uint32_t events_; // 当前监听的事件类型(EPOLLIN/EPOLLOUT)std::function<void()> read_cb_; // 读事件回调std::function<void()> write_cb_; // 写事件回调 };
函数名 | 功能 | 调用时机 |
---|---|---|
EnableReading() | 关注读事件并更新 epoll 状态 | 监听 socket / 连接 socket 可读时 |
EnableWriting() | 关注写事件(注册 EPOLLOUT) | Send() 数据写不完时注册 |
DisableWriting() | 注销写事件(避免忙等) | 写完所有 buffer 后 |
HandleEvent(events) | 根据 epoll 返回事件触发回调 | epoll_wait 返回后由 EventLoop 调用 |
Update() | 将本 Channel 注册或修改到 epoll | 每次 Enable/Disable 事件后必须调用 |
.cpp
Channel::Channel(EventLoop& loop, int fd): fd_(fd), loop_(loop), events_(0) {}Channel::~Channel() {DisableAll(); }void Channel::EnableReading() {events_ |= EPOLLIN;Update(); }void Channel::EnableWriting() {events_ |= EPOLLOUT;Update(); }void Channel::DisableWriting() {events_ &= ~EPOLLOUT;Update(); }void Channel::DisableAll() {loop_.DelEvent(fd_);events_ = 0; }void Channel::SetReadCallback(std::function<void()> cb) {read_cb_ = std::move(cb); }void Channel::SetWriteCallback(std::function<void()> cb) {write_cb_ = std::move(cb); }void Channel::HandleEvent(uint32_t events) {if ((events & EPOLLIN) && read_cb_) read_cb_();if ((events & EPOLLOUT) && write_cb_) write_cb_(); }int Channel::GetFd() const {return fd_; }uint32_t Channel::GetEvents() const {return events_; }void Channel::Update() {if (!added_) {loop_.AddEvent(fd_, events_, this);added_ = true;} else {loop_.ModEvent(fd_, events_, this);} }
2)EventLoop类-事件循环
EventLoop 是 Reactor 的核心调度器,负责:
管理所有 Channel 的事件监听
调用 epoll_wait 等待就绪事件
分发事件并调用 Channel 的处理函数
.h
#define MAX_EVENTS 1024/* 负责事件的等待、分发和调度执行*/ class EventLoop { public:EventLoop() ;~EventLoop();void AddEvent(int fd, uint32_t events, void *ptr);void ModEvent(int fd, uint32_t events, void *ptr);void DelEvent(int fd);void Run();void stop(); private:int epfd_;bool running_; };
函数名 | 功能 | 说明 |
---|---|---|
Run() | 启动事件循环 | 持续调用 epoll_wait 并触发回调 |
stop() | 停止事件循环 | 设置 running_ 为 false |
AddEvent(fd, events, ptr) | 添加一个 fd 到 epoll | fd 封装在 Channel 内,由 TcpConn 创建 |
ModEvent(fd, events, ptr) | 修改 fd 的监听事件 | 典型于 Send() 或关闭写事件 |
DelEvent(fd) | 从 epoll 中删除 fd | 连接关闭、Channel 销毁时 |
.cpp
EventLoop::EventLoop() : epfd_(::epoll_create1(0)), running_(true) {if (epfd_ == -1){std::cerr << "epoll_create error: " << errno << std::endl;exit(EXIT_FAILURE);} }EventLoop::~EventLoop() {close(epfd_); }void EventLoop::AddEvent(int fd, uint32_t events, void *ptr) {epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_ADD, fd, &ev) == -1){std::cerr << "epoll_ctl add error: " << errno << std::endl;} }void EventLoop::ModEvent(int fd, uint32_t events, void *ptr) {epoll_event ev;ev.events = events;ev.data.ptr = ptr;if (::epoll_ctl(epfd_, EPOLL_CTL_MOD, fd, &ev) == -1){std::cerr << "epoll_ctl mod error: " << errno << std::endl;} }void EventLoop::DelEvent(int fd) {if (::epoll_ctl(epfd_, EPOLL_CTL_DEL, fd, nullptr) == -1){std::cerr << "epoll_ctl del error: " << errno << std::endl;} }void EventLoop::Run() {epoll_event events[MAX_EVENTS];while (running_){// 超时参数传入TimerInstance()->WaitTime(),功能是确保 epoll_wait 最迟要在定时任务到期时返回,否则任务会延迟处理。int nfds = ::epoll_wait(epfd_, events, MAX_EVENTS, TimerInstance()->WaitTime());if (nfds == -1){if (errno == EINTR) // EINTR(系统中断)时忽略重试,其他错误打印后直接返回。continue;std::cerr << "epoll_wait error: " << errno << std::endl;return;}// 遍历就绪事件数组for (int i = 0; i < nfds; ++i){// 获取就绪事件存储在 data.ptr 的 Channel*Channel* ch = static_cast<Channel*>(events[i].data.ptr);// 触发读/写等回调ch->HandleEvent(events[i].events);}// 处理定时器任务TimerInstance()->HandleTimeout();} }void EventLoop::stop() {running_ = false; }
3)TcpServer-连接管理
TcpServer 是服务端框架的顶层组织者,负责:
创建监听 socket:创建 socket、bind、listen
创建Accept Channel:负责接收新连接事件并注册到 EventLoop
创建新连接回调:接收到新连接后,通过回调交由用户处理
.h
class TcpConn; class EventLoop;class TcpServer { public:// 新连接回调,供用户处理新连接事件。using NewConnCallback = std::function<void(std::shared_ptr<TcpConn>)>;TcpServer(EventLoop& loop);~TcpServer();// 启动 TCP 服务监听,注册新连接的回调void Start(uint16_t port, NewConnCallback cb);private:// 新连接建立时的处理逻辑,作为回调函数使用void HandleAccept();private:EventLoop& loop_;int listen_fd_;std::shared_ptr<Channel> accept_channel_; // 监听 fd 的事件通道,负责读事件注册(新连接到来)NewConnCallback new_conn_cb_; // 外部注入的新连接回调函数 };
函数名 | 功能 | 说明 |
---|---|---|
Start(port, cb) | 初始化 socket,设置 Channel 和回调 | 注册 EPOLLIN 用于接收连接 |
HandleAccept() | 有新连接到来时被触发 | accept() 然后调用回调构建 TcpConn |
new_conn_cb_ | 用户设置的处理逻辑 | 通常设置为 lambda 创建 TcpConn 实例 |
.cpp
TcpServer::TcpServer(EventLoop& loop): loop_(loop), listen_fd_(-1) {}TcpServer::~TcpServer() {if (listen_fd_ != -1) {accept_channel_->DisableAll();close(listen_fd_);} }void TcpServer::Start(uint16_t port, NewConnCallback cb) {new_conn_cb_ = std::move(cb); // 保存回调函数// 创建 IPv4 TCP 套接字,非阻塞模式listen_fd_ = socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK, 0);if (listen_fd_ == -1) {std::cerr << "socket error: " << errno << std::endl;return;}// 配置 SO_REUSEADDR(端口立即重用,避免“Address already in use”) 和 SO_REUSEPORT(多进程监听同一端口,可用于多核负载均衡)int opt = 1;setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));setsockopt(listen_fd_, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt));// 绑定 socket 到指定端口地址sockaddr_in addr{};addr.sin_family = AF_INET;addr.sin_addr.s_addr = INADDR_ANY;addr.sin_port = htons(port);if (bind(listen_fd_, (sockaddr*)&addr, sizeof(addr)) == -1) {std::cerr << "bind error: " << errno << std::endl;close(listen_fd_);return;}// 开始监听,监听队列长度为 SOMAXCONN(4096)if (listen(listen_fd_, SOMAXCONN) == -1) {std::cerr << "listen error: " << errno << std::endl;close(listen_fd_);return;}// 创建 Channel 对象,监听 listen_fd_ 上的事件,会注册到 epoll 中accept_channel_ = std::make_shared<Channel>(loop_, listen_fd_);// 设置读事件回调函数:即新连接到来时应调用的逻辑函数accept_channel_->SetReadCallback([this]() { HandleAccept(); });// 启动监听accept_channel_->EnableReading();std::cout << "Server listening on port " << port << std::endl; }void TcpServer::HandleAccept() {sockaddr_in client_addr{};socklen_t len = sizeof(client_addr);// 接受新连接,返回新的客户端 fd,设置非阻塞int conn_fd = accept4(listen_fd_, (sockaddr*)&client_addr, &len, SOCK_NONBLOCK);if (conn_fd == -1) return;// 创建新的连接对象(TcpConn)管理该客户端 fdauto conn = std::make_shared<TcpConn>(conn_fd, loop_);// 调用用户逻辑处理这个连接if (new_conn_cb_) new_conn_cb_(conn); }
4)TcpConn-与客户端通信
TcpConn 封装了每个 TCP 连接的生命周期、读写事件、缓冲区管理等。
.h
/**声明 TcpConn 类,同时继承 enable_shared_from_this,方便在回调中安全获取 shared_ptr<TcpConn> 自己的智能指针。*/ class TcpConn : public std::enable_shared_from_this<TcpConn> { public:// 声明回调函数using ReadCallback = std::function<void()>;using CloseCallback = std::function<void()>;TcpConn(int fd, EventLoop& loop);~TcpConn();// 设置回调函数void SetReadCallback(ReadCallback cb);void SetCloseCallback(CloseCallback cb);// 异步发送数据int Send(const char* data, size_t size);// 获取当前接收缓冲区内的全部数据std::string GetAllData();private:// 内部事件处理函数:读取、写入、关闭连接void HandleRead();void HandleWrite();void Close();private:int fd_;bool closed_;EventLoop& loop_;MessageBuffer input_buffer_; // 读缓冲区std::string output_buffer_{}; // 写缓冲区std::shared_ptr<Channel> channel_; // 封装 fd 的 epoll 管理类ReadCallback read_cb_; // 外部注入的回调函数CloseCallback close_cb_; };
函数名 | 功能 | 调用说明 |
---|---|---|
HandleRead() | 可读事件触发时从 fd 读数据到 input_buffer_ | 然后触发 read_cb_ |
HandleWrite() | 可写事件触发时将 output_buffer_ 中数据写出 | 写完后注销 EPOLLOUT |
Send(data) | 异步发送数据 | 若 fd 可写则立即写,否则加入 buffer |
GetAllData() | 获取 input_buffer_ 所有数据 | 可在 onMessage 中使用 |
Close() | 主动关闭连接 | 注销事件、关闭 fd、触发 close_cb_ |
.cpp
TcpConn::TcpConn(int fd, EventLoop& loop): fd_(fd), closed_(false), loop_(loop) {// // 设置 socket 为非阻塞模式// int flags = fcntl(fd, F_GETFL, 0);// fcntl(fd, F_SETFL, flags | O_NONBLOCK);// 创建 Channel 并设置读写回调,注册 EPOLLIN 监听可读事件。channel_ = std::make_shared<Channel>(loop_, fd);channel_->SetReadCallback([this]() { HandleRead(); });channel_->SetWriteCallback([this]() { HandleWrite(); });channel_->EnableReading(); }TcpConn::~TcpConn() {Close(); }void TcpConn::SetReadCallback(ReadCallback cb) {read_cb_ = std::move(cb); }void TcpConn::SetCloseCallback(CloseCallback cb) {close_cb_ = std::move(cb); }int TcpConn::Send(const char* data, size_t size) {// 若连接已关闭或无数据,直接返回。if (closed_ || data == nullptr || size == 0) return -1;// 如果写缓冲区中已有未发送的数据,追加数据并监听写事件。if (!output_buffer_.empty()) {output_buffer_.append(data, size);channel_->EnableWriting();return size;}// 直接发送(零拷贝),MSG_NOSIGNAL:防止对方关闭连接时触发 SIGPIPE 信号(使得进程崩溃)。int n = send(fd_, data, size, MSG_NOSIGNAL);if (n < 0 && (errno == EAGAIN || errno == EWOULDBLOCK)) { // 非阻塞 socket 下,内核发送缓冲区可能暂时满// 将数据缓存到用户态 output_buffer_, 监听写事件output_buffer_.append(data, size);channel_->EnableWriting();} else if (n > 0 && n < static_cast<int>(size)) { // 有部分数据被写入 socket(如发送了 n 字节,小于总长度 size)// 剩下的数据未能写完,需要缓存到 output_buffer_,并监听写事件output_buffer_.append(data + n, size - n);channel_->EnableWriting();} else if (n < 0) {Close();}return n; }std::string TcpConn::GetAllData() {auto data = input_buffer_.GetAllData();if (data.first != nullptr) {// 获取读缓冲区全部有效数据std::string result(reinterpret_cast<char*>(data.first), data.second);// 标记已读的数据input_buffer_.ReadCompleted(data.second);return result;}return ""; }void TcpConn::HandleRead() {int err = 0;// 调用 MessageBuffer::Recv() 使用 readv() 读取数据,读取数据到缓冲区中。int n = input_buffer_.Recv(fd_, &err);if (n > 0 && read_cb_) {read_cb_(); // 触发读取回调逻辑} else if (n == 0 || (n < 0 && err != EAGAIN && err != EWOULDBLOCK)) { // 连接关闭或错误则关闭连接。Close();} }void TcpConn::HandleWrite() {// 如果写缓冲区为空,则取消监听写事件。if (output_buffer_.empty()) {channel_->DisableWriting();return;}// 缓冲区不为空,调用 send() 发送数据,发送成功则删除发送缓冲区中的数据。int n = send(fd_, output_buffer_.data(), output_buffer_.size(), MSG_NOSIGNAL);if (n > 0) {output_buffer_.erase(0, n);if (output_buffer_.empty()) {channel_->DisableWriting();}} else if (n < 0 && errno != EAGAIN && errno != EWOULDBLOCK) { // 非阻塞错误以外的写失败则关闭连接Close();} }void TcpConn::Close() {if (closed_) return;closed_ = true;channel_->DisableAll();close(fd_);if (close_cb_) close_cb_(); }
3、完整调用流程
启动阶段:
TcpServer::Start(port, cb)
:
创建
listen_fd
,设为非阻塞创建
accept_channel_
封装listen_fd
设置其
read_cb
为TcpServer::HandleAccept
将其注册到
EventLoop
中(AddEvent()
)
有连接到来:
内核通知
listen_fd
可读 →Channel::HandleEvent()
→read_cb_
→TcpServer::HandleAccept()
:
调用
accept()
,获得conn_fd
设置为非阻塞
构建
TcpConn
对象,封装连接的生命周期创建该连接的
Channel
,并注册其read_cb_
→TcpConn::HandleRead
数据收发流程:
conn_fd
可读 → epoll 通知 →EventLoop
触发TcpConn::HandleRead()
:
从 socket 读入数据到
input_buffer_
调用
read_cb_
让上层应用逻辑处理数据
TcpConn::Send()
:
若当前 fd 可写,则直接发送
否则写入
output_buffer_
,并注册EPOLLOUT
事件
channel_->SetWriteCallback(...)
设定写回调
conn_fd
可写 →TcpConn::HandleWrite()
:
从
output_buffer_
中写出数据若 buffer 为空,注销写事件,调用
writeCompleteCallback