关于ppc、tpc的问题、i/o复用的由来、epoll和select的详细对比参考文章:io复用与并发编程
1 ppc、tpc模型
传统的网络服务器是用一个单独的线程或进程处理每一个连接。对于高性能的应用,这需要在某一个时刻同时处理大量的客户请求,这种模式效率不高,因为(process per connection,ppc), tpc(thread per connection)模型一次处理许多客户连接,那么随着连接客户的增多,那么资源使用、进程/线程环境切换等的时空花销就会很大。
2 select 模型
参看文章
1) 最大并发数限制,因为一个进程所打开的 fd (文件描述符)是有限制的,由 fd_setsize 设置,默认值是 1024,因此 select 模型的最大并发数就被相应限制了。如果要改变fd_size的大小需要重新编译内核。
int select(int maxfdp1, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
2)
效率问题, select 每次调用都会
线性扫描全部的 fd 集合,花费时间为o(n),这样效率就会呈现线性下降,即使将 fd_setsize 改大其性能也会很差。
3)
内核/用户空间内存拷贝问题,
select 采取了内存拷贝方法让内核把 fd 消息通知给用户空间。
4)事件集,select的参数类型fd_set没有将文件描述符和事件绑定,它仅仅是一个文件描述符的集合,因此select需要提供3个“值-结果”类型的参数分别传入和输出可读、可写和异常等事件(调用该函数时,指定所关心的描述符的值,函数返回时,结果将指示哪些描述符已经就绪),也就是select函数会修改指针readset、writeset、exceptset所指向的描述符集。
一方面,使得select不能处理更多类型的事件,所能处理的事件类型只有读写异常三类;
另一方面,描述符集内任何与未就绪描述符对应的位返回时都会被清空,因此每次重新调用select时,都需要再次把所有的描述符集内所关心的位置为1。
5)select函数的定时是有函数的最后一个参数决定的,它是一个timeval结构体,用于指定这段时间的秒数和微秒数。
//timeval结构:
struct timeval
{
long tv_sec; /* seconds */
long tv_usec; /* microseconds */
};
3 poll模型
1)最大并发数限制,poll的第二个参数nfds是第一个参数指示的结构数据的元素个数,这个nfds并没有select的限制,它只受限于系统的内存空间(可以达到系统所允许打开的最大描述符的个数,即65535)。
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
2)
效率问题,效率和select类似。
3)内核/用户空间内存拷贝问题,和select类似。
4)事件集,poll比select要“聪明”,它将描述符和事件定义在一起,任何事件都被统一处理,编程接口简洁许多。
一方面,poll可以监听的事件类型就可以更细分为很多种(参考文章:)。
另一方面,而且内核每次修改的是pollfd结构体的revents成员,而events成员不变,因此下次重新调用poll无需重置pollfd类型中的事件集参数(避免了类似于select使用的的“值-结果”参数)。
5)poll的定时也是由函数的最后一个参数给出,但是它是一个int类型(指定函数要等待的毫秒数),而不是timeval结构体。
此外,从当今的可移植性角度考虑,支持select的系统比支持poll的系统要多。
优点
epoll是什么?按照man手册的说法:是为处理大批量句柄而作了改进的poll。当然,这不是2.6内核才有的,它是在2.5.44内核中被引进的(epoll(4) is a new api introduced in linux kernel 2.5.44),它几乎具备了之前所说的一切优点,被公认为linux2.6下性能最好的多路i/o就绪通知方法。
(1)epoll 没有最大并发连接的限制,上限是最大可以打开文件的数目,这个数字一般远大于 2048, 一般来说这个数目和系统内存关系很大 ,具体值可以 cat /proc/sys/fs/file-max[599534] 察看。
(2)效率提升, epoll最大的优点就在于它基于事件的就绪通知方式:只管“活跃”的连接 ,而跟连接总数无关,其算法复杂度为o(1),因此在实际的网络环境中,epoll的效率就会远远高于 select 和 poll 。select和poll都是轮询方式的,每次调用要扫描整个注册文件描述符集合,并将其中就绪描述符返回给用户程序。epoll_wait采用的是回调方式,内核检测到就绪描述符时,将触发回调函数,回调函数就将该文件描述符上对应的事件插入内核就绪队列,不需要轮询。
(3)内存拷贝, epoll 在这点上使用了“共享内存“,因此没有内存拷贝的开销。
epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。
另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
5)和poll类似,epoll的定时也是int类型,单位是毫秒。
不足
epoll的局限性在于它在linux2.6才实现,而其他平台都没有,这与apache这样的优秀跨平台服务器无法并论。select跨平台性能很好,几乎每个平台都支持。
int epoll_create(int size);
int epoll_create1(int flags);
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
第一个函数:
对于epoll_create1 的flag参数: 可以设置为0 或epoll_cloexec,为0时函数表现与epoll_create一致, epoll_cloexec标志与open 时的o_cloexec 标志类似,即进程被替换时会关闭打开的文件描述符。
创建一个epoll的句柄。自从linux2.6.8之后,size参数是被忽略的。需要注意的是,当创建好epoll句柄后,它就是会占用一个fd值,在linux下如果查看/proc/进程id/fd/,是能够看到这个fd的,所以在使用完epoll后,必须调用close()关闭,否则可能导致fd被耗尽。
第二个函数:
epoll的事件注册函数,它不同于select()是在监听事件时告诉内核要监听什么类型的事件,而是在这里先注册要监听的事件类型。
第一个参数是epoll_create()的返回值。
第二个参数表示动作,用三个宏来表示:
epoll_ctl_add:注册新的fd到epfd中;epoll_ctl_mod:修改已经注册的fd的监听事件;epoll_ctl_del:从epfd中删除一个fd;
第三个参数是需要监听的fd。
第四个参数是告诉内核需要监听什么事,struct epoll_event结构如下:
struct epoll_event
{
uint32_t events; /* epoll events */
epoll_data_t data; /* user data variable */
};
typedef union epoll_data
{
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
events可以是以下几个宏的集合:
epollin :表示对应的文件描述符可以读(包括对端socket正常关闭);
epollout:表示对应的文件描述符可以写;
epollpri:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来);
epollerr:表示对应的文件描述符发生错误;
epollhup:表示对应的文件描述符被挂断;
epollet: 将epoll设为边缘触发(edge triggered)模式,这是相对于水平触发(level triggered)来说的。
epolloneshot:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到epoll队列里
第三个函数:
收集在epoll监控的事件中已经发生的事件。参数events是分配好的epoll_event结构体数组,epoll将会把发生的事件赋值到events数组中(events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存)。maxevents告之内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size,参数timeout是超时时间(毫秒,0会立即返回,-1将不确定,也有说法说是永久阻塞)。如果函数调用成功,返回对应i/o上已准备好的文件描述符数目,如返回0表示已超时。
epoll有level-triggered和edge-triggered两种工作模式。
level-triggered是缺省工作方式,有阻塞和非阻塞两种方式。内核告诉你一个描述符是否就绪,然后可以对就绪的fd进行io操作。如果不做任何操作,内核还会继续通知,因此该模式下编程出错可能性小。传统的select/poll是这样的模型。此方式可以认为是一个快速的poll。
edge-triggered是只支持非阻塞模式。当一个新的事件到达时,et模式从epoll_wait调用中获取该事件,如果这次没有将该事件对应的套接字缓冲区处理完,在这个套接字中没有新的事件再次到来时,在et模式下是无法再次从epoll_wait调用中获取这个事件的。而lt模式,只要一个事件对应的套接字缓冲区中还有数据,就总能从epoll_wait中获取这个事件。
二者的差异在于 level-trigger 模式下只要某个 fd 处于 readable/writable 状态,无论什么时候进行 epoll_wait 都会返回该 fd;而 edge-trigger 模式下只有某个 fd 从unreadable 变为 readable 或从 unwritable 变为 writable 时,epoll_wait 才会返回该 fd。
使用lt意味着只要fd处于readable/writable状态,每次 epoll_wait 时都会返回该 fd,系统开销不说,自己处理时每次都要把这些fd轮询一遍,如果fd很多的话,不管这些fd有没有事件发生,epoll_wait 都会触发这些fd的轮询判断。
查阅了一些资料,才知道常用的事件处理库很多都选择了 lt 模式,包括大家熟知的libevent和boost::asio等,为什么选择lt呢?那就不得不从et的弊端的弊端说起。
et模式下,当有事件发生时,系统只会通知你一次,也就是调用epoll_wait 返回fd后,不管事件你处理与否,或者处理完全与否,再调用epoll_wait 时,都不会再返回该fd,这样programmer要自己保证在事件发生时及时有效的处理完。比如此时fd发生了epollin事件,在调用epoll_wait 后发现此事件,programmer要保证在本次轮询中对此fd进行了读操作,并且还要循环调用recv操作,一直读到recv的返回值小于请求值,或者遇到eagain错误,不然下次轮询时,如果此fd没有再次触发事件,你就没有机会知道这个fd需要你的处理。这样无形中就增加了programmer的负担和出错的机会。
et模式的短处正是lt模式的长处,无论此fd是否有事件发生,或者有事件未处理完,每次epoll_wait 时总会得到此fd供你处理。显而易见,os在lt模式下维护的 ready list 的大小肯定比et模式下长,而且你自己轮询所有的fd时也要比et下要多,这种消耗和et模式下循环调用处理函数(如recv和send等),还要逻辑处理是否处理完毕,理论上应该是lt更大一些,不过个人感觉应该差别不会太大。但是lt模式下带来的逻辑处理的方便性和不易出错性,让我们有理由把它作为首选。我想这可能也是为什么epoll后来在et的基础上又增加了lt,并且将其作为默认模式的原因吧。
在epoll的et模式下,正确的读写方式为:
读:只要可读,就一直读,直到返回0,或者 errno = eagain
写:只要可写,就一直写,直到数据发送完,或者 errno = eagain
例如,向socket中写数据:
从socket中读数据:
epoll在et模式下的使用,在下面代码段中,非阻塞模式下,函数do_use_fd函数新到达的文件描述符知道eagain由read函数或write函数返回。
#define max_events 10
struct epoll_event ev, events[max_events];
int listen_sock, conn_sock, nfds, epollfd;
/* set up listening socket, 'listen_sock' (socket(),bind(), listen()) */
epollfd = epoll_create(10);
if (epollfd == -1) {
perror("epoll_create");
exit(exit_failure);
}
ev.events = epollin;
ev.data.fd = listen_sock;
if (epoll_ctl(epollfd, epoll_ctl_add, listen_sock, &ev) == -1) {
perror("epoll_ctl: listen_sock");
exit(exit_failure);
}
for (;;) {
nfds = epoll_wait(epollfd, events, max_events, -1);
if (nfds == -1) {
perror("epoll_pwait");
exit(exit_failure);
}
for (n = 0; n < nfds; n) {
if (events[n].data.fd == listen_sock) {
conn_sock = accept(listen_sock,
(struct sockaddr *) &local, &addrlen);
if (conn_sock == -1) {
perror("accept");
exit(exit_failure);
}
setnonblocking(conn_sock);
ev.events = epollin | epollet;
ev.data.fd = conn_sock;
if (epoll_ctl(epollfd, epoll_ctl_add, conn_sock, &ev) == -1) {
perror("epoll_ctl: conn_sock");
exit(exit_failure);
}
} else {
do_use_fd(events[n].data.fd);
}
}
}
客户端使用里的客户端。
服务器端代码(这里是github上源码):
//该版本使用epoll代替poll
#include "myheader.h"
#define open_max 1024
#define maxevents 64
//该函数是将套接字设置为非阻塞方式
//使用的代码是fcntl开启非阻塞i/o的典型代码
static int make_socket_non_blocking(int sfd) {
int flags;
if ((flags = fcntl(sfd, f_getfl, 0)) == -1)
err_quit("fcntl f_getel error");
flags |= o_nonblock;
if (fcntl(sfd, f_setfl, flags) == -1)
err_quit("fcntl f_setfl error");
return 0;
}
int main(int argc, char **argv)
{
int i, maxi, listenfd, connfd, sockfd;
int nready;
ssize_t n;
char buf[maxline];
socklen_t clilen;
struct epoll_event event, events[maxevents];
struct sockaddr_in cliaddr, servaddr;
listenfd = socket(af_inet, sock_stream, 0);
memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = af_inet;
servaddr.sin_addr.s_addr = htonl(inaddr_any);
servaddr.sin_port = htons(serv_port);
// 在socket()和bind()之间设置套接字选项避免地址使用错误:
//结束服务器程序后“bind error: address already in use”
int opt = 1;
if ( setsockopt(listenfd, sol_socket, so_reuseaddr, &opt, sizeof(opt)) == -1)
err_exit("setsockopt error\n");
bind(listenfd, (const struct sockaddr*)&servaddr, sizeof(servaddr));
//设置监听套接字为非阻塞模式
if (make_socket_non_blocking(listenfd) == -1) {
err_exit("make_socket_non_blocking error");
}
listen(listenfd, listenq);
//创建一个epoll的句柄,该句柄占用一个fd值,因此epoll使用完后要关闭
int efd = epoll_create1(epoll_cloexec);
if (efd == -1)
err_exit("epoll_create1 error");
event.data.fd = listenfd; //要监听的事件类型,这里监听listen套接字
event.events = epollin | epollet; //读入,边缘触发方式
//epoll的事件注册函数,注册新的fd:listenfd到efd,并指明要监听的事件
int s = epoll_ctl(efd, epoll_ctl_add, listenfd, &event);
if (s == -1)
err_quit("epoll_ctl error");
for( ; ; ) {
int i;
//收集监听到的消息,返回值nfds是已经准备好的描述符的个数
//下面的for循环只扫描已经准备好的描述符,这正是epoll比poll的高效之处
//定时器时间设置为-1表示一直等待知道有事件就绪
int nfds = epoll_wait(efd, events, maxevents, -1);
for (i = 0; i < nfds; i ) {
/* an error has occured on this fd, or the socket is not
ready for reading (why were we notified then?)
如果描述符发生错误、被挂断或者不是可读的描述符都关闭该描述符并继续
*/
if ( (events[i].events & epollerr) || (events[i].events & epollhup) ||
(!(events[i].events & epollin))) {
fprintf(stderr, "epoll error\n");
close(events[i].data.fd);
continue;
}
/* we have a notification on the listening socket, which
means one or more incoming connections. */
else if (listenfd == events[i].data.fd) {
for ( ; ; ) {
struct sockaddr cliaddr;
char hbuf[maxline], sbuf[maxline];
socklen_t clilen = sizeof(cliaddr);
int connfd = accept(listenfd, (const struct sockaddr*)&cliaddr,
&clilen);
if (connfd == -1) {
//因为上边设置listenfd为非阻塞了,所以accept处理完listenfd后
//没有可以处理的套接字了,所以会返回eagin错误表示accept处理完了
//参考http://blog.csdn.net/u013074465/article/details/44993227
if ((errno == eagain) || (errno == ewouldblock))
break;
else { perror("accept ddddddd"); break;};
}
//将地址转化为主机名或者服务名
//flag参数:以数字名返回主机地址和服务地址
int s = getnameinfo(&cliaddr, clilen, hbuf, sizeof(hbuf),
sbuf, sizeof(sbuf), ni_numerichost
| ni_numericserv);
if (s == 0)
printf("accepted connection on descriptor %d "
"(host=%s, port=%s)\n", connfd, hbuf, sbuf);
/* make the connection socket non-blocking and add it to the
list of fds to monitor. */
if (make_socket_non_blocking(connfd) == -1)
err_exit("make_socket_non_blocking connfd error");
event.data.fd = connfd;
event.events = epollin | epollet;
if (epoll_ctl(efd, epoll_ctl_add, connfd, &event) == -1)
err_exit("epoll_ctl error connfd");
}
continue;
}
else {
/* we have data on the fd waiting to be read. read and
display it. we must read whatever data is available
completely, as we are running in edge-triggered mode
and won't get a notification again for the same
data. */
int done = 0;
for ( ; ;) {
ssize_t count = read(events[i].data.fd, buf, sizeof(buf));
if (count == -1) {
/* if errno == eagain, that means we have read all
data. so go back to the main loop. */
if (errno != eagain) {
printf("read...");
done = 1;
}
break;
}
else if (count == 0) {
/* end of file. the remote has closed the
connection. */
done = 1;
break;
}
write(stdout_fileno, buf, count); //write to stdout
}
if (done) {
printf("closed connection on descriptor %d\n", events[i].data.fd);
close(events[i].data.fd);
}
}
}
}
close(listenfd);
return exit_success;
}
启动服务器后,服务器分别接到了两个客户端的连接,客户端1连接并发送两条消息后客户退出;客户2发送一条消息后,服务器退出。
服务器:
客户1:
客户2: