目录
一.接口
1.1epoll_creaet
1.2epoll_ctl
1.3epoll_wait
二.细节问题
2.1 工作原理
2.2 epoll的demo
2.3 epoll的优点
三. LT 与 ET模式
理解ET
四. reactor
一.接口
1.1epoll_creaet
注意返回值是一个文件描述符
创建一个epoll模型
1.2epoll_ctl
返回值:
第一个参数是epoll_create的返回值
第二个参数表示动作,用三个宏来表示.
第三个参数是需要监听的 fd.
第四个参数是告诉内核需要监听什么事.
第二个参数的取值:
• EPOLL_CTL_ADD:注册新的 fd 到 epfd 中;
• EPOLL_CTL_MOD:修改已经注册的 fd 的监听事件;
• EPOLL_CTL_DEL:从 epfd 中删除一个 fd;
第四个参数:
• EPOLLIN : 表示对应的文件描述符可以读 (包括对端 SOCKET 正常关闭);
• EPOLLOUT : 表示对应的文件描述符可以写;
• EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外 数据到来);
• EPOLLERR : 表示对应的文件描述符发生错误;
• EPOLLHUP : 表示对应的文件描述符被挂断;
• EPOLLET : 将 EPOLL 设为边缘触发(Edge Triggered)模式, 这是相对于水平 触发(Level Triggered)来说的.
• EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继 续监听这个 socket 的话, 需要再次把这个 socket 加入到 EPOLL 队列里
1.3epoll_wait
• epoll 将会把发生的事件赋值到 events 数组中 (events 不可以是空指针,内核 只负责把数据复制到这个 events 数组中,不会去帮助我们在用户态中分配内存).
• maxevents 告之内核这个 events 有多大,这个 maxevents 的值不能大于创建 epoll_create()时的 size.
• 参数 timeout 是超时时间 (毫秒,0 会立即返回,-1 是永久阻塞).
• 如果函数调用成功,返回对应 I/O 上已准备好的文件描述符数目,如返回 0 表 示已超时, 返回小于 0 表示函数失败.
二.细节问题
epoll模型实际上就是三种东西,红黑树,就绪队列,回调机制。epoll_ctl实际上就是维护红黑树的,用户告诉内核,要求内核帮我去关心哪些fd。epoll_wait就是内核告诉用户,哪一个fd上面的某些事情已经就绪了。
2.1 工作原理
细节:epoll_ctl的作用:向红黑树中插入节点,向底层回调注册回调方法。
当某一进程调用 epoll_create 方法时,Linux 内核会创建一个 eventpoll 结构 体,这个结构体中有两个成员与 epoll 的使用方式密切相关.
struct eventpoll{..../*红黑树的根节点,这颗树中存储着所有添加到 epoll 中的需要监控的事件*/struct rb_root rbr;/*双链表中则存放着将要通过 epoll_wait 返回给用户的满足条件的事件*/struct list_head rdlist;....
};
• 每一个 epoll 对象都有一个独立的 eventpoll 结构体,用于存放通过 epoll_ctl 方 法向 epoll 对象中添加进来的事件.
• 这些事件都会挂载在红黑树中,如此,重复添加的事件就可以通过红黑树而高 效的识别出来(红黑树的插入时间效率是 lgn,其中 n 为树的高度).
• 而所有添加到 epoll 中的事件都会与设备(网卡)驱动程序建立回调关系,也就是 说,当响应的事件发生时会调用这个回调方法.
• 这个回调方法在内核中叫 ep_poll_callback,它会将发生的事件添加到 rdlist 双 链表中.
• 在 epoll 中,对于每一个事件,都会建立一个 epitem 结构体.
struct epitem{struct rb_node rbn;//红黑树节点struct list_head rdllink;//双向链表节点struct epoll_filefd ffd; //事件句柄信息struct eventpoll *ep; //指向其所属的 eventpoll 对象struct epoll_event event; //期待发生的事件类型
}
• 当调用 epoll_wait 检查是否有事件发生时,只需要检查 eventpoll 对象中的 rdlist 双链表中是否有 epitem 元素即可.(有事件就绪了,提前注册的回调机制会自动的把红黑树的节点添加到双链表中)
• 如果 rdlist 不为空,则把发生的事件复制到用户态,同时将事件数量返回给用 户. 这个操作的时间复杂度是 O(1).
2.2 epoll的demo
#pragma once#include <iostream>
#include <memory>
#include <unistd.h>
#include <cstring>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Common.hpp"using namespace SocketModule;class EpollServer
{const static int size = 64;const int defaultfd = -1;public:EpollServer(int port) : _listensock(std::make_unique<TcpSocket>()), _isruning(false), _epfd(defaultfd){// 1.创建listensocket_listensock->BuildTcpSocketMethod(port);// 2.创建epoll模型_epfd = epoll_create(256);if (_epfd < 0){LOG(LogLevel::FATAL) << "epoll_create error...";exit(EPOLL_CREATE_ERR);}LOG(LogLevel::FATAL) << "epoll_create success , epfd: " << _epfd;// 3.将listensocket设置到内核中struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = _listensock->Fd();int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, _listensock->Fd(), &ev);if (n < 0){LOG(LogLevel::FATAL) << "add listensocket failed";exit(EPOLL_CTL_ERR);}}void Start(){int timeout = -1;_isruning = true;while (true){int n = epoll_wait(_epfd, revs, size, timeout);switch (n){case 0:LOG(LogLevel::DEBUG) << "timeout...";break;case -1:LOG(LogLevel::ERROR) << "epoll error";break;default:Dispatcher(n);break;}}}void Dispatcher(int n){for (int i = 0; i < n; i++){if (revs[i].events & EPOLLIN){if (revs[i].data.fd == _listensock->Fd()){// 新链接到来Accepter();}else{Recver(i);}}}}void Accepter(){InetAddr client;int sockfd = _listensock->Accept(&client); // 这里一定不会阻塞,等和拷贝分离了if (sockfd >= 0){// 获取新链接成功LOG(LogLevel::INFO) << "get a new link , sockfd: " << sockfd << "client is: " << client.StringAddr();struct epoll_event ev;ev.events = EPOLLIN;ev.data.fd = sockfd;int n = epoll_ctl(_epfd, EPOLL_CTL_ADD, sockfd, &ev);if (n < 0){LOG(LogLevel::WARING) << "add socket failed";}else{LOG(LogLevel::INFO) << "add socket success";}}}void Recver(int pos){// recv的时候肯定也不会阻塞char buffer[1024];ssize_t n = recv(revs[pos].data.fd, buffer, sizeof(buffer) - 1, 0); // 这样写是有bug的,tcp是面向字节流的if (n > 0){buffer[n] = 0;std::cout << "client say& " << buffer << std::endl;}else if (n == 0){LOG(LogLevel::DEBUG) << "client quit";// 不让epoll关心这个fd了int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);if (n < 0){LOG(LogLevel::FATAL) << "del socket failed";exit(EPOLL_CTL_ERR);}close(revs[pos].data.fd); // 先移除,在关闭}else{LOG(LogLevel::ERROR) << "recv error";// 不让epoll关心这个fd了int n = epoll_ctl(_epfd, EPOLL_CTL_DEL, revs[pos].data.fd, nullptr);if (n < 0){LOG(LogLevel::FATAL) << "del socket failed";exit(EPOLL_CTL_ERR);}close(revs[pos].data.fd); // 先移除,在关闭}}~EpollServer(){_listensock->Close();if (_epfd > 0)close(_epfd);}private:std::unique_ptr<Socket> _listensock;bool _isruning;int _epfd;struct epoll_event revs[size];
};
2.3 epoll的优点
• 接口使用方便: 虽然拆分成了三个函数, 但是反而使用起来更方便高效. 不需要 每次循环都设置关注的文件描述符, 也做到了输入输出参数分离开
• 数据拷贝轻量: 只在合适的时候调用 EPOLL_CTL_ADD 将文件描述符结构拷贝到 内核中, 这个操作并不频繁(而 select/poll 都是每次循环都要进行拷贝)
• 事件回调机制: 避免使用遍历, 而是使用回调函数的方式, 将就绪的文件描述符 结构加入到就绪队列中, epoll_wait 返回直接访问就绪队列就知道哪些文件描述符就 绪. 这个操作时间复杂度 O(1). 即使文件描述符数目很多, 效率也不会受到影响.
• 没有数量限制: 文件描述符数目无上限.
三. LT 与 ET模式
理解ET
使用 ET 模式的 epoll, 需要将文件描述设置为非阻塞. 这个不是接口上的要求, 而是 "工 程实践" 上的要求. 假设这样的场景: 服务器接收到一个 10k 的请求, 会向客户端返回一个应答数据. 如果客 户端收不到应答, 不会发送第二个 10k 请求.
如果服务端写的代码是阻塞式的 read, 并且一次只 read 1k 数据的话(read 不能保证一 次就把所有的数据都读出来, 参考 man 手册的说明, 可能被信号打断), 剩下的 9k 数据 就会待在缓冲区中.
此时由于 epoll 是 ET 模式, 并不会认为文件描述符读就绪. epoll_wait 就不会再次返 回. 剩下的 9k 数据会一直在缓冲区中. 直到下一次客户端再给服务器写数据. epoll_wait 才能返回
但是问题来了.
• 服务器只读到 1k 个数据, 要 10k 读完才会给客户端返回响应数据.
• 客户端要读到服务器的响应, 才会发送下一个请求
• 客户端发送了下一个请求, epoll_wait 才会返回, 才能去读缓冲区中剩余的数 据.
所以, 为了解决上述问题(阻塞 read 不一定能一下把完整的请求读完), 于是就可以使用 非阻塞轮训的方式来读缓冲区, 保证一定能把完整的请求都读出来.而如果是 LT 没这个问题. 只要缓冲区中的数据没读完, 就能够让 epoll_wait 返回文件 描述符读就绪.
四. reactor
Reactor.hpp
#pragma once#include <iostream>
#include <memory>
#include <unordered_map>
#include "Epoller.hpp"
#include "Connection.hpp"class Reactor
{
private:static const int revs_num = 128;bool IsConnectionExists(std::shared_ptr<Connection> &conn){return IsConnectionExistsHelper(conn->GetSockFd());}bool IsConnectionExists(int sockfd){return IsConnectionExistsHelper(sockfd);}bool IsConnectionExistsHelper(int sockfd){auto iter = _connections.find(sockfd);if (iter == _connections.end()){return false;}return true;}bool IsConnectionEmpty(){return _connections.empty();}public:Reactor() : _epoll_ptr(std::make_unique<Epoller>()), _isruning(false){}void Start(){if (IsConnectionEmpty()){return;}_isruning = true;while (true){PrintConnection();int n = _epoll_ptr->Wait(_revs, revs_num, -1);for (int i = 0; i < n; i++){int sockfd = _revs[i].data.fd;uint32_t revents = _revs[i].events;//将所有的异常处理转换为IO错误if(revents & EPOLLERR)revents |= (EPOLLIN | EPOLLOUT);//只要是出错了就打开读写端if(revents & EPOLLHUP)revents |= (EPOLLIN | EPOLLOUT);if(revents & EPOLLIN){//不用区分异常了,因为统一处理//不用区分是listensocket还是普通事件就绪if(IsConnectionExists(sockfd))_connections[sockfd]->Recver();}if(revents & EPOLLOUT){if(IsConnectionExists(sockfd))_connections[sockfd]->Sender();}}}_isruning = false;}void AddNewConnection(std::shared_ptr<Connection> &conn){if (IsConnectionExists(conn)){LOG(LogLevel::WARING) << "conn is exists: " << conn->GetSockFd();return;}// 1.把conn对应的fd和他关心的事件写到内核uint32_t events = conn->GetEvent();int sockfd = conn->GetSockFd();_epoll_ptr->Add(sockfd, events);// *.设置回指指针conn->SetOwner(this);// 2.把connection对象添加到connections内部_connections[sockfd] = conn;}void EnableReadWrite(int sockfd,bool enableread,bool enablewrite){// 不要重复添加if (!IsConnectionExists(sockfd)){LOG(LogLevel::WARING) << "EnableReadWrite: conn is exists: " << sockfd;return;}// 修改当前的sockfd对应的connection关心的事件uint32_t events = (EPOLLET | (enableread ? EPOLLIN : 0) | (enablewrite ? EPOLLOUT : 0));_connections[sockfd]->SetEvent(events);//再去写透到内核中_epoll_ptr->Mod(sockfd,events);}void DelConnection(int sockfd){_epoll_ptr->Del(sockfd);_connections.erase(sockfd);close(sockfd);}void PrintConnection(){std::cout << "当前正在管理的fd:" << std::endl;for(auto &conn : _connections){std::cout << conn.second->GetSockFd() << " ";}std::cout << "\r\n";}~Reactor(){}private:// 1.epoll模型std::unique_ptr<Epoller> _epoll_ptr;// // 2.listensocket 单独封装管理// std::shared_ptr<Listener> _listener_ptr;// 3.每一个fd都需要一个单独的输入输出缓冲区,管理套接字std::unordered_map<int, std::shared_ptr<Connection>> _connections;// 4.就绪的所有事件struct epoll_event _revs[revs_num];bool _isruning;
};
Listener.hpp
#pragma once#include <iostream>
#include <sys/epoll.h>
#include "Socket.hpp"
#include "Channel.hpp"
#include "Connection.hpp"using namespace SocketModule;class Listener : public Connection
{
public:Listener(int port = 8080) : _port(port), _listensock(std::make_unique<TcpSocket>()){_listensock->BuildTcpSocketMethod(_port);SetEvent(EPOLLIN | EPOLLET);SetNonBlock(_listensock->Fd());}int GetSockFd(){return _listensock->Fd();}void Recver() override{InetAddr client;//虽然是新链接到来了,但是只有一个链接吗,//while,ET,设置fd为非阻塞while (true){int sockfd = _listensock->Accept(&client);if(sockfd == ACCEPT_ERR){break;}else if(sockfd == ACCEPT_CONTINUE){continue;}else if(sockfd == ACCEPT_DONE){break;}else{//是一个合法的fd,但是怎么去添加到_connections里?需要回调指针std::shared_ptr<Connection> conn = std::make_shared<Channel>(sockfd,client);conn->SetEvent(EPOLLIN | EPOLLET);if(_handler != nullptr)conn->RegisterHandler(_handler);GetOwner()->AddNewConnection(conn);}}}// std::string& Inbuffer() override// {}// std::string& AppendOutBuffer(std::string& out) override// {}void Sender() override{}void Excepter() override{}~Listener(){}private:int _port;std::unique_ptr<Socket> _listensock;
};
Channel.hpp
#pragma once#include <iostream>
#include <string>
#include <sys/types.h>
#include <sys/socket.h>
#include <functional>
#include "Common.hpp"
#include "Connection.hpp"
#include "InetAddr.hpp"#define SIZE 1024class Channel : public Connection
{
public:Channel(int sockfd,const InetAddr& client) : _sockfd(sockfd),_client_addr(client){SetNonBlock(sockfd);}int GetSockFd(){return _sockfd;}//保证把本轮数据读完 (while循环)//即便是读完了,怎么知道数据由完整的报文,如果是多个报文呢?(协议)void Recver() override{char buffer[SIZE];while(true){buffer[0] = 0;ssize_t n = recv(_sockfd,buffer,sizeof(buffer) - 1, 0);if(n > 0){buffer[n] = 0;_inbuffer += buffer;}else if(n == 0){Excepter();return;}else{if(errno == EAGAIN || errno == EWOULDBLOCK){break;//本轮数据读完了}else if(errno == EINTR){continue;}else{Excepter();return;}}}LOG(LogLevel::DEBUG) << _inbuffer;if(!_inbuffer.empty()){// _handler(std::shared_ptr<Connection>(this));_outbuffer += _handler(_inbuffer);}if(!_outbuffer.empty()){Sender();// GetOwner()->EnableReadWrite(_sockfd,true,true);}}// std::string& Inbuffer() override// {// return _inbuffer;// }// std::string& AppendOutBuffer(std::string& out) override// {// _outbuffer += out;// return _outbuffer;// }void Sender() override{while (true){ssize_t n = send(_sockfd,_outbuffer.c_str(),_outbuffer.size(),0);//非阻塞发if(n > 0){_outbuffer.erase(0,n);if(_outbuffer.empty())break;}else if(n == 0){break;}else{if(errno == EAGAIN || errno == EWOULDBLOCK){break;}else if(errno == EINTR){continue;}else{Excepter();return;}}}// 1.数据发送完毕// 2.发送条件不具备if(!_outbuffer.empty()){// 开启对写事件的关心GetOwner()->EnableReadWrite(_sockfd,true,true);}else{GetOwner()->EnableReadWrite(_sockfd,true,false);}}void Excepter() override{GetOwner()->DelConnection(_sockfd);}~Channel(){}private:int _sockfd;std::string _inbuffer;std::string _outbuffer;InetAddr _client_addr;// handler_t _handler;
};
Connection.hpp
#pragma once#include <iostream>
#include <string>
#include "InetAddr.hpp"class Reactor;
class Connection;using handler_t = std::function<std::string (std::string &)>;class Connection
{
public:Connection():_owner(nullptr),_events(0){}virtual void Recver() = 0;virtual void Sender() = 0;virtual void Excepter() = 0;virtual int GetSockFd() = 0;// virtual std::string& Inbuffer() = 0;// virtual std::string& AppendOutBuffer(std::string& out) = 0;void RegisterHandler(handler_t handler){_handler = handler;}void SetEvent(const uint32_t &events){_events = events;}uint32_t GetEvent(){return _events;}void SetOwner(Reactor *owner){_owner = owner;}Reactor *GetOwner(){return _owner;}~Connection(){}private:// 回指指针,用于listensocket添加普通套接字Reactor *_owner;// 关心事件uint32_t _events;
public:handler_t _handler;
};
Main.cc
#include <iostream>
#include "Log.hpp"
#include "Reactor.hpp"
#include "Listener.hpp"
#include "Protocol.hpp"
#include "NetCal.hpp"static void Usage(std::string proc)
{std::cerr << "Usage: " << proc << " port" << std::endl;
}int main(int argc, char *argv[])
{if (argc != 2){Usage(argv[0]);exit(USAGE_ERR);}Enable_Console_Log_Strtegy();int port = std::stoi(argv[1]);// 构建业务模块std::shared_ptr<Cal> cal = std::make_shared<Cal>();// 构建协议对象std::shared_ptr<Protocol> protocal = std::make_shared<Protocol>([&cal](Request& req) -> Response{return cal->Execute(req);});// 构建listener对象std::shared_ptr<Connection> conn = std::make_shared<Listener>(port);conn->RegisterHandler([&protocal](std::string& inbuffer)->std::string{std::string response_str;//可能不止一个报文while(true){std::string package;if(!protocal->Decode(inbuffer,&package))break;response_str += protocal->Execute(package);}return response_str;});std::unique_ptr<Reactor> tsvr = std::make_unique<Reactor>();tsvr->AddNewConnection(conn);tsvr->Start();return 0;
}