什么是高效的IO?
正常情况下,IO=等+拷贝
高效的IO=拷贝(即让IO尽量不等)
为什么我们平常玩电脑的时候,感觉不到等待的过程呢?
任何通信场景,IO通信场景,效率一定是有上限的. 花盆里,长不出参天大树。也就是说任何通信场景下的IO都有等待的过程,只是因为我们的本地主机硬件彼此之间都离得很近,等待的过程很短,所以我们感觉不到,你换做网络通信,通信双方离千里之外,你就很能感觉到这个等待的过程了。
如何提高IO的效率?
单位时间内,等待的比重越低,IO 效率越高!
提高IO的效率,其实就是要提高拷贝操作在IO操作中的比重,减少等待的比重!
五种IO模型
五种IO模型分别是哪五种?他们有啥区别?
- 阻塞式IO
- 非阻塞IO轮询
- 信号驱动式IO
- 多路转接/多路复用IO
- 异步IO
我们可以通过下面的例子去理解:
假如说在你老家有一个池塘,然后有很多人喜欢去这个池塘里面钓鱼。
在一个风和日丽的下午,有俩人在池边钓鱼,一个叫张三,一个叫李四。这哥俩的钓鱼装备都一样,但是他们钓鱼的方式有些许差别:张三在鱼上钩之前一直在观察鱼竿有没有动静,如果发现没有动静,那他也不干别的事情,就接着等,一直等到有动静为止。而李四并不是一直在观察鱼竿有没有动静,他只是定时的过来查看一下,如果检测到鱼竿没有动静。他就会去做别的事情,然后过一段时间再来看看鱼竿有没有动静。
过了一会儿又来了一个人叫王五,他走到池塘边放杆之后,在鱼竿的头部系了一个铃铛,然后把杆子往那儿一放,自己低头玩手机。每当铃铛响的时候,王五就起来收杆,收完杆之后把杆子一放,继续玩手机。
又过了一会儿,来了一个人叫赵六,这个人很有钱。他沿着池塘的边儿放了100根鱼竿儿,自己就围着池塘来回巡逻,巡逻的过程中一看见哪个鱼竿有鱼上钩了,他赶紧就去收。
最后一个赶来的人叫田七,这个人是个大老板,平时事务非常繁忙,但是就喜欢钓鱼,来到这里之后还没钓一会儿来,这时候突然公司里有急事儿,钓不了鱼了。他就吩咐他的司机小王说,小王你给我钓一下。今天下午你钓完10条鱼以后,给我打个电话。我过来开始接你回去,完不成任务,你就一直搁这给我调。
在上面的例子中,你如果将钓鱼这件事情理解成IO。将钓鱼的人理解成计算机中的进程,那些鱼竿儿理解成Io的目标文件(文件描述符)。就可以比较好的理解。五种不同的io方式之间的区别。其实我们可以看到这5个人的做法应该是一个比一个高效的,因为他们从上往下等待的时间越来越少。钓鱼过程中自己腾出来的时间越来越多
问题1:阻塞式IO 与 非阻塞式IO的区别在哪里?(张三和李四的做法有什么区别?)
张三是在鱼上钩之前一直在观察鱼竿有没有动静,如果发现没有动静,那他就接着等,一直等到有动静为止。而李四并不是一直在观察鱼竿有没有动静,他只是定时的过来查看一下,如果检测到鱼竿没有动静。他就会去做别的事情,然后过一段时间再来看看鱼竿有没有动静。
这俩人的做法的核心区别在于,当他们检测到鱼竿没有动静的时候,他们的处理方式是不一样的,张三的处理方式是没有等到我接着等。而李四的处理方式是没有等到,我就去干别的事儿,过一会儿我再来看。
阻塞式IO与非阻塞式IO的核心区别也在于此,阻塞式IO如果没等到,就会一直在等,而非阻塞式IO如果没等到,他不会一直等,而是回去干别的事情,过一段时间之后再来检测。
但是值得注意的是,张三和李四在鱼没有上钩之前干了什么,鱼一旦上钩,他们干的事情都是一样的,也就是说——阻塞式IO 与 非阻塞式IO 拷贝操作的效率没有任何区别!
问题2:如何理解 非阻塞式IO 的效率比 阻塞式IO 要高 ?
我们经常会听到一种说法叫做:非阻塞式IO 的效率比 阻塞式IO 要高。这个效率应该如何理解?是不是意味着相同的时间内李四钓的鱼比张三钓的鱼多呢?
其实并不是,这两个人钓到鱼的多少取决于池塘里边的鱼咬谁的钩咬的多。但是在池塘里的鱼看来,我又不知道张三和李四在我没咬钩的时候在干什么?我看到的水底下的两个钩子是差不多的。那我咬他们两个钩的概率就是一样的。那按照这个道理。同样的时间内李四钓到的鱼应该和张三钓的鱼一样多。
也就是说对于同一个IO任务来说,计算机无论是采用 非阻塞式IO 的策略,还是 阻塞式IO 的策略,可能处理这一个任务的时间都是相同的
既然如此,那为什么我们还说非阻塞式IO 的效率比 阻塞式IO 要高呢?我们可以从下面两种角度去理解。
(1)计算机除了要处理io,还要进行很多其他的操作。在相同的时间内,计算机采用非阻塞式IO 的方案进行处理,完成的总工作量要比采用阻塞式IO要多。
李是在下午钓鱼的这段时间内。不仅钓上鱼他还看了很多小说,刷了很多视频。是张三在这个下午完成的工作仅仅是钓了这么多鱼。因此李四完成的总任务比张还要多,因此我们说李四的效率比张三高。
(2)采用非阻塞式IO ,在没有等到IO事件之前。计算机可以去做别的事情,这个别的事情也可以是其他类型的IO,这样采用非阻塞式IO,计算机处理的全部IO工作量就比阻塞式IO多了
用我们的例子去理解,“非阻塞IO效率高” 他的意思是—— 李四可以在同样的时间,做更多的其他事情!
问题3:上面的五种IO,谁的效率是最高的? (这一下午这5个人谁钓的鱼最多?)
赵六!!下午这5个人全部加入之后,池塘里边一共有104根儿鱼竿儿。其中100根鱼竿儿都是赵六的!除了赵六之外,其他的所有人钓鱼都只用一根鱼竿。我们假设这池子里面的鱼咬每一根鱼竿儿的概率是一样的,那傻子都知道这一下午肯定是赵六钓的鱼是最多的。
那么在计算机中,多路转接/多路复用IO的效率就是最高的,也就是说在相同的时间内,采用这种方式处理的IO工作量是最多的。
赵六, 任意一个鱼竿(fd), 鱼(数据)就绪的概率很高 IO = 等+拷贝
一个人, 检测多个鱼竿, 降低了等的比重
问题3:信号驱动式IO最大的特点是什么?效率咋样?(王五的做法和前面俩人的做法最本质的区别在哪里?)
张三就是一直在主动的检测这个鱼有没有上钩。李四虽然经常去干别的事情,但他也会定期去看看这个鱼竿。有没有动静,他心里至少还挂念着这个鱼竿。但是王五他是真一点儿都不挂念,全程低头玩手机,如果不是有这个铃铛叫,他是绝对不会主动抬头的。
我们前面讲IO的工作分成等待和拷贝两个部分。
在信号驱动式IO中,拷贝工作的开始是由信号触发的,也就是说你不给我发信号,我永远都不会开始拷贝。
问题4:这五种IO方式中,最后一种叫做异步IO,那有没有同步IO呢?同步IO的定义是啥?信号产生不是异步的嘛?为什么信号驱动式IO属于同步IO呢?
我们说的五种IO方式中,前面四种都是同步IO。
结合我们前面举的例子,张三李四王五赵六这4个人。他们虽然在鱼上钩之前,等待的方式不一样。但是当鱼上钩的时候,他们都会亲手握住鱼竿儿把鱼拉上来。
对应在计算机中,虽然前四种IO方式的策略不同,但是当等待的IO事件发生的时候,都是由等待这个IO的进程亲自去处理这个IO。而异步io指的是?我专门儿创建一个进程去处理这个IO,我后边儿就一点儿都不问了。当IO完成的时候,让那个进程自动把我要的东西给我。
问题5:同步IO和异步IO的区别是什么?(田七和前面四个钓鱼佬的区别是什么?)
结合我们刚刚说的例子。那4个人你别管他们等待的方式是什么的,但是他们都是在那鱼塘边儿待了一下午,都是亲自拉杆儿把鱼钓上来的。而田七调到一半他就跑了,就去当甩手掌柜去了。这个就很像我们现实生活中黑心煤矿的老板。你别看他的公司干的是煤矿,但是这个公司的中高层有百分之八九十都没下过矿,他们都不知道这个煤是怎么挖的,他们也不知道煤矿有多危险,他们只知道工人冒着生命危险提取出来的煤矿,可以卖掉赚大钱。领导就是只负责给下属提要求,他告诉下属,我不管你们是怎么实现的,反正我只负责验收,你们看着办吧。
我们前面说,IO=等+拷贝
只要你参与了IO的过程(可能你只参与了等的过程,可能你只参与了拷贝的过程,这都算参与IO的过程),就是同步IO。像煤老板那种连煤矿都没下过的,全程不参与IO过程的就是异步IO。
五种IO模型的实现方式
我们以UDP通信中,用户调用recvfrom接收数据的场景为例,说明五种IO模型的不同实现方式
阻塞式IO
这个最简单,只需要正常调用recvfrom就行了(因为创建套接字时默认就是阻塞方式),阻塞式IO的基本过程如下
下面的通过socket网络编程实现简单阻塞式IO的代码
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <unistd.h>int main() {// 1. 创建 UDP Socket(默认即为阻塞模式)int sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket 创建失败");return -1;}// 2. 绑定地址和端口(作为服务器必须绑定)struct sockaddr_in local_addr;memset(&local_addr, 0, sizeof(local_addr));local_addr.sin_family = AF_INET; // IPv4local_addr.sin_addr.s_addr = INADDR_ANY; // 监听所有网卡local_addr.sin_port = htons(8888); // 端口 8888if (bind(sockfd, (struct sockaddr*)&local_addr, sizeof(local_addr)) < 0) {perror("绑定端口失败");close(sockfd);return -1;}printf("服务器启动,等待数据(阻塞模式)...\n");// 3. 阻塞式接收数据(关键:未收到数据会一直卡在这里)char buf[1024] = {0};struct sockaddr_in client_addr; // 存储客户端地址socklen_t client_len = sizeof(client_addr);// 调用 recvfrom:若没有数据,进程会进入休眠状态(阻塞)ssize_t recv_len = recvfrom(sockfd, buf, sizeof(buf) - 1, // 留一个位置给字符串结束符0, (struct sockaddr*)&client_addr, &client_len);if (recv_len < 0) {perror("接收数据失败");close(sockfd);return -1;}// 输出收到的数据和客户端信息buf[recv_len] = '\0'; // 手动添加字符串结束符printf("收到来自 %s:%d 的数据:%s\n",inet_ntoa(client_addr.sin_addr), // 客户端 IPntohs(client_addr.sin_port), // 客户端端口buf); // 数据内容close(sockfd);return 0;
}
非阻塞式IO
非阻塞IO的处理过程
具体实现原理也很简单,只需要在创建套接字时将其设置为非阻塞模式即可
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/types.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>int main() {// 1. 创建 UDP Socketint sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket failed");return -1;}// 2. 设置为非阻塞模式int flags = fcntl(sockfd, F_GETFL, 0);fcntl(sockfd, F_SETFL, flags | O_NONBLOCK);// 3. 绑定地址(可选,若作为服务端需要绑定)struct sockaddr_in local_addr;memset(&local_addr, 0, sizeof(local_addr));local_addr.sin_family = AF_INET;local_addr.sin_addr.s_addr = INADDR_ANY;local_addr.sin_port = htons(8888);if (bind(sockfd, (struct sockaddr*)&local_addr, sizeof(local_addr)) < 0) {perror("bind failed");close(sockfd);return -1;}// 4. 模拟非阻塞 recvfromchar buf[1024] = {0};struct sockaddr_in peer_addr;socklen_t peer_len = sizeof(peer_addr);// 首次调用:数据未准备好时直接返回错误ssize_t ret = recvfrom(sockfd, buf, sizeof(buf), 0, (struct sockaddr*)&peer_addr, &peer_len);if (ret < 0) {// 非阻塞特有错误码:EWOULDBLOCK/EAGAINif (errno == EWOULDBLOCK || errno == EAGAIN) {printf("数据未准备好,非阻塞直接返回\n");} else {perror("recvfrom error");}}// 5. 模拟「重试」或结合多路复用(如 select/poll/epoll)// 这里简化为休眠 2 秒,假设期间有数据到达sleep(2);// 再次调用 recvfrom(假设此时数据已准备好)ret = recvfrom(sockfd, buf, sizeof(buf), 0, (struct sockaddr*)&peer_addr, &peer_len);if (ret > 0) {printf("收到数据:%s (来自 %s:%d)\n", buf, inet_ntoa(peer_addr.sin_addr), ntohs(peer_addr.sin_port));} else {perror("再次 recvfrom 失败");}close(sockfd);return 0;
}
信号驱动式IO
下面是信号驱动式IO的流程图
他具体实现起来的思想也很简单,由于在这种方式中IO事件是靠信号递达的,我们就在信号处理函数handle中调用recvfrom进行数据拷贝就行
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <signal.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <errno.h>int sockfd; // 全局套接字描述符,供信号处理函数使用
struct sockaddr_in client_addr;
socklen_t client_len;// 信号处理函数:当内核通知IO事件就绪时被调用
void sigio_handler(int signo) {char buf[1024] = {0};ssize_t recv_len;// 读取数据(此时数据已就绪,不会阻塞)recv_len = recvfrom(sockfd, buf, sizeof(buf)-1, 0,(struct sockaddr*)&client_addr, &client_len);if (recv_len < 0) {perror("recvfrom failed");return;}buf[recv_len] = '\0';printf("收到来自 %s:%d 的数据: %s\n",inet_ntoa(client_addr.sin_addr),ntohs(client_addr.sin_port),buf);// 简单回复客户端const char* reply = "已收到数据";sendto(sockfd, reply, strlen(reply), 0,(struct sockaddr*)&client_addr, client_len);
}int main() {struct sockaddr_in server_addr;struct sigaction sa;// 1. 创建UDP套接字sockfd = socket(AF_INET, SOCK_DGRAM, 0);if (sockfd < 0) {perror("socket creation failed");exit(EXIT_FAILURE);}// 2. 绑定服务器地址和端口memset(&server_addr, 0, sizeof(server_addr));server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(8888);if (bind(sockfd, (struct sockaddr*)&server_addr, sizeof(server_addr)) < 0) {perror("bind failed");close(sockfd);exit(EXIT_FAILURE);}// 3. 设置信号处理函数(捕获SIGIO信号)sa.sa_handler = sigio_handler;sigemptyset(&sa.sa_mask);sa.sa_flags = 0;if (sigaction(SIGIO, &sa, NULL) < 0) {perror("sigaction failed");close(sockfd);exit(EXIT_FAILURE);}// 4. 设置套接字属主,让内核知道该向哪个进程发送SIGIO信号if (fcntl(sockfd, F_SETOWN, getpid()) < 0) {perror("fcntl F_SETOWN failed");close(sockfd);exit(EXIT_FAILURE);}// 5. 启用信号驱动式IO(设置O_ASYNC标志)int flags = fcntl(sockfd, F_GETFL, 0);if (fcntl(sockfd, F_SETFL, flags | O_ASYNC) < 0) {perror("fcntl F_SETFL O_ASYNC failed");close(sockfd);exit(EXIT_FAILURE);}printf("信号驱动式IO服务器启动,端口 8888...\n");printf("等待数据中(主线程可执行其他任务)...\n");// 6. 主线程可以执行其他任务,无需阻塞等待IOwhile (1) {// 模拟主线程处理其他业务sleep(1);// printf("主线程正在执行其他任务...\n");}close(sockfd);return 0;
}
IO多路转接
操作系统给我们提供了专门的接口用于实现IO多路转接,我们只需要学会如何使用就行了
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <fcntl.h>#define MAX_EVENTS 100
#define BUFFER_SIZE 1024// 设置套接字为非阻塞模式
void set_nonblocking(int fd) {int flags = fcntl(fd, F_GETFL, 0);if (flags == -1) {perror("fcntl F_GETFL");exit(EXIT_FAILURE);}if (fcntl(fd, F_SETFL, flags | O_NONBLOCK) == -1) {perror("fcntl F_SETFL");exit(EXIT_FAILURE);}
}int main() {// 创建监听套接字int listen_fd = socket(AF_INET, SOCK_STREAM, 0);if (listen_fd == -1) {perror("socket");exit(EXIT_FAILURE);}// 绑定地址和端口struct sockaddr_in server_addr;server_addr.sin_family = AF_INET;server_addr.sin_addr.s_addr = INADDR_ANY;server_addr.sin_port = htons(8888);if (bind(listen_fd, (struct sockaddr *)&server_addr, sizeof(server_addr)) == -1) {perror("bind");close(listen_fd);exit(EXIT_FAILURE);}// 开始监听if (listen(listen_fd, 5) == -1) {perror("listen");close(listen_fd);exit(EXIT_FAILURE);}// 创建 epoll 实例int epoll_fd = epoll_create1(0);if (epoll_fd == -1) {perror("epoll_create1");close(listen_fd);exit(EXIT_FAILURE);}// 注册监听套接字到 epollstruct epoll_event event;event.events = EPOLLIN;event.data.fd = listen_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {perror("epoll_ctl add listen_fd");close(listen_fd);close(epoll_fd);exit(EXIT_FAILURE);}struct epoll_event events[MAX_EVENTS];while (1) {// 等待事件发生,最多等待 MAX_EVENTS 个事件int num_events = epoll_wait(epoll_fd, events, MAX_EVENTS, -1);if (num_events == -1) {perror("epoll_wait");break;}for (int i = 0; i < num_events; i++) {if (events[i].data.fd == listen_fd) {// 有新的客户端连接请求struct sockaddr_in client_addr;socklen_t client_addr_len = sizeof(client_addr);int client_fd = accept(listen_fd, (struct sockaddr *)&client_addr, &client_addr_len);if (client_fd == -1) {perror("accept");continue;}// 设置客户端套接字为非阻塞模式set_nonblocking(client_fd);// 注册客户端套接字到 epollevent.events = EPOLLIN;event.data.fd = client_fd;if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, client_fd, &event) == -1) {perror("epoll_ctl add client_fd");close(client_fd);}printf("新客户端连接: %s:%d\n", inet_ntoa(client_addr.sin_addr), ntohs(client_addr.sin_port));} else if (events[i].events & EPOLLIN) {// 客户端有数据可读int client_fd = events[i].data.fd;char buffer[BUFFER_SIZE];ssize_t bytes_read = recv(client_fd, buffer, sizeof(buffer) - 1, 0);if (bytes_read == -1) {perror("recv");close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);} else if (bytes_read == 0) {// 客户端关闭连接printf("客户端断开连接: %d\n", client_fd);close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);} else {buffer[bytes_read] = '\0';printf("收到客户端 %d 数据: %s\n", client_fd, buffer);// 简单回显数据给客户端if (send(client_fd, buffer, bytes_read, 0) == -1) {perror("send");close(client_fd);epoll_ctl(epoll_fd, EPOLL_CTL_DEL, client_fd, NULL);}}}}}close(listen_fd);close(epoll_fd);return 0;
}
异步IO
简单来说就是,用户进程将这个IO的任务交给内核,内核把数据拷贝完成之后,再通知应用程序(在信号驱动式IO中,内核通过信号告知应用程序何时可以开始拷贝数据,拷贝数据这活还是得用户进程自己来)
实现代码
#include <iostream>
#include <fcntl.h>
#include <aio.h>
#include <signal.h>
#include <unistd.h>
#include <cstring>
#include <errno.h>// 缓冲区大小
#define BUFFER_SIZE 1024// 异步IO操作的控制块
struct aiocb aio_cb;// 信号处理函数:当异步IO完成时被调用
void aio_completion_handler(int signo, siginfo_t* info, void* context) {if (info->si_signo == SIGIO) {// 检查异步操作是否成功完成if (aio_error(&aio_cb) == 0) {// 获取实际读取的字节数ssize_t bytes_read = aio_return(&aio_cb);if (bytes_read > 0) {std::cout << "异步读取完成,读取了 " << bytes_read << " 字节: " << std::endl;std::cout << static_cast<char*>(aio_cb.aio_buf) << std::endl;} else if (bytes_read == 0) {std::cout << "已到达文件末尾" << std::endl;}} else {std::cerr << "异步读取失败: " << strerror(aio_error(&aio_cb)) << std::endl;}}
}int main(int argc, char* argv[]) {if (argc != 2) {std::cerr << "用法: " << argv[0] << " <文件名>" << std::endl;return 1;}const char* filename = argv[1];// 1. 打开文件(同步操作)int fd = open(filename, O_RDONLY);if (fd == -1) {std::cerr << "打开文件失败: " << strerror(errno) << std::endl;return 1;}// 2. 初始化异步IO控制块memset(&aio_cb, 0, sizeof(struct aiocb));// 分配缓冲区char* buffer = new char[BUFFER_SIZE];aio_cb.aio_buf = buffer;aio_cb.aio_nbytes = BUFFER_SIZE - 1; // 留一个字节给终止符aio_cb.aio_fildes = fd; // 文件描述符aio_cb.aio_offset = 0; // 读取起始位置// 3. 设置信号处理:当异步IO完成时接收SIGIO信号struct sigaction sa;memset(&sa, 0, sizeof(struct sigaction));sa.sa_sigaction = aio_completion_handler; // 信号处理函数sa.sa_flags = SA_SIGINFO; // 使用sigaction风格的处理函数if (sigaction(SIGIO, &sa, NULL) == -1) {std::cerr << "设置信号处理失败: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}// 4. 设置文件描述符的所有者,让内核知道向哪个进程发送信号if (fcntl(fd, F_SETOWN, getpid()) == -1) {std::cerr << "设置文件所有者失败: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}// 5. 启动异步读取操作if (aio_read(&aio_cb) == -1) {std::cerr << "启动异步读取失败: " << strerror(errno) << std::endl;close(fd);delete[] buffer;return 1;}std::cout << "异步读取已启动,主线程可以执行其他任务..." << std::endl;// 6. 主线程执行其他任务(模拟)for (int i = 0; i < 5; ++i) {std::cout << "主线程正在执行任务 " << i + 1 << std::endl;sleep(1); // 模拟耗时操作}// 7. 清理资源close(fd);delete[] buffer;return 0;
}