I/O多路复用(multiplexing)的本质是通过一种机制(系统内核缓冲I/O数据),让单个进程可以监视多个文件描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作,下面我们就来说一说关于io流需要掌握什么?我们一起去了解并探讨一下这个问题吧!

io流需要掌握什么(IO多路复用三种机制Select)

io流需要掌握什么

1. I/O多路复用

I/O多路复用(multiplexing)的本质是通过一种机制(系统内核缓冲I/O数据),让单个进程可以监视多个文件描述符,一旦某个描述符就绪(一般是读就绪或写就绪),能够通知程序进行相应的读写操作

select、poll 和 epoll 都是 Linux API 提供的 IO 复用方式。

相信大家都了解了Unix五种IO模型,不了解的可以私信作者

1) blocking IO - 阻塞IO

2) nonblocking IO - 非阻塞IO

3) IO multiplexing - IO多路复用

4) signal driven IO - 信号驱动IO

5) asynchronous IO - 异步IO

其中前面4种IO都可以归类为synchronous IO - 同步IO,而select、poll、epoll本质上也都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的。

与多进程和多线程技术相比,I/O多路复用技术的最大优势是系统开销小,系统不必创建进程/线程,也不必维护这些进程/线程,从而大大减小了系统的开销。

在介绍select、poll、epoll之前,首先介绍一下Linux操作系统中基础的概念:

1.1 用户空间 / 内核空间

现在操作系统都是采用虚拟存储器,那么对32位操作系统而言,它的寻址空间(虚拟存储空间)为4G(2的32次方)。

操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也有访问底层硬件设备的所有权限。为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分,一部分为内核空间,一部分为用户空间。

1.2 进程切换

为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的,并且进程切换是非常耗费资源的。

1.3 进程阻塞

正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得了CPU资源),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。

1.4 文件描述符

文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。

文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。

1.5 缓存I/O

缓存I/O又称为标准I/O,大多数文件系统的默认I/O操作都是缓存I/O。在Linux的缓存I/O机制中,操作系统会将I/O的数据缓存在文件系统的页缓存中,即数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。

2. Select2.1 Select函数

intselect(intmaxfdp1,fd_set*readset,fd_set*writeset,fd_set*exceptset,conststructtimeval*timeout);

【参数说明】

int maxfdp1 指定待测试的文件描述字个数,它的值是待测试的最大描述字加1。

fd_set *readset , fd_set *writeset , fd_set *exceptset

fd_set可以理解为一个集合,这个集合中存放的是文件描述符(file descriptor),即文件句柄。中间的三个参数指定我们要让内核测试读、写和异常条件的文件描述符集合。如果对某一个的条件不感兴趣,就可以把它设为空指针。

const struct timeval *timeout timeout告知内核等待所指定文件描述符集合中的任何一个就绪可花多少时间。其timeval结构用于指定这段时间的秒数和微秒数。

【返回值】

int 若有就绪描述符返回其数目,若超时则为0,若出错则为-1

2.2 select运行机制

select()的机制中提供一种fd_set的数据结构,实际上是一个long类型的数组,每一个数组元素都能与已打开的文件句柄(不管是Socket句柄,还是其他文件或命名管道或设备句柄)建立联系,建立联系的工作由程序员完成,当调用select()时,由内核根据IO状态修改fd_set的内容,由此来通知执行了select()的进程哪一Socket或文件可读。

从流程上来看,使用select函数进行IO请求和同步阻塞模型没有太大的区别,甚至还多了添加监视socket,以及调用select函数的额外操作,效率更差。但是,使用select以后最大的优势是用户可以在一个线程内同时处理多个socket的IO请求。用户可以注册多个socket,然后不断地调用select读取被激活的socket,即可达到在同一个线程内同时处理多个IO请求的目的。而在同步阻塞模型中,必须通过多线程的方式才能达到这个目的。

2.3 select机制的问题

1) 每次调用select,都需要把fd_set集合从用户态拷贝到内核态,如果fd_set集合很大时,那这个开销也很大

2) 同时每次调用select都需要在内核遍历传递进来的所有fd_set,如果fd_set集合很大时,那这个开销也很大

3) 为了减少数据拷贝带来的性能损坏,内核对被监控的fd_set集合大小做了限制,并且这个是通过宏控制的,大小不可改变(限制为1024)

3. Poll3.1 Poll函数

poll的机制与select类似,与select在本质上没有多大差别,管理多个描述符也是进行轮询,根据描述符的状态进行处理,但是poll没有最大文件描述符数量的限制。也就是说,poll只解决了上面的问题3,并没有解决问题1,2的性能开销问题。

下面是poll的函数原型:

intpoll(structpollfd*fds,nfds_tnfds,inttimeout); typedefstructpollfd{ intfd;//需要被检测或选择的文件描述符 shortevents;//对文件描述符fd上感兴趣的事件 shortrevents;//文件描述符fd上当前实际发生的事件 }pollfd_t;

poll改变了文件描述符集合的描述方式,使用了pollfd结构而不是select的fd_set结构,使得poll支持的文件描述符集合限制远大于select的1024

【参数说明】

struct pollfd *fds fds是一个struct pollfd类型的数组,用于存放需要检测其状态的socket描述符,并且调用poll函数之后fds数组不会被清空;一个pollfd结构体表示一个被监视的文件描述符,通过传递fds指示 poll() 监视多个文件描述符。其中,结构体的events域是监视该文件描述符的事件掩码,由用户来设置这个域,结构体的revents域是文件描述符的操作结果事件掩码,内核在调用返回时设置这个域

nfds_t nfds 记录数组fds中描述符的总数量

【返回值】

int 函数返回fds集合中就绪的读、写,或出错的描述符数量,返回0表示超时,返回-1表示出错;

4. Epoll

epoll在Linux2.6内核正式提出,是基于事件驱动的I/O方式,相对于select来说,epoll没有描述符个数限制,使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,这样在用户空间和内核空间的copy只需一次。

4.1 Epoll函数

Linux中提供的epoll相关函数如下:

intepoll_create(intsize); intepoll_ctl(intepfd,intop,intfd,structepoll_event*event); intepoll_wait(intepfd,structepoll_event*events,intmaxevents,inttimeout);

4.1.1 epoll_create函数

epoll_create 函数创建一个epoll句柄,参数size表明内核要监听的描述符数量。调用成功时返回一个epoll句柄描述符,失败时返回-1。

4.1.2 epoll_ctl函数

epoll_ctl 函数注册要监听的事件类型。四个参数解释如下:

1) epfd 表示epoll句柄

2) op 表示fd操作类型,有如下3种

EPOLL_CTL_ADD 注册新的fd到epfd中

EPOLL_CTL_MOD 修改已注册的fd的监听事件

EPOLL_CTL_DEL 从epfd中删除一个fd

3) fd 是要监听的描述符

4) event 表示要监听的事件

epoll_event 结构体定义如下:

structepoll_event{ __uint32_tevents;/*Epollevents*/ epoll_data_tdata;/*Userdatavariable*/ }; typedefunionepoll_data{ void*ptr; intfd; __uint32_tu32; __uint64_tu64; }epoll_data_t;

4.1.3 epoll_wait函数

epoll_wait 函数等待事件的就绪,成功时返回就绪的事件数目,调用失败时返回 -1,等待超时返回 0。

1) epfd 是epoll句柄

2) events 表示从内核得到的就绪事件集合

3) maxevents 告诉内核events的大小

4) timeout 表示等待的超时事件

epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。

epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。

水平触发(LT):

默认工作模式,即当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序可以不立即处理该事件;下次调用epoll_wait时,会再次通知此事件

边缘触发(ET):

当epoll_wait检测到某描述符事件就绪并通知应用程序时,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次通知此事件。(直到你做了某些操作导致该描述符变成未就绪状态了,也就是说边缘触发只在状态由未就绪变为就绪时只通知一次)。

LT和ET原本应该是用于脉冲信号的,可能用它来解释更加形象。Level和Edge指的就是触发点,Level为只要处于水平,那么就一直触发,而Edge则为上升沿和下降沿的时候触发。比如:0->1 就是Edge,1->1 就是Level。

ET模式很大程度上减少了epoll事件的触发次数,因此效率比LT模式下高。

5. 总结5.1 select,poll,epoll的区别

select

poll

epoll

操作方式

遍历

遍历

回调

底层实现

数组

链表

红黑树

IO效率

每次调用都进行线性遍历,时间复杂度为O(n)

每次调用都进行线性遍历,时间复杂度为O(n)

事件通知方式,每当fd就绪,系统注册的回调函数就会被调用,将就绪fd放到readyList里面,时间复杂度O(1)

最大连接数

1024(x86)或2048(x64)

无上限

无上限

fd拷贝

每次调用select,都需要把fd集合从用户态拷贝到内核态

