Epoll模型详解

2021年09月15日 阅读数:4
这篇文章主要向大家介绍Epoll模型详解,主要内容包括基础应用、实用技巧、原理机制等方面,希望对大家有所帮助。
Linux I/O多路复用技术在比较多的TCP网络服务器中有使用,即比较多的用到select函数。Linux 2.6内核中有提升网络I/O性能的新方法,即epoll 。

一、为何select落后
首先,在Linux内核中,select所用到的FD_SET是有限的,即内核中有个参数__FD_SETSIZE定义了每一个FD_SET的句柄个数,在我用的2.6.15-25-386内核中,该值是1024,搜索内核源代码获得:
include/linux/posix_types.h:
#define __FD_SETSIZE         1024
也就是说,若是想要同时检测1025个句柄的可读状态是不可能用select实现的。或者同时检测1025个句柄的可写状态也是不可能的。其次, 内核中实现select是使用轮询方法,即每次检测都会遍历全部FD_SET中的句柄,显然,select函数的执行时间与FD_SET中句柄的个数有一个比例关系,即select要检测的句柄数越多就会越费时。固然,在前文中我并无说起poll方法,事实上用select的朋友必定也试过poll,我我的以为 select和poll大同小异 ,我的偏好于用select而已。

二、内核中提升I/O性能的新方法 epoll
epoll是什么?按照man手册的说法:是为处理大批量句柄而做了改进的poll。要使用epoll只须要如下的三个系统函数调用:epoll_create(2), epoll_ctl(2), epoll_wait(2)。

Linux2.6内核epoll介绍
先介绍2本书 《The Linux Networking Architecture--Design and Implementation of Network Protocols in the Linux Kernel》, 以2.4内核讲解Linux TCP/IP实现,至关不错。做为一个现实世界中的实现,不少时候你必须做不少权衡,这时候参考一个久经考验的系统更有实际意义。举个例子,linux内 核中sk_buff结构为了追求速度和安全,牺牲了部份内存,因此在发送TCP包的时候,不管应用层数据多大,sk_buff最小也有272的字节。其实 对于socket应用层程序来讲,另一本书 《UNIX Network Programming Volume 1》意义更大一点。2003年的时候,这本书出了最新的第3版本,不过主要仍是修订第2版本。 其中第6章《I/O Multiplexing》是最重要的,Stevens给出了网络IO的基本模型。在 这里最重要的莫过于select模型和Asynchronous I/O模型。从理论上说,AIO彷佛是最高效的,你的IO操做能够当即返回,而后等待os告诉你IO操做完成。可是一直以来,如何实现就没有一个完美的方 案。最著名的windows完成端口实现的AIO,实际上也只是内部用线程池实现的罢了,最后的结果是IO有个线程池,你的应用程序也须要一个线程 池...... 不少文档其实已经指出了这引起的线程context-switch所带来的代价。在linux 平台上,关于网络AIO一直是改动最多的地方,2.4的年代就有不少AIO内核patch,最著名的应该算是SGI。可是一直到2.6内核发布,网络模块 的AIO一直没有进入稳定内核版本(大部分都是使用用户线程模拟方法,在使用了NPTL的linux上面其实和windows的完成端口基本上差很少 了)。2.6内核所支持的AIO特指磁盘的AIO---支持io_submit(),io_getevents()以及对Direct IO的支持(即:就是绕过VFS系统buffer直接写硬盘,对于流服务器在内存平稳性上有至关的帮助)。
因此,剩下的select模型基本上就成为咱们在linux上面的惟一选择,其实,若是加上no-block socket的配置,能够完成一个"伪"AIO的实现,只不过推进力在于你而不是os而已。不过传统的select/poll函数有着一些没法忍受的缺 点,因此改进一直是2.4-2.5开发版本内核的任务,包括/dev/poll,realtime signal等等。最终,Davide Libenzi开发的epoll进入2.6内核成为正式的解决方案。

