netstat -apn 命令
netstat - apn | grep 8000
其中8000为端口号
poll 和 epoll 对于突破1024文件描述符设置是生效的
可以使用cat命令查看一个进程可以打开的socket描述符上限。
cat /proc/sys/fs/file-max :这个表示当前计算机所能打开的最大文件个数,受硬件影响。当前我们的计算机 是 393856
如有需要,可以通过修改配置文件的方式修改该上限值。
sudo vi /etc/security/limits.conf
在文件尾部写入以下配置,soft软限制,hard硬限制。如下图所示。
* soft nofile 65536 (默认是1024)对齐ulimit -a查看 open files
* hard nofile 100000
epoll
epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。
目前epoll是linux大规模并发网络程序中的热门首选模型。
epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
epoll 的实现原理:
epoll本质上是 一个 平衡二叉树,更加具体的说:是一个红黑树。
epoll_create 函数的可以看成是生成红色框框里面的epfd,那么要生成一个红黑树,一般要告诉系统,我这个树上 预期有多少个节点。也就是epoll_create 的参数。但是注意的是:这是一个建议值,假设我们传递的是100,当超过90个客户端访问的时候,系统可能就已经将这个建议的值改成200了,前期这么简单的理解就好。
epoll 三个重要的API
epoll_create 创建一个epoll句柄,参数size用来告诉内核监听的文件描述符的个数,跟内存大小有关。
#include <sys/epoll.h>
int epoll_create(int size)
size:监听数目(内核参考值)
返回值:成功:非负文件描述符;失败:-1,设置相应的errno
epoll_ctl 控制某个epoll监控的文件描述符上的事件:注册、修改、删除。也就是说,有了这个红黑树了,就要给树上挂节点了,或者删除节点,或者修改节点了
因此第一个参数是 这个红黑树的根节点,就是告诉系统,我要对这个红黑树进行改动。
第二个参数是说明,我是挂节点/删除节点/修改节点的哪一种
第三个参数是说明,我要挂的节点的 fd,或者删除节点的fd,或者修改节点的fd
第四个参数说明:
我对第三个参数的什么事件进行监控:events表明了是对读:EPOLLIN,写:EPOLLOUT,错误EPOLLERR, 边缘触发模式:EPOLLET
然后做为传入参数,描述了我要做什么事情,里面还有一个fd,对应了epoll_ctl的第三个参数。学到这里的时候有一个想法,就是为什么在还需要第四个参数中有一个fd,且这个fd和第三个参数是一样的,网上也没有找到合理的说明,那么只有一种可能,就是epoll_event 做为参数使用的时候,要用到fd,但是epoll的开发者又不想依赖于外部条件,因此将这个参数再填写一遍
void *ptr //这个todo
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
epfd:为epoll_creat的句柄
op:表示动作,用3个宏来表示:
EPOLL_CTL_ADD (注册新的fd到epfd),
EPOLL_CTL_MOD (修改已经注册的fd的监听事件),
EPOLL_CTL_DEL (从epfd删除一个fd);
event:告诉内核需要监听的事件
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr;
int fd; // 和 epoll_ctl的第三个参数一样
uint32_t u32; //不用
uint64_t u64; //不用
} epoll_data_t;
events 取值为:
EPOLLIN :表示对应的文件描述符可以读(包括对端SOCKET正常关闭)
EPOLLOUT:表示对应的文件描述符可以写
EPOLLPRI:表示对应的文件描述符有紧急的数据可读(这里应该表示有带外数据到来)
EPOLLERR:表示对应的文件描述符发生错误
EPOLLHUP:表示对应的文件描述符被挂断;
EPOLLET: 将EPOLL设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)而言的
EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个socket的话,需要再次把这个socket加入到EPOLL队列里
返回值:成功:0;失败:-1,设置相应的errno
epoll_wait 等待所监控文件描述符上有事件的产生,类似于select()调用。
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout)
events:用来存内核得到事件的集合,可简单看作数组。是传出参数,传出满足监听条件的那个fd结构体
maxevents:告之内核这个events有多大,这个maxevents的值不能大于创建epoll_create()时的size,
timeout:是超时时间
-1:阻塞
0:立即返回,非阻塞
>0:指定毫秒
返回值:成功返回有多少文件描述符就绪,时间到时返回0,出错返回-1
例子:一般形式的epoll1.c
#include <stdio.h> #include <stdlib.h> #include <unistd.h> #include <string.h> #include <arpa/inet.h> #include <ctype.h> #include <sys/types.h> #include <sys/socket.h> #include <errno.h> #include <sys/select.h> #include <sys/epoll.h> #include "wrap.h" #define MAXLINE 8192 #define SERV_PORT 8000 #define OPEN_MAX 5000 int main() { int ret = 0; //第一步,socket,创建套接字。On success, a file descriptor for the new socket is returned int listenfd = Socket(AF_INET, SOCK_STREAM,0); //第二步,setsockopt 设定端口复用,代码是固定的,当opt是1的时候,说明可以复用端口。 On success, zero is returned for the standard options. On error, -1 is returned, and errno is set appropriately. int opt = 1; Setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)); //第三步, bind函数,将socket和地址结构绑定 //int Bind(int fd, const struct sockaddr *sa, socklen_t salen) struct sockaddr_in servaddr; bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_port = htons(SERV_PORT); servaddr.sin_addr.s_addr = htonl(INADDR_ANY); Bind(listenfd, (struct sockaddr *)&servaddr,sizeof(servaddr)); //第四步,设置可以同时监听的最大的数量为1024,如果改成5000会不会错呢? Listen(listenfd, 1024); //Listen(listenfd, OPEN_MAX); //第五步,创建一个红黑树结点,我们建议这个红黑树的节点为5000 int epfd = Epoll_create(OPEN_MAX); //第六步,先将listenfd添加到 epfd这个红黑树上,EPOLL_CTL_ADD表示是添加节点.EPOLLIN表示监听读事件 struct epoll_event event; bzero(&event, sizeof(event)); event.events = EPOLLIN; event.data.fd = listenfd; //epoll_ct 函数的目的是给这颗红黑树上添加节点,删除节点,修改节点 Epoll_ctl(epfd,EPOLL_CTL_ADD,listenfd, &event); //第七步,这时候就需要弄一个循环监听了, 使用 epoll_wait函数等待连接 struct epoll_event realevents[OPEN_MAX]; int nready = 0; while (1) { //epoll_wait返回值nready为满足监听的总个数。realevents是传出参数,传出满足监听条件的所有的结构体 nready = epoll_wait(epfd, realevents, OPEN_MAX, -1); if (nready == -1) { perr_exit("epoll_wait error"); } for (int i = 0; i < nready;++i) { if (!(realevents[i].events & EPOLLIN)) { //如果不是"读"事件, 继续循环 continue; } int socketfd = realevents[i].data.fd; if (socketfd == listenfd) { //如果是listenfd 的读事件,说明有新的链接过来了,这时候要调用accpet struct sockaddr_in cliaddr; int cliaddrlen = sizeof(cliaddr); bzero(&cliaddr, cliaddrlen); printf("aaa\n"); int clientfd = Accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddrlen); char str[INET_ADDRSTRLEN] = {0};//#define INET_ADDRSTRLEN 16 printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); //然后将connfd,添加到红黑树上 struct epoll_event clientevent; bzero(&clientevent, sizeof(clientevent)); clientevent.events = EPOLLIN; clientevent.data.fd = clientfd; Epoll_ctl(epfd, EPOLL_CTL_ADD, clientfd, &clientevent); } else { //如果不是listenfd,那么就是connectfd了,说明这时候客户端有东西写过来,我们要从客户端读取数据 char buf[MAXLINE] = {0}; int realreadnum; REREADPOINT: realreadnum = Read(socketfd, buf, MAXLINE); if (realreadnum == 0) {//在网络环境下,read函数返回0,说明是对端关闭了,也就是说,客户端关闭了 //那么就应该关闭当前的connect端,并将该监听从 红黑树上 移除 printf("read done\n"); Epoll_ctl(epfd, EPOLL_CTL_DEL, socketfd, NULL); Close(socketfd); } else if (realreadnum == -1) { if (errno == EINTR) { //说明是被信号打断的,一般要重新read printf("信号打断\n"); goto REREADPOINT; } else if (errno == EAGAIN || errno == EWOULDBLOCK) { printf(" WOULDBLOCK \n"); //说明在打开文件的时候是使用的O_NONBLOCK方式打开的,但是没有读取到数据 //当前代码是不会走到这里的,因为前面代码select的最后一个参数用的NULL,是阻塞的 //一般在这里 也要重新读,但是这里有个问题,如果一直都读取不到,会不会死循环? goto REREADPOINT; } else if (errno == ECONNRESET) { //ECONNRESET 说明连接被重置了,因此要将该cfd关闭,并重新移除监听队列 printf("read done\n"); Epoll_ctl(epfd, EPOLL_CTL_DEL, socketfd, NULL); Close(socketfd); } else { //这就是真正的有问题了,注意这里不要exit程序,应该只是让打印log //不退出程序是因为,这时候还有其他的链接连上的 perror("read num <0"); } } else if (realreadnum > 0) { //真正的读取到了客户端发送过来的数据 for (int j = 0; j < realreadnum; ++j) { buf[j] = toupper(buf[j]); } Write(socketfd, buf, realreadnum); Write(STDOUT_FILENO, buf, realreadnum); } } } } Close(listenfd); Close(epfd); return ret; }
epoll进阶
事件模型
EPOLL事件有两种模型:
Edge Triggered (ET) 边缘触发只有数据到来才触发,不管缓存区中是否还有数据。
Level Triggered (LT) 水平触发只要有数据都会触发。默认是这种,举例说明:假设缓冲区有100个字节,epoll_wait在收到有读cfd事件后,我们的代码逻辑是从缓冲区中读取数据,但是我们每次只能读取10个字节,在读取10个字节后,epoll_wait继续会被调用,我们再读取10字节,直到缓冲区没有数据。ET则不一样,当第一次读取10个字节后,如果这时候没有给缓冲区写新的数据,则不会再次触发epoll_wait。
介绍这个ET模式有啥用?
LT是默认的,因此就不介绍了,ET有啥用呢?假设我们有这样一种需求,我们要从客户端读取的数据为一个mp4文件的简略信息,也就是mp4文件的名字,大小,时间长度,主演,缩略图。这些信息都会包含在一个mp4的头文件中,假设占用了8个字节(这个是编的哈,),但是实际上传递过来的数据可能包含了很多字节,这时候用这个ET模式就很好了。我们就读取8个字节,其他的我服务器端不要。
这里还存在一个问题,使用ET模式从cfd端读取数据是不能block的,这也是设计方面的,假设我们读取的数据mp4文件要读取8字节,才能将想要的信息完全搞定。但是实际上客户端在发送了2个字节后,网络卡了,甚至断了,那么由于我们一定要读满8个字节才罢休,因此有可能就阻塞了,导致程序就卡在这里等待后面的6个字节,且从代码逻辑来看,read数据的时候等不到对端数据会有大问题,(这里如不理解,需对照代码看),程序就也无法接受新的链接。
因此使用ET模式 和 non-block模式才是我们使用epoll的真正的实现方法。
思考如下步骤:
- 假定我们已经把一个用来从管道中读取数据的文件描述符(rfd)添加到epoll描述符。
- 管道的另一端写入了2KB的数据
- 调用epoll_wait,并且它会返回rfd,说明它已经准备好读取操作
- 读取1KB的数据
- 调用epoll_wait……
在这个过程中,有两种工作模式:
ET模式
ET模式即Edge Triggered工作模式。
如果我们在第1步将rfd添加到epoll描述符的时候使用了EPOLLET标志,那么在第5步调用epoll_wait之后将有可能会挂起,因为剩余的数据还存在于文件的输入缓冲区内,而且数据发出端还在等待一个针对已经发出数据的反馈信息。只有在监视的文件句柄上发生了某个事件的时候 ET 工作模式才会汇报事件。因此在第5步的时候,调用者可能会放弃等待仍在存在于文件输入缓冲区内的剩余数据。epoll工作在ET模式的时候,必须使用非阻塞套接口,以避免由于一个文件句柄的阻塞读/阻塞写操作把处理多个文件描述符的任务饿死。最好以下面的方式调用ET模式的epoll接口,在后面会介绍避免可能的缺陷。
- 基于非阻塞文件句柄
- 只有当read或者write返回EAGAIN(非阻塞读,暂时无数据)时才需要挂起、等待。但这并不是说每次read时都需要循环读,直到读到产生一个EAGAIN才认为此次事件处理完成,当read返回的读到的数据长度小于请求的数据长度时,就可以确定此时缓冲中已没有数据了,也就可以认为此事读事件已处理完成。
LT模式
LT模式即Level Triggered工作模式。
与ET模式不同的是,以LT方式调用epoll接口的时候,它就相当于一个速度比较快的poll,无论后面的数据是否被使用。
比较
LT(level triggered):LT是缺省的工作方式,并且同时支持block和no-block socket。在这种做法中,内核告诉你一个文件描述符是否就绪了,然后你可以对这个就绪的fd进行IO操作。如果你不作任何操作,内核还是会继续通知你的,所以,这种模式编程出错误可能性要小一点。传统的select/poll都是这种模型的代表。
ET(edge-triggered):ET是高速工作方式,只支持no-block socket。在这种模式下,当描述符从未就绪变为就绪时,内核通过epoll告诉你。然后它会假设你知道文件描述符已经就绪,并且不会再为那个文件描述符发送更多的就绪通知。请注意,如果一直不对这个fd作IO操作(从而导致它再次变成未就绪),内核不会发送更多的通知(only once).
实例一:基于管道epoll ET触发模式
#include <stdio.h> #include <stdlib.h> #include <sys/epoll.h> #include <errno.h> #include <unistd.h> #define MAXLINE 10 int main(int argc, char *argv[]) { int efd, i; int pfd[2]; pid_t pid; char buf[MAXLINE], ch = 'a'; pipe(pfd); pid = fork(); if (pid == 0) { close(pfd[0]); while (1) { for (i = 0; i < MAXLINE/2; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; for (; i < MAXLINE; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; write(pfd[1], buf, sizeof(buf)); sleep(2); } close(pfd[1]); } else if (pid > 0) { struct epoll_event event; struct epoll_event resevent[10]; int res, len; close(pfd[1]); efd = epoll_create(10); /* event.events = EPOLLIN; */ event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */ event.data.fd = pfd[0]; epoll_ctl(efd, EPOLL_CTL_ADD, pfd[0], &event); while (1) { res = epoll_wait(efd, resevent, 10, -1); printf("res %d\n", res); if (resevent[0].data.fd == pfd[0]) { len = read(pfd[0], buf, MAXLINE/2); write(STDOUT_FILENO, buf, len); } } close(pfd[0]); close(efd); } else { perror("fork"); exit(-1); } return 0; }
实例二:基于网络C/S模型的epoll ET触发模式
server
/* server.c */ #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <signal.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h> #define MAXLINE 10 #define SERV_PORT 8080 int main(void) { struct sockaddr_in servaddr, cliaddr; socklen_t cliaddr_len; int listenfd, connfd; char buf[MAXLINE]; char str[INET_ADDRSTRLEN]; int i, efd; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); listen(listenfd, 20); struct epoll_event event; struct epoll_event resevent[10]; int res, len; efd = epoll_create(10); event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */ printf("Accepting connections ...\n"); cliaddr_len = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len); printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); event.data.fd = connfd; epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event); while (1) { res = epoll_wait(efd, resevent, 10, -1); printf("res %d\n", res); if (resevent[0].data.fd == connfd) { len = read(connfd, buf, MAXLINE/2); write(STDOUT_FILENO, buf, len); } } return 0; }
client
/* client.c */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #define MAXLINE 10 #define SERV_PORT 8080 int main(int argc, char *argv[]) { struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd, i; char ch = 'a'; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); while (1) { for (i = 0; i < MAXLINE/2; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; for (; i < MAXLINE; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; write(sockfd, buf, sizeof(buf)); sleep(10); } Close(sockfd); return 0; }
实例三:基于网络C/S非阻塞模型的epoll ET触发模式
server
/* server.c */ #include <stdio.h> #include <string.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/wait.h> #include <sys/types.h> #include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #define MAXLINE 10 #define SERV_PORT 8080 int main(void) { struct sockaddr_in servaddr, cliaddr; socklen_t cliaddr_len; int listenfd, connfd; char buf[MAXLINE]; char str[INET_ADDRSTRLEN]; int i, efd, flag; listenfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; servaddr.sin_addr.s_addr = htonl(INADDR_ANY); servaddr.sin_port = htons(SERV_PORT); bind(listenfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); listen(listenfd, 20); struct epoll_event event; struct epoll_event resevent[10]; int res, len; efd = epoll_create(10); /* event.events = EPOLLIN; */ event.events = EPOLLIN | EPOLLET; /* ET 边沿触发 ,默认是水平触发 */ printf("Accepting connections ...\n"); cliaddr_len = sizeof(cliaddr); connfd = accept(listenfd, (struct sockaddr *)&cliaddr, &cliaddr_len); printf("received from %s at PORT %d\n", inet_ntop(AF_INET, &cliaddr.sin_addr, str, sizeof(str)), ntohs(cliaddr.sin_port)); flag = fcntl(connfd, F_GETFL); flag |= O_NONBLOCK; fcntl(connfd, F_SETFL, flag); event.data.fd = connfd; epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &event); while (1) { printf("epoll_wait begin\n"); res = epoll_wait(efd, resevent, 10, -1); printf("epoll_wait end res %d\n", res); if (resevent[0].data.fd == connfd) { while ((len = read(connfd, buf, MAXLINE/2)) > 0) write(STDOUT_FILENO, buf, len); } } return 0; }
client:
/* client.c */ #include <stdio.h> #include <string.h> #include <unistd.h> #include <netinet/in.h> #define MAXLINE 10 #define SERV_PORT 8080 int main(int argc, char *argv[]) { struct sockaddr_in servaddr; char buf[MAXLINE]; int sockfd, i; char ch = 'a'; sockfd = socket(AF_INET, SOCK_STREAM, 0); bzero(&servaddr, sizeof(servaddr)); servaddr.sin_family = AF_INET; inet_pton(AF_INET, "127.0.0.1", &servaddr.sin_addr); servaddr.sin_port = htons(SERV_PORT); connect(sockfd, (struct sockaddr *)&servaddr, sizeof(servaddr)); while (1) { for (i = 0; i < MAXLINE/2; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; for (; i < MAXLINE; i++) buf[i] = ch; buf[i-1] = '\n'; ch++; write(sockfd, buf, sizeof(buf)); sleep(10); } Close(sockfd); return 0; }
epoll反应堆
弄懂了前面的 ET + NONBLOCK 轮询 模型,下来我们看一下epoll 反应堆模型。
实际上是多了个参数的使用:: ET + NONBLOCK 轮询 + void *ptr
在前面的epoll_ctl函数中,我们在 event 参数中并没有使用 void *ptr。
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event)
struct epoll_event {
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
typedef union epoll_data {
void *ptr; //到现在还没有使用这个。
int fd; // 和 epoll_ctl的第三个参数一样
uint32_t u32; //不用
uint64_t u64; //不用
} epoll_data_t;
ptr 的使用: 一般是弄一个struct 来完成这个,这个struct一般要包含:fd,回调函数,也就是当条件满足的时候,调用那个函数。
这块主要是学习一下源码 libevent.c,要能理清大致流程
ET+非阻塞IO 以及 epoll 反应堆 对照图
注意:所有的回调函数都是linux系统帮你调用了,因此,你只需要准确的设置这个回调函数,在该回调的时候,系统会帮您回调