每次调用poll,都需要把fd集合从用户态拷贝到内核态

调用epoll_ctl时拷贝进内核并保存,之后每次epoll_wait不拷贝

epoll是Linux目前大规模网络并发程序开发的首选模型。在绝大多数情况下性能远超select和poll。目前流行的高性能web服务器Nginx正式依赖于epoll提供的高效网络套接字轮询服务。但是,在并发连接不高的情况下,多线程 阻塞I/O方式可能性能更好。

既然select,poll,epoll都是I/O多路复用的具体的实现,之所以现在同时存在,其实他们也是不同历史时期的产物

6. 案例6.1 服务器代码

#include<stdio.h> #include<stdlib.h> #include<string.h> #include<errno.h> #include<netinet/in.h> #include<sys/socket.h> #include<arpa/inet.h> #include<sys/epoll.h> #include<unistd.h> #include<sys/types.h> #defineIPADDRESS"127.0.0.1" #definePORT8787 #defineMAXSIZE1024 #defineLISTENQ5 #defineFDSIZE1000 #defineEPOLLEVENTS100 //函数声明 //创建套接字并进行绑定 staticintsocket_bind(constchar*ip,intport); //IO多路复用epoll staticvoiddo_epoll(intlistenfd); //事件处理函数 staticvoid handle_events(intepollfd,structepoll_event*events,intnum,intlistenfd,char*buf); //处理接收到的连接 staticvoidhandle_accpet(intepollfd,intlistenfd); //读处理 staticvoiddo_read(intepollfd,intfd,char*buf); //写处理 staticvoiddo_write(intepollfd,intfd,char*buf); //添加事件 staticvoidadd_event(intepollfd,intfd,intstate); //修改事件 staticvoidmodify_event(intepollfd,intfd,intstate); //删除事件 staticvoiddelete_event(intepollfd,intfd,intstate); intmain(intargc,char*argv[]) { intlistenfd; listenfd=socket_bind(IPADDRESS,PORT); listen(listenfd,LISTENQ); do_epoll(listenfd); return0; } staticintsocket_bind(constchar*ip,intport) { intlistenfd; structsockaddr_inservaddr; listenfd=socket(AF_INET,SOCK_STREAM,0); if(listenfd==-1) { perror("socketerror:"); exit(1); } bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; inet_pton(AF_INET,ip,&servaddr.sin_addr); servaddr.sin_port=htons(port); if(bind(listenfd,(structsockaddr*)&servaddr,sizeof(servaddr))==-1) { perror("binderror:"); exit(1); } returnlistenfd; } staticvoiddo_epoll(intlistenfd) { intepollfd; structepoll_eventevents[EPOLLEVENTS]; intret; charbuf[MAXSIZE]; memset(buf,0,MAXSIZE); //创建一个描述符 epollfd=epoll_create(FDSIZE); //添加监听描述符事件 add_event(epollfd,listenfd,EPOLLIN); for(;;) { //获取已经准备好的描述符事件 ret=epoll_wait(epollfd,events,EPOLLEVENTS,-1); handle_events(epollfd,events,ret,listenfd,buf); } close(epollfd); } staticvoid handle_events(intepollfd,structepoll_event*events,intnum,intlistenfd,char*buf) { inti; intfd; //进行选好遍历 for(i=0;i<num;i ) { fd=events[i].data.fd; //根据描述符的类型和事件类型进行处理 if((fd==listenfd)&&(events[i].events&EPOLLIN)) handle_accpet(epollfd,listenfd); elseif(events[i].events&EPOLLIN) do_read(epollfd,fd,buf); elseif(events[i].events&EPOLLOUT) do_write(epollfd,fd,buf); } } staticvoidhandle_accpet(intepollfd,intlistenfd) { intclifd; structsockaddr_incliaddr; socklen_tcliaddrlen; clifd=accept(listenfd,(structsockaddr*)&cliaddr,&cliaddrlen); if(clifd==-1) perror("accpeterror:"); else { printf("acceptanewclient:%s:%d\n",inet_ntoa(cliaddr.sin_addr),cliaddr.sin_port); //添加一个客户描述符和事件 add_event(epollfd,clifd,EPOLLIN); } } staticvoiddo_read(intepollfd,intfd,char*buf) { intnread; nread=read(fd,buf,MAXSIZE); if(nread==-1) { perror("readerror:"); close(fd); delete_event(epollfd,fd,EPOLLIN); } elseif(nread==0) { fprintf(stderr,"clientclose.\n"); close(fd); delete_event(epollfd,fd,EPOLLIN); } else { printf("readmessageis:%s",buf); //修改描述符对应的事件,由读改为写 modify_event(epollfd,fd,EPOLLOUT); } } staticvoiddo_write(intepollfd,intfd,char*buf) { intnwrite; nwrite=write(fd,buf,strlen(buf)); if(nwrite==-1) { perror("writeerror:"); close(fd); delete_event(epollfd,fd,EPOLLOUT); } else modify_event(epollfd,fd,EPOLLIN); memset(buf,0,MAXSIZE); } staticvoidadd_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev); } staticvoiddelete_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev); } staticvoidmodify_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev); }