三、epoll的优势
<1> 支持一个进程打开大数目的socket描述符(FD)
select 最不能忍受的是一个进程所打开的FD是有必定限制的,由FD_SETSIZE设置, 默认值是2048。对于那些须要支持上万链接数目的IM服务器来讲显然太少了。这时候你一是能够选择修改这个宏而后从新编译内核,不过资料也同时指出这样 会带来网络效率的降低;二是能够选择多进程的解决方案(传统的Apache方案),不过虽然linux上面建立进程的代价比较小,但仍旧是不可忽视的,加 上进程间数据同步远比不上线程间同步高效,因此这也不是一种完美的方案。 不过epoll 没有这个限制,它所支持的FD上限是最大能够打开文件的数目,这个数字通常远大于select 所支持的2048。举个例子,在1GB内存的机器上大约是10万左右,具体数目能够cat /proc/sys/fs/file-max察看,通常来讲这个数目和系统内存关系很大。

<2> IO效率不随FD数目增长而线性降低
传统select/poll的另外一个致命弱点就是当你拥有一个很大的socket集合,因为网络得延时,使得任一时间只有部分的socket是"活跃" 的,而select/poll每次调用都会线性扫描所有的集合,致使效率呈现线性降低。可是epoll不存在这个问题,它只会对"活跃"的socket进 行操做---这是由于在内核实现中epoll是根据每一个fd上面的callback函数实现的。因而,只有"活跃"的socket才会主动去调用 callback函数,其余idle状态的socket则不会,在这点上,epoll实现了一个"伪"AIO,由于这时候推进力在os内核。在一些 benchmark中,若是全部的socket基本上都是活跃的---好比一个高速LAN环境,epoll也不比select/poll低多少效率,但若 过多使用的调用epoll_ctl,效率稍微有些降低。然而一旦使用idle connections模拟WAN环境,那么epoll的效率就远在select/poll之上了。

<3> 使用mmap加速内核与用户空间的消息传递
这点实际上涉及到epoll的具体实现。不管是select,poll仍是epoll都须要内核把FD消息通知给用户空间,如何避免没必要要的内存拷贝就显 得很重要,在这点上,epoll是经过内核于用户空间mmap同一块内存实现的。而若是你像我同样从2.5内核就开始关注epoll的话,必定不会忘记手 工mmap这一步的。

<4> 内核微调
这一点其实不算epoll的优势,而是整个linux平台的优势。也许你能够怀疑linux平台,可是你没法回避linux平台赋予你微调内核的能力。比 如,内核TCP/IP协议栈使用内存池管理sk_buff结构,能够在运行期间动态地调整这个内存pool(skb_head_pool)的大小---通 过echo XXXX>/proc/sys/net/core/hot_list_length来完成。再好比listen函数的第2个参数(TCP完成3次握 手的数据包队列长度),也能够根据你平台内存大小来动态调整。甚至能够在一个数据包面数目巨大但同时每一个数据包自己大小却很小的特殊系统上尝试最新的 NAPI网卡驱动架构。

四、epoll的工做模式
   使人高兴的是,linux2.6内核的epoll比其2.5开发版本的/dev/epoll简洁了许多,因此,大部分状况下,强大的东西每每是简单的。惟一有点麻烦的是 epoll有2种工做方式:LT和ET
   LT(level triggered)是缺省的工做方式,而且同时支持block和no-block socket。在这种作法中,内核告诉你一个文件描述符是否就绪了,而后你能够对这个就绪的fd进行IO操做。若是你不做任何操做,内核仍是会继续通知你的,因此,这种模式编程出错误可能性要小一点。 传统的select/poll都是这种模型的表明
ET (edge-triggered) 是高速工做方式,只支持no-block socket。 在这种模式下,当描述符从未就绪变为就绪时,内核就经过epoll告诉你,而后它会假设你知道文件描述符已经就绪,而且不会再为那个文件描述符发送更多的 就绪通知,直到你作了某些操做而致使那个文件描述符再也不是就绪状态(好比 你在发送,接收或是接受请求,或者发送接收的数据少于必定量时致使了一个EWOULDBLOCK 错误)。可是请注意,若是一直不对这个fd做IO操做(从而致使它再次变成未就绪),内核就不会发送更多的通知(on ly on ce)。不过在TCP协议中,ET模式的加速效用仍须要更多的benchmark确认。
epoll只有epoll_create,epoll_ctl,epoll_wait 3个系统调用,具体用法请参考 http://www.xmailserver.org/linux-patches/nio-improve.html,在 http://www.kegel.com/rn/也有一个完整的例子,你们一看就知道如何使用了。

五、 epoll的使用方法
epoll用到的全部函数都是在头文件sys/epoll.h中声明的,下面简要说明所用到的数据结构和函数:
所用到的数据结构:
typedef union epoll_data {
void *ptr;
int fd;
__uint32_t u32;
__uint64_t u64;
} epoll_data_t;
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
结构体epoll_event 被用于注册所感兴趣的事件和回传所发生待处理的事件,而epoll_data 联合体用来保存触发事件的某个文件描述符相关的数据。例如一个client链接到服务器,服务器经过调用accept函数能够获得于这个client对应的socket文件描述符,能够把这文件描述符赋给epoll_data的fd字段,以便后面的读写操做在这个文件描述符上进行。epoll_event 结构体的events字段是表示感兴趣的事件和被触发的事件,可能的取值为:
EPOLLIN:表示对应的文件描述符能够读;
EPOLLOUT:表示对应的文件描述符能够写;
EPOLLPRI:表示对应的文件描述符有紧急的数据可读;
EPOLLERR:表示对应的文件描述符发生错误;
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET:表示对应的文件描述符有事件发生;
所用到的函数:
1)、epoll_create函数
函数声明:int epoll_create(int size)
该函数生成一个epoll专用的文件描述符,其中的参数是指定生成描述符的最大范围。
2)、epoll_ctl函数
函数声明:int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
该函数用于控制某个文件描述符上的事件,能够注册事件,修改事件,删除事件。
参数:
epfd:由 epoll_create 生成的epoll专用的文件描述符;
op:要进行的操做,可能的取值EPOLL_CTL_ADD 注册、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除;
fd:关联的文件描述符;
event:指向epoll_event的指针;
若是调用成功则返回0,不成功则返回-1。
3)、epoll_wait函数
函数声明:int epoll_wait(int epfd,struct epoll_event * events,int maxevents,int timeout)
该函数用于轮询I/O事件的发生。
参数:
epfd:由epoll_create 生成的epoll专用的文件描述符;
epoll_event:用于回传代处理事件的数组;
maxevents:每次能处理的事件数;
timeout:等待I/O事件发生的超时值;
返回发生事件数。

首先经过create_epoll(int maxfds)来建立一个epoll的句柄,其中maxfds为你的epoll所支持的最大句柄数。这个函数会返回一个新的epoll句柄,以后的全部操做都将经过这个句柄来进行操做。 在用完以后,记得用close()来关闭这个建立出来的epoll句柄
以后在你的网络主循环里面调用epoll_wait(int epfd, epoll_event events, int max_events, int timeout)来查询全部的网络接口,看哪个能够读,哪个能够写。基本的语法为:
nfds = epoll_wait(kdpfd, events, maxevents, -1);
其中 kdpfd为用epoll_create建立以后的句柄, events是一个epoll_event*的指针,当epoll_wait函数操做成功以后,events里面将储存全部的读写事件。 max_events是当前须要监听的全部socket句柄数。最后一个 timeout参 数指示 epoll_wait的超时条件,为0时表示立刻返回;为-1时表示函数会一直等下去直到有事件返回;为任意正整数时表示等这么长的时间,若是一直没有事 件,则会返回。通常状况下若是网络主循环是单线程的话,能够用-1来等待,这样能够保证一些效率,若是是和主循环在同一个线程的话,则能够用0来保证主循 环的效率。 epoll_wait返回以后,应该进入一个循环,以便遍历全部的事件。
   对epoll 的操做就这么简单,总共不过4个API:epoll_create, epoll_ctl, epoll_wait和close。如下是man中的一个例子。