6.2 客户端代码

客户端也用epoll实现,控制STDIN_FILENO、STDOUT_FILENO、和sockfd三个描述符

#include<netinet/in.h> #include<sys/socket.h> #include<stdio.h> #include<string.h> #include<stdlib.h> #include<sys/epoll.h> #include<time.h> #include<unistd.h> #include<sys/types.h> #include<arpa/inet.h> #defineMAXSIZE1024 #defineIPADDRESS"127.0.0.1" #defineSERV_PORT8787 #defineFDSIZE1024 #defineEPOLLEVENTS20 staticvoidhandle_connection(intsockfd); staticvoid handle_events(intepollfd,structepoll_event*events,intnum,intsockfd,char*buf); staticvoiddo_read(intepollfd,intfd,intsockfd,char*buf); staticvoiddo_read(intepollfd,intfd,intsockfd,char*buf); staticvoiddo_write(intepollfd,intfd,intsockfd,char*buf); staticvoidadd_event(intepollfd,intfd,intstate); staticvoiddelete_event(intepollfd,intfd,intstate); staticvoidmodify_event(intepollfd,intfd,intstate); intmain(intargc,char*argv[]) { intsockfd; structsockaddr_inservaddr; sockfd=socket(AF_INET,SOCK_STREAM,0); bzero(&servaddr,sizeof(servaddr)); servaddr.sin_family=AF_INET; servaddr.sin_port=htons(SERV_PORT); inet_pton(AF_INET,IPADDRESS,&servaddr.sin_addr); connect(sockfd,(structsockaddr*)&servaddr,sizeof(servaddr)); //处理连接 handle_connection(sockfd); close(sockfd); return0; } staticvoidhandle_connection(intsockfd) { intepollfd; structepoll_eventevents[EPOLLEVENTS]; charbuf[MAXSIZE]; intret; epollfd=epoll_create(FDSIZE); add_event(epollfd,STDIN_FILENO,EPOLLIN); for(;;) { ret=epoll_wait(epollfd,events,EPOLLEVENTS,-1); handle_events(epollfd,events,ret,sockfd,buf); } close(epollfd); } staticvoid handle_events(intepollfd,structepoll_event*events,intnum,intsockfd,char*buf) { intfd; inti; for(i=0;i<num;i ) { fd=events[i].data.fd; if(events[i].events&EPOLLIN) do_read(epollfd,fd,sockfd,buf); elseif(events[i].events&EPOLLOUT) do_write(epollfd,fd,sockfd,buf); } } staticvoiddo_read(intepollfd,intfd,intsockfd,char*buf) { intnread; nread=read(fd,buf,MAXSIZE); if(nread==-1) { perror("readerror:"); close(fd); } elseif(nread==0) { fprintf(stderr,"serverclose.\n"); close(fd); } else { if(fd==STDIN_FILENO) add_event(epollfd,sockfd,EPOLLOUT); else { delete_event(epollfd,sockfd,EPOLLIN); add_event(epollfd,STDOUT_FILENO,EPOLLOUT); } } } staticvoiddo_write(intepollfd,intfd,intsockfd,char*buf) { intnwrite; nwrite=write(fd,buf,strlen(buf)); if(nwrite==-1) { perror("writeerror:"); close(fd); } else { if(fd==STDOUT_FILENO) delete_event(epollfd,fd,EPOLLOUT); else modify_event(epollfd,fd,EPOLLIN); } memset(buf,0,MAXSIZE); } staticvoidadd_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_ADD,fd,&ev); } staticvoiddelete_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_DEL,fd,&ev); } staticvoidmodify_event(intepollfd,intfd,intstate) { structepoll_eventev; ev.events=state; ev.data.fd=fd; epoll_ctl(epollfd,EPOLL_CTL_MOD,fd,&ev); }

,