struct epoll_event ev, *events;
for(;;) {
nfds = epoll_wait(kdpfd, events, maxevents, -1); //等待I/O事件
for(n = 0; n < nfds; ++n) {
if(events[n].da ta.fd == listener) { //若是是主socket的事件,则表示有新链接进入,须要进行新链接的处理。
client = accept(listener, (struct sockaddr *) &local,  &addrlen);
if(client < 0){
perror("accept error");
continue;
}
setnonblocking(client); // 将新链接置于非阻塞模式
ev.events = EPOLLIN | EPOLLET;
//注意这里的参数EPOLLIN | EPOLLET并无设置对写socket的监听,
//若是有写操做的话,这个时候epoll是不会返回事件的,
//若是要对写操做也监听的话,应该是EPOLLIN | EPOLLOUT | EPOLLET。
ev.da ta.fd = client; // 而且将新链接也加入EPOLL的监听队列
if ( epoll_ctl(kdpfd, EPOLL_CTL_ADD, client, &ev ) < 0) {  // 设置好event以后,将这个新的event经过epoll_ctl
//加入到epoll的监听队列里,这里用EPOLL_CTL_ADD
//来加一个新的 epoll事件。能够经过EPOLL_CTL_DEL来减小
//一个epoll事件,经过EPOLL_CTL_MOD来改变一个事件的监听方式。
fprintf(stderr, "epoll set insertion error: fd=%d"0, client);
return -1;
}
}  else // 若是不是主socket的事件的话,则表明这是一个用户的socket的事件,
// 则用来处理这个用户的socket的事情是,好比说read(fd,xxx)之类,或者一些其余的处理。
do_use_fd(events[n].da ta.fd);
}
}
六、Linux下epoll编程实例
 epoll 模型彷佛只有一种格式,因此你们只要参考下面的代码,就可以对epoll有所了解了。
while (TRUE) {
int nfds = epoll_wait (m_epoll_fd, m_events, MAX_EVENTS, EPOLL_TIME_OUT); //等待EPOLL事件的发生
//至于相关的端口,则须要在初始化EPOLL的时候绑定。
if (nfds <= 0)    continue;
m_bOnTimeChecking = FALSE;
g_CurTime = time(NULL);
for (int i=0; i<nfds; i++) {
try {
if (m_events[i].da ta.fd == m_listen_http_fd) //若是新监测到一个HTTP用户链接到绑定的HTTP端口则创建新链接。
{
On AcceptHttpEpoll ();
} else if (m_events[i].da ta.fd == m_listen_sock_fd) //若是新监测到一个SOCKET用户链接到了绑定的SOCKET端口则
//创建新的链接。
{
On AcceptSockEpoll ();
} else if (m_events[i].events & EPOLLIN) //若是是已经链接的用户,而且收到数据,那么进行读入操做。
{
On ReadEpoll (i);
}
On WriteEpoll (i); //查看当前的活动链接是否有须要写出的数据。
} catch (int) {
PRINTF ("CATCH捕获错误\n");
continue;
}
}
m_bOnTimeChecking = TRUE;
On Timer (); //进行一些定时的操做,主要就是删除一些断线用户等。
}

*****************************************************************************************************************************************


<img title="Linux" epoll模型="" -="" 小伟="" 小伟的博客"="" src="http://www.cppblog.com/p_w_picpaths/cppblog_com/khan/6584/r_epoll.jpg" style="padding: 0px; margin: 0px; vertical-align: top; border: none;">

Epoll模型主要负责对大量并发用户的请求进行及时处理,完成服务器与客户端的数据交互。其具体的实现步骤以下:
(a) 使用epoll_create()函数建立文件描述,设定可管理的最大socket描述符数目。
(b) 建立与epoll关联的接收线程,应用程序能够建立多个接收线程来处理epoll上的读通知事件,线程的数量依赖于程序的具体须要。
(c) 建立一个侦听socket的描述符ListenSock, 并将该描述符设定为非阻塞模式,调用Listen()函数在该套接字上侦听有无新的链接请求,在epoll_event结构中设置要处理的事件类型EPOLLIN,工做方式为 epoll_ET,以提升工做效率,同时使用epoll_ctl()来注册事件,最后启动网络监视线程。
(d) 网络监视线程启动循环,epoll_wait()等待epoll事件发生。
(e) 若是epoll事件代表有新的链接请求,则调用accept()函数,将用户socket描述符添加到epoll_da ta联合体, 同时设定该描述符为非阻塞,并在epoll_event结构中设置要处理的事件类型为读和写,工做方式为epoll_ET。
(f) 若是epoll事件代表socket描述符上有数据可读,则将该socket描述符加入可读队列,通知接收线程读入数据,并将接收到的数据放入到接收数据的链表中,经逻辑处理后,将反馈的数据包放入到发送数据链表中,等待由发送线程发送。
例子代码:
#include <iostream>
#include <sys/socket.h>
#include <sys/epoll.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <fcntl.h>
#include <unistd.h>
#include <stdio.h>
#define MAXLINE 10
#define OPEN_MAX 100
#define LISTENQ 20
#define SERV_PORT 5555
#define INFTIM 1000
void setnonblocking(int sock)
{
int opts;
opts=fcntl(sock,F_GETFL);
 if(opts<0)
{
perror("fcntl(sock,GETFL)");
exit(1);
}
 opts = opts | O_NONBLOCK;
 if(fcntl(sock,F_SETFL,opts)<0)
{
perror("fcntl(sock,SETFL,opts)");
exit(1);
}
}

int main()
{
int i, maxi, listenfd, connfd, sockfd, epfd, nfds;
ssize_t n;
char line[MAXLINE];
socklen_t clilen;
 struct epoll_event ev,events[20]; //声明epoll_event结构体的变量, ev用于注册事件, events数组用于回传要处理的事件
epfd=epoll_create(256); //生成用于处理accept的epoll专用的文件描述符, 指定生成描述符的最大范围为256
 struct sockaddr_in clientaddr;
struct sockaddr_in serveraddr;
 listenfd = socket(AF_INET, SOCK_STREAM, 0);
 setnonblocking(listenfd); //把用于监听的socket设置为非阻塞方式
 ev.da ta.fd=listenfd; //设置与要处理的事件相关的文件描述符
ev.events=EPOLLIN | EPOLLET; //设置要处理的事件类型
epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd,&ev); //注册epoll事件
 bzero(&serveraddr, sizeof(serveraddr));
serveraddr.sin_family = AF_INET;
char *local_addr="200.200.200.204";
inet_aton(local_addr,&(serveraddr.sin_addr));
serveraddr.sin_port=htons(SERV_PORT);  //或者htons(SERV_PORT);
 bind(listenfd,(sockaddr *)&serveraddr, sizeof(serveraddr));
 listen(listenfd, LISTENQ);
 maxi = 0;
for( ; ; ) {
nfds=epoll_wait(epfd,events,20,500); //等待epoll事件的发生
for(i=0;i<nfds;++i) //处理所发生的全部事件
{
if(events[i].da ta.fd==listenfd)    /**监听事件**/
{
connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen);
if(connfd<0){
perror("connfd<0");
exit(1);
}
        setnonblocking(connfd); //把客户端的socket设置为非阻塞方式
        char *str = inet_ntoa(clientaddr.sin_addr);
std::cout<<"connect from "<_u115 ? tr<<std::endl;
        ev.da ta.fd=connfd; //设置用于读操做的文件描述符
ev.events=EPOLLIN | EPOLLET; //设置用于注测的读操做事件
epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //注册ev事件
}
else if(events[i].events&EPOLLIN)     /**读事件**/
{
if ( (sockfd = events[i].da ta.fd) < 0) continue;
if ( (n = read(sockfd, line, MAXLINE)) < 0) {
if (errno == ECONNRESET) {
close(sockfd);
events[i].da ta.fd = -1;
} else
{
std::cout<<"readline error"<<std::endl;
}
} else if (n == 0) {
close(sockfd);
events[i].da ta.fd = -1;
}
         ev.da ta.fd=sockfd; //设置用于写操做的文件描述符
ev.events=EPOLLOUT | EPOLLET; //设置用于注测的写操做事件
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要处理的事件为EPOLLOUT
}
else if(events[i].events&EPOLLOUT)    /**写事件**/
{
sockfd = events[i].da ta.fd;
write(sockfd, line, n);
         ev.da ta.fd=sockfd; //设置用于读操做的文件描述符
ev.events=EPOLLIN | EPOLLET; //设置用于注册的读操做事件
epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改sockfd上要处理的事件为EPOLIN
}
}
}
}