轻量级web并发服务器——TinyWebServer的学习了解
- 前言
- TinyWebServer是什么
- 用户如何与服务器进行通信
- 代码架构
- I/O多路复用
- HTTP——HTTP连接与请求响应
- ThreadPool——线程池
- CGIMysql——数据库连接池
- Timer——定时器模块
- Lock——信号同步机制封装
- Log——日志系统
- 主要参考文章
前言
本文旨在学习该项目的同时对其代码、原理等内容有更深的理解,学习过程中借鉴大量网上文章,如理解存在不当之处或有所遗漏欠缺,还望各位大佬提点指教
部分图片来自网络
TinyWebServer是什么
WebServer是什么
一个WebServer指的是一个服务器程序或者运行该服务器程序的硬件,其主要功能是通过http协议与客户端(通常是浏览器)进行通信,能够接收、存储、处理来自客户端的http请求,并对其作出一定的响应,返回客户端请求的内容或返回一个Error信息
TinyWebServer是什么
TinyWebServer是一个在Linux操作系统下的轻量级web服务器,能够实现以下几种功能:
- 使用线程池 + 非阻塞socket + epoll + 事件处理的并发模型
- 使用状态机解析http请求报文,支持解析POST和GET请求
- 访问服务器数据库实现web端用户注册、登录功能,可以请求服务器图片和视频文件
- 可以实现上万的并发连接数据交换(Webbench测试)
相关基础知识
需要对Linux编程、网络编程有一定了解
书籍推荐:《深入理解计算机系统》、《Unix网络编程》、《Linux高性能服务器编程》
项目中一些相关知识会在对应模块内提到
用户如何与服务器进行通信
用户通常使用web浏览器与服务器进行通信,web浏览器则通过将用户输入的域名解析得到对应的ip地址,通过TCP协议的三次握手建立与目标web服务器的连接,之后HTTP协议生成http请求报文发送到目标web服务器上,服务器则使用socket监听来自用户的请求。关于socket建立连接方面的内容可以通过我之前的文章进行一定的了解socket实现简单的文件传输
当服务器处理一个http请求的时候,还需要继续监听其他用户的请求并为其分配另一逻辑单元用来处理,即并发(后面会提到线程池并发)。在该项目中,服务器使用epoll这种多路I/O复用技术来实现对监听socket和连接socket的同时监听
注意:I/O复用可以同时监听多个文件描述符,但其本身是阻塞的,并且当有多个文件描述符同时就绪的时候,如不采取额外措施则程序顺序处理其中就绪的每个文件描述符
因此,为了提高效率,项目中使用了线程池来实现多线程并发,为每个就绪的文件分配一个逻辑单元(线程)来处理
代码架构
该项目中的代码架构如下
接下来我将分模块进行学习理解
I/O多路复用
I/O模型
Linux提供了五种I/O处理模型(详见《Unix网络编程》):
- 同步阻塞I/O
- 需要阻塞调用线程等待数据到来
- 需要阻塞等待数据从内核态拷贝到用户态
若服务器端采用单线程,当accept一个请求后,在recv或send调用阻塞时,无法accept其他请求,无法处理并发
若服务器端采用多线程,当accept一个请求后,开启线程进行recv,可以完成并发处理,但随请求数增加需要增加系统线程,占用大量内存空间,且线程切换会带来很大的开销
- 同步非阻塞I/O
- 调用线程不需要等待数据到来,但需要不断查询数据到来等待线程同步
- 需要阻塞等待数据从内核态拷贝到用户态
服务器端accept一个请求后,加入fds集合(一般为数组),每次轮询一遍fds集合recv数据(非阻塞),没有数据立即返回错误。轮询操作会浪费大量不必要的CPU资源
- 同步I/O多路复用
- 目前用的最多的I/O模型
- 同阻塞同步I/O,但等待的是多个文件描述符的数据
服务器端对一组文件描述符进行相关事件的注册(fd列表),采用单线程通过select/poll/epoll等系统调用获取fd列表,遍历有事件的fd进行accept/recv/send,使其能支持更多的并发连接请求
- 信号驱动I/O
- 利用信号机制让调用线程不用阻塞等待数据到来
- 需要阻塞等待数据从内核态拷贝到用户态
在网络编程中,与socket相关的读写事件太多,无法在信号对应处理函数中区分产生该信号的事件。只适合在I/O事件单一情况下使用,例如监听端口的socket
- 异步I/O
- 不需要阻塞等待数据到来,信号通知调用线程获取数据
- 不需要阻塞等待数据拷贝,内核自动完成拷贝过程
在Linux下的异步I/O是不完善的,aio系列函数是由POSIX定义的异步操作接口,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于本地文件的aio异步操作,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操作系统级别实现的异步I/O
综合以上几种I/O模型的优缺点,目前Linux的高性能服务器
什么是I/O多路复用
I/O多路复用是一种同步I/O模型,实现一个线程可以监视多个文件句柄;一旦某个文件句柄就绪,就能够通知应用程序进行相应的读写操作;没有文件句柄就绪时会阻塞应用程序,交出CPU
- 多路指的是网络连接
- 复用指的是同一个线程复用
简单来说,I/O多路复用就是一种时分复用,在同一个线程中,通过类似切换开关的方式来宏观上同时传输多个I/O流
I/O多路复用的三种实现方式
select
//select函数接口 #include <sys/select.h> #include <sys/time.h> #define FD_SETSIZE 1024 #define NFDBITS (8 * sizeof(unsigned long)) #define __FDSET_LONGS (FD_SETSIZE/NFDBITS) // 数据结构 (bitmap) typedef struct { unsigned long fds_bits[__FDSET_LONGS]; } fd_set; // API int select( int max_fd, fd_set *readset, fd_set *writeset, fd_set *exceptset, struct timeval *timeout ) // 返回值就绪描述符的数目 FD_ZERO(int fd, fd_set* fds) // 清空集合 FD_SET(int fd, fd_set* fds) // 将给定的描述符加入集合 FD_ISSET(int fd, fd_set* fds) // 判断指定描述符是否在集合中 FD_CLR(int fd, fd_set* fds) // 将给定的描述符从文件中删除
select 实现多路复用的方式是,将已连接的 Socket 都放到一个文件描述符集合,然后调用 select 函数将文件描述符集合拷贝到内核里,让内核来检查是否有网络事件产生。检查的方式很粗暴,就是通过遍历文件描述符集合的方式,当检查到有事件产生后,将此 Socket 标记为可读或可写, 接着再把整个文件描述符集合拷贝回用户态里,然后用户态还需要再通过遍历的方法找到可读或可写的 Socket,然后再对其处理
所以,对于 select 这种方式,需要进行 2 次「遍历」文件描述符集合,一次是在内核态里,一次是在用户态里 ,而且还会发生 2 次「拷贝」文件描述符集合,先从用户空间传入内核空间,由内核修改后,再传出到用户空间中
select将监听的文件描述符分为三组,每一组监听不同的需要进行的IO操作。readfds是需要进行读操作的文件描述符,writefds是需要进行写操作的文件描述符,exceptfds是需要进行异常事件处理的文件描述符。这三个参数可以用NULL来表示对应的事件不需要监听。当select返回时,每组文件描述符会被select过滤,只留下可以进行对应IO操作的文件描述符
select的调用会阻塞到有文件描述符可以进行IO操作或被信号打断或者超时才会返回(条件触发)
select的缺点:
- select 使用固定长度的 BitsMap表示文件描述符集合,单个进程所打开的fd是有限制的,通过FD_SETSIZE设置,默认1024
- 每次调用select,都要把fd集合从用户态拷贝到内核态,在多个fd时开销较大
- 对socket扫描时是线性扫描,采用轮询的方式,效率较低(高并发时)
poll
#include <poll.h> // 数据结构 struct pollfd { int fd; // 需要监视的文件描述符 short events; // 需要内核监视的事件 short revents; // 实际发生的事件 }; // API int poll(struct pollfd fds[], nfds_t nfds, int timeout);
和select用三组文件描述符不同的是,poll只有一个pollfd数组,数组中的每个元素都表示一个需要监听IO操作事件的文件描述符。events参数是我们需要关心的事件,revents是所有内核监测到的事件。poll也不再用 BitsMap 来存储所关注的文件描述符,取而代之用动态数组,以链表形式来组织,突破了 select 的文件描述符个数限制,当然还会受到系统文件描述符限制
但是 poll 和 select 并没有太大的本质区别,都是使用「线性结构」存储进程关注的 Socket 集合,因此都需要遍历文件描述符集合来找到可读或可写的 Socket,时间复杂度为 O(n),而且也需要在用户态与内核态之间拷贝文件描述符集合,这种方式随着并发数上来,性能的损耗会呈指数级增长
poll的缺点:
- 每次调用poll,都需要把fd集合从用户态拷贝到内核态,多个fd时开销很大
- 对socket扫描时是线性扫描,采用轮询的方式,效率较低(高并发时)
epoll
#include <sys/epoll.h> // 数据结构 // 每一个epoll对象都有一个独立的eventpoll结构体 // 用于存放通过epoll_ctl方法向epoll对象中添加进来的事件 // epoll_wait检查是否有事件发生时,只需要检查eventpoll对象中的rdlist双链表中是否有epitem元素即可 struct eventpoll { /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/ struct rb_root rbr; /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/ struct list_head rdlist; }; // API int epoll_create(int size); // 内核中间加一个 ep 对象,把所有需要监听的 socket 都放到 ep 对象中 int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event); // epoll_ctl 负责把 socket 增加、删除到内核红黑树 int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);// epoll_wait 负责检测可读队列,没有可读 socket 则阻塞进程
以上两种方式没有解决需要多次在用户态和内核态切换造成大量数据开销和轮询扫描socket导致效率低的问题,而epoll通过两种方面很好地解决了以上问题:
- epoll在内核中使用红黑树来跟踪进程所有待检测的文件描述符,把需要监控的socket通过epoll_ctl()函数加入内核中的红黑树里,通过对红黑树进行操作,每次只需要传入一个待检测的socket,减少了内核和用户空间大量的数据拷贝和内存分配
- epoll使用事件驱动的机制,内核里维护了一个链表“rdlist”来记录事件,当某个socket有事件发生时,通过回调函数内核会将其加入到这个就绪事件列表中,当用户调用 epoll_wait() 函数时,只会返回有事件发生的文件描述符的个数,不需要轮询扫描整个 socket 集合,大大提高了检测的效率
- 程序可能随时调用epoll_ctl添加监视socket,也可能随时删除。当删除时,若该socket已经存放在就绪列表中,它也应该被移除。因此就绪列表应是一种能够快速插入和删除的数据结构,epoll选择了双向链表来实现就绪队列
- epoll将“维护监视队列”和“进程阻塞”分离,也意味着需要有个数据结构来保存监视的socket,至少要方便的添加和移除,还要便于搜索,以避免重复添加,epoll选择了效率较好的红黑树作为索引结构
- 因为操作系统要兼顾多种功能,以及有更多需要保存的数据,rdlist并非直接引用socket,而是通过epitem间接引用,红黑树的节点也是epitem对象;同理,文件系统也非直接引用socket
epoll的缺点:
- epoll在内核态维护文件描述符集合,每次添加文件描述符需要执行一个系统调用,在有多个短期活跃连接的情况下,epoll执行效率较低
epoll与LT/ET
epoll的三大函数
- 创建epoll函数
#include <sys / epoll.h> int epoll_create(int size) size:最大监听的fd+1 return:成功返回文件描述符fd;失败返回-1,可根据错误码判断错误类型
创建一个epoll的句柄eventpoll,会占用一个fd值,在linux下查看“/proc/进程id/fd/”能够看到该fd,因此在使用完epoll后需要调用close()关闭,否则可能导致fd耗尽
自从Linux2.6.8版本以后,size值只需要保证大于0,因为内核可以动态的分配大小,不需要size这个提示了
在linux 2.6.27中加入了epoll_create1(int flag)
flag为0时表示与epoll_create()完全一样;
flag = EPOLL_CLOEXEC,创建的epfd会设置FD_CLOEXEC;
flag = EPOLL_NONBLOCK,创建的epfd会设置为非阻塞
- epoll事件的注册函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event* event) epfd:epoll_create()返回的epoll fd op:操作值 fd:需要监听的fd event:需要监听的事件 return:成功返回0;失败返回-1,可根据错误码判断错误类型
- epoll_ctl()中操作数op的三种操作类型
- EPOLL_CTL_ADD:注册目标fd到epoll fd中,同时关联event到fd上
- EPOLL_CTL_MOD:修改已经注册到fd的监听事件
- EPOLL_CTL_DEL:从epoll fd中删除/移除已注册的fd
- epoll_ctl()中事件event的枚举如下:
- EPOLLIN:表示关联的fd可以进行读操作
- EPOLLOUT:表示关联的fd可以进行写操作
- EPOLLRDHUP:表示socket关闭了连接(Linux2.6.17后上层只需通过EPOLLRDHUP判断对端是否关闭socket,减少一次系统调用)
- EPOLLPRI:表示关联的fd有紧急优先事件可以进行读操作
- EPOLLERR:表示关联的fd发生了错误,epoll_wait会一直等待这个事件,一般无需设置该属性
- EPOLLHUP:表示关联的fd挂起,epoll_wait会一直等待这个事件,一般无需设置该属性
- EPOLLET:设置关联的fd为ET的工作方式,epoll默认的工作方式是LT
- EPOLLONESHOT:设置关联的fd为one-shot的工作方式,表示只监听一次事件,如果要再次监听,则需再次把该socket放入epoll队列中
当socket接收到数据后,中断程序会给eventpoll的就绪列表“rdlist”添加socket引用,而不是直接唤醒进程
- epoll等待事件函数
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout) epfd:epoll描述符 events:分配好的 epoll_event结构体数组,epoll将会把发生的事件复制到 events数组中(events不可以是空指针,内核只负责把数据复制到这个 events数组中,不会去帮助我们在用户态中分配内存。内核这种做法效率很高) maxevents:本次可以返回的最大事件数目,通常与预分配的events数组的大小是相等的 timeout:在没有检测到事件发生时最多等待的时间(单位为毫秒),如果 timeout为0,则表示 epoll_wait在 rdllist链表中为空,立刻返回,不会等待 return:成功返回需要处理的事件数目,返回0表示已超时;失败则返回-1,可以根据错误码判断错误类型
当程序执行到epoll_wait时,如果rdlist已经引用了socket,那么epoll_wait直接返回,如果rdlist为空,阻塞进程
当socket接收到数据,中断程序一方面修改rdlist,另一方面唤醒eventpoll等待队列中的进程,进程通过rdlist知道哪些socket发生了变化而无需轮询socket列表
ET(边缘触发)模式和LT(水平触发)模式
- 边缘触发(edge-triggered)
- socket的接收缓冲区状态变化时触发读事件,即空的接收缓冲区刚接收到数据时触发读事件
- socket的发送缓冲区状态变化时触发写事件,即满的缓冲区刚空出空间时触发读事件
- epoll_ctl()的events = EPOLLIN | EPOLLET 或 events = EPOLLOUT | EPOLLET 表示ET模式
- 使用边缘触发模式时,当被监控的 Socket 描述符上有可读事件发生时,服务器端只会从 epoll_wait 中苏醒一次,即使进程没有调用 read 函数从内核读取数据,也依然只苏醒一次,因此我们程序要保证一次性将内核缓冲区的数据读取完或者遇到EAGAIN错误,需要设置socket描述符为非阻塞套接字
- 事件触发(level-triggered)
- socket接收缓冲区不为空,有数据可读,则读事件一直触发
- socket发送缓冲区不满,可以继续写入数据,则写事件一直触发
- epoll_ctl()的events = EPOLLIN | EPOLLLT 或 events = EPOLLOUT | EPOLLLT 表示ET模式
- 使用水平触发模式时,当被监控的 Socket 上有可读事件发生时,服务器端不断地从 epoll_wait 中苏醒,直到内核缓冲区数据被 read 函数读完才结束,epoll默认采用LT模式工作(select和poll只有LT模式)
- ET的处理过程
- accept一个一个连接,添加到epoll中监听EPOLLIN|EPOLLOUT事件
- 当EPOLLIN事件到达时,read fd中的数据并处理,read需要一直读,直到返回EAGAIN为止
- 当需要写出数据时,把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
- 当EPOLLOUT事件到达时,继续把数据write到fd中,直到数据全部写完,或者write返回EAGAIN
- LT的处理过程
- accept一个连接,添加到epoll中监听EPOLLIN事件
- 当EPOLLIN事件到达时,read fd中的数据并处理
- 当需要写出数据时,把数据write到fd中;如果数据较大,无法一次性写出,那么在epoll中监听EPOLLOUT事件
- 当EPOLLOUT事件到达时,继续把数据write到fd中;如果数据写出完毕,那么在epoll中关闭EPOLLOUT事件
ET模式的要求是需要一直读写,直到返回EAGAIN,否则就会遗漏事件;LT的并不要求读写到返回EAGAIN为止,但通常会读写到返回EAGAIN,并且LT比ET多了一个开关EPOLLOUT的步骤
ET模式在某些场景下更加高效,但另一方面容易遗漏事件,容易产生bug
对于nginx这种高性能服务器,ET模式是很好的,而其他的通用网络库,很多是使用LT,避免使用的过程中出现bug
epoll的常用框架
for( ; ; ) { nfds = epoll_wait(epfd,events,20,500); for(i=0;i<nfds;++i) { if(events[i].data.fd==listenfd) //有新的连接 { connfd = accept(listenfd,(sockaddr *)&clientaddr, &clilen); //accept这个连接 ev.data.fd=connfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_ADD,connfd,&ev); //将新的fd添加到epoll的监听队列中 } else if( events[i].events&EPOLLIN ) //接收到数据,读socket { n = read(sockfd, line, MAXLINE)) < 0 //读 ev.data.ptr = md; //md为自定义类型,添加数据 ev.events=EPOLLOUT|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev);//修改标识符,等待下一个循环时发送数据,异步处理的精髓 } else if(events[i].events&EPOLLOUT) //有数据待发送,写socket { struct myepoll_data* md = (myepoll_data*)events[i].data.ptr; //取数据 sockfd = md->fd; send( sockfd, md->ptr, strlen((char*)md->ptr), 0 ); //发送数据 ev.data.fd=sockfd; ev.events=EPOLLIN|EPOLLET; epoll_ctl(epfd,EPOLL_CTL_MOD,sockfd,&ev); //修改标识符,等待下一个循环时接收数据 } else { //其他的处理 } } }
HTTP——HTTP连接与请求响应
在readme文档中提到,该类通过主从状态机封装了http连接类,主状态机在内部调用从状态机,从状态机将处理状态和数据传给主状态机
有限状态机
有限状态机(Finite_state machine,FSM),又称有限状态自动机,简称状态机,是表示有限个状态以及在这些状态之间的转移和动作等行为的数学模型,其作用主要是描述对象在它的生命周期内所经历的状态序列,以及如何响应来自外界的各种事件,在计算机科学中,有限状态机被广泛运用于建模、硬件电路系统设计、软件工程、编译器、网络协议等
有限状态机主要有三个特征:
- 状态总数是有限的
- 任意时刻只处在一种状态之中
- 某种条件下,会从一个状态转变到另一个状态
http模块中的主从状态机解析
http报文的结构如下图所示
以一个具体的报文为例
其中"POST"为请求方法,“/v3/cloudconf”为URL,“HTTP/1.1”为协议版本,之后到空行前的为请求头,空行后的为请求包体
头文件中分别定义了主状态机的三种状态和从状态机的三种状态
主状态机的状态表明当前正在处理请求报文的哪一部分
从状态机的状态表明对请求报文当前部分的处理是否出现问题
请求报文的解析
当webserver的线程池有空闲线程时,某一线程调用process()来完成请求报文的解析及响应
void http_conn::process() { HTTP_CODE read_ret = process_read(); if (read_ret == NO_REQUEST) //表示请求不完整,需要继续接收请求数据 { modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); //注册并监听读事件 return; } bool write_ret = process_write(read_ret); //调用process_write完成报文响应 if (!write_ret) { close_connect(); } modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode); //注册并监听写事件 }
主状态机的状态转换使用process_read()封装,从状态机则用parse_line()封装
process_read()函数中,主状态机初始化从状态机,然后通过while循环实现主从状态机的状态转换以及循环处理报文内容。从状态机负责解析指定报文内容,并根据解析结果更改从状态机的状态;主状态机根据从状态机的返回值判断是否退出循环(终止处理/结束处理),并根据从状态机的驱动更改自身状态
主状态机与从状态机的状态转换及其关系如下图所示
从状态机
在HTTP报文中,每一行的数据由“\r”、“\n”作为结束字符,空行则是仅仅是字符“\r”、“\n”。因此,可以通过查找“\r”、“\n”将报文拆解成单独的行进行解析。从状态机负责读取buffer中的数据,将每行数据末尾的“\r”、“\n”符号改为“\0”,并更新从状态机在buffer中读取的位置m_checked_idx,以此来驱动主状态机解析
//从状态机,用于分析出一行内容 //返回值为行的读取状态,有LINE_OK,LINE_BAD,LINE_OPEN http_conn::LINE_STATUS http_conn::parse_line() { char temp; for (; m_checked_idx < m_read_idx; ++m_checked_idx) //m_read_idx指向缓冲区m_read_buf的数据末尾的下一个字节 //m_checked_idx指向从状态机目前正在分析的字节 { temp = m_read_buf[m_checked_idx]; //temp:将要分析的字节 if (temp == '\r') // \r有可能是完整行 { if ((m_checked_idx + 1) == m_read_idx) //该行仍有内容,并未读完 return LINE_OPEN; else if (m_read_buf[m_checked_idx + 1] == '\n') //出现换行符,说明该行读完 { m_read_buf[m_checked_idx++] = '\0'; // \r、\n都改为结束符\0 m_read_buf[m_checked_idx++] = '\0'; return LINE_OK; } return LINE_BAD; } else if (temp == '\n') { if (m_checked_idx > 1 && m_read_buf[m_checked_idx - 1] == '\r') //前一个字符是\r,则接收完整 { m_read_buf[m_checked_idx - 1] = '\0'; m_read_buf[m_checked_idx++] = '\0'; return LINE_OK; } return LINE_BAD; } } return LINE_OPEN; //未发现换行符,说明读取的行不完整 } /* LINE_OK:完整读取一行 LINE_BAD:报文语法有误 LINE_OPEN:读取的行不完整 */
主状态机
主状态机初始状态为CHECK_STATE_REQUESTLINE,通过调用从状态机来驱动主状态机。在主状态机解析前,从状态机已经将每一行末尾的“\r”、“\n”符号改为“\0”,以便主状态机直接取出对应字符串进行处理
为了避免用户名和密码直接暴露在url中,项目中改用了POST请求,将用户名和密码添加在报文中作为消息体进行了封装
而在POST请求报文中,消息体的末尾没有任何字符,不能使用从状态机的状态作为主状态机的while判断条件,因此在process_read()中额外添加了使用主状态机的状态进行判断的条件
解析完消息体后,报文的完整解析就完成了,但主状态机的状态还是CHECK_STATE_CONTENT,符合循环条件会再次进入循环,因此增加了“line_status == LINE_OK”并在完成消息体解析后将该变量更改为LNE_OPEN,此时可以跳出循环完成报文解析任务
//通过while循环,封装主状态机,对每一行进行循环处理 //此时,从状态机已经修改完毕,主状态机可以取出完整的行进行解析 http_conn::HTTP_CODE http_conn::process_read() { LINE_STATUS line_status = LINE_OK; //初始化从状态机的状态 HTTP_CODE ret = NO_REQUEST; char* text = 0; //判断条件,从状态机驱动主状态机 while ((m_check_state == CHECK_STATE_CONTENT && line_status == LINE_OK) || ((line_status = parse_line()) == LINE_OK)) { text = get_line(); m_start_line = m_checked_idx; //m_start_line:每一个数据行在m_read_buf中的起始位置 //m_checked_idx:从状态机在m_read_buf中的读取位置 LOG_INFO("%s", text); switch (m_check_state) //三种状态转换逻辑 { case CHECK_STATE_REQUESTLINE: //正在分析请求行 { ret = parse_request_line(text); //解析请求行 if (ret == BAD_REQUEST) return BAD_REQUEST; break; } case CHECK_STATE_HEADER: //正在分析头部字段 { ret = parse_headers(text); //解析请求头 if (ret == BAD_REQUEST) return BAD_REQUEST; else if (ret == GET_REQUEST) //get请求,需要跳转到报文响应函数 { return do_request(); //响应客户请求 } break; } case CHECK_STATE_CONTENT: //解析消息体 { ret = parse_content(text); if (ret == GET_REQUEST) //post请求,跳转到报文响应函数 return do_request(); line_status = LINE_OPEN; //更新,跳出循环,代表解析完了消息体 break; } default: return INTERNAL_ERROR; } } return NO_REQUEST; }
解析请求行
- 主状态机所处状态:CHECK_STATE_REQUESTLINE
- 解析函数从m_read_buf中解析HTTP请求行,获得请求方法、目标以及HTTP版本号
- 解析完成后主状态机的状态变为CHECK_STATE_HEADER
//解析http请求行,获得请求方法,目标url及http版本号 http_conn::HTTP_CODE http_conn::parse_request_line(char* text) { m_url = strpbrk(text, " \t"); //请求该行中最先含有空格和\t任一字符的位置并返回 if (!m_url) //没有目标字符,则代表报文格式有问题 { return BAD_REQUEST; } *m_url++ = '\0'; //将前面的数据取出,后移找到请求资源的第一个字符 char* method = text; if (strcasecmp(method, "GET") == 0) //确定请求方式 m_method = GET; else if (strcasecmp(method, "POST") == 0) { m_method = POST; cgi = 1; } else return BAD_REQUEST; m_url += strspn(m_url, " \t"); //得到url地址 m_version = strpbrk(m_url, " \t"); if (!m_version) return BAD_REQUEST; *m_version++ = '\0'; m_version += strspn(m_version, " \t"); //得到http版本号 if (strcasecmp(m_version, "HTTP/1.1") != 0) return BAD_REQUEST; //只接受HTTP/1.1版本 if (strncasecmp(m_url, "http://", 7) == 0) { m_url += 7; m_url = strchr(m_url, '/'); } if (strncasecmp(m_url, "https://", 8) == 0) { m_url += 8; m_url = strchr(m_url, '/'); } if (!m_url || m_url[0] != '/') //不符合规则的报文 return BAD_REQUEST; //当url为/时,显示判断界面 if (strlen(m_url) == 1) //url为/,显示欢迎界面 strcat(m_url, "judge.html"); m_check_state = CHECK_STATE_HEADER; //主状态机状态转移 return NO_REQUEST; }
解析请求头
- 主状态机所处状态:CHECK_STATE_HEADER
- 判断是空行还是请求头
- 是空行,则判断content-length是否为0
- 不为零,则是POST请求
- 为零,则是GET请求
- 是请求头,则主要分析connection、content-length等字段
- connection字段判断连接类型是长连接还是短连接
- content-length用于读取POST请求的消息体长度
- 是空行,则判断content-length是否为0
//解析http请求的一个头部信息 http_connect::HTTP_CODE http_connect::parse_headers(char* text) { if (text[0] == '\0') //判断是空头还是请求头 { if (m_content_length != 0) //具体判断是get请求还是post请求 { m_check_state = CHECK_STATE_CONTENT; //post请求需要改变主状态机的状态 return NO_REQUEST; } return GET_REQUEST; } else if (strncasecmp(text, "Connection:", 11) == 0) //解析头部连接字段 { text += 11; text += strspn(text, " \t"); if (strcasecmp(text, "keep-alive") == 0) //判断是否为长连接 { m_linger = true; //为长连接,设置延迟关闭连接 } } else if (strncasecmp(text, "Content-length:", 15) == 0) //解析请求头的内容长度字段 { text += 15; text += strspn(text, " \t"); m_content_length = atol(text); //atol(const char*str):将str所指的字符串转换为一个long int的长整数 } else if (strncasecmp(text, "Host:", 5) == 0) //解析请求头部host字段 { text += 5; text += strspn(text, " \t"); m_host = text; } else { LOG_INFO("oop!unknow header: %s", text); } return NO_REQUEST; }
解析消息体
- 主状态机所处状态:CHECK_STATE_CONTENT
- 仅用于解析POST请求
- 用于保存POST请求消息体,为登录和注册做准备
//判断http请求是否被完整读入 http_connect::HTTP_CODE http_connect::parse_content(char* text) { if (m_read_idx >= (m_content_length + m_checked_idx)) { text[m_content_length] = '\0'; //POST请求中最后为输入的用户名和密码 m_string = text; return GET_REQUEST; } return NO_REQUEST; }
请求报文的响应
在完成请求报文的解析后,明确用户想要登录/注册,需要跳转到相应的界面、添加用户名、验证用户等等,并将相应的数据写入相应报文返回给浏览器,具体流程图如下(图片取自微信公众号两猿社):
在头文件中根据HTTP请求的处理结果初始化了几种情形
NO_REQUEST
- 请求不完整,需要继续读取请求报文数据
- 跳转主线程继续监测读事件
GET_REQUEST
- 获得了完整的HTTP请求
- 调用do_request完成请求资源映射
BAD_REQUEST
- HTTP请求报文有语法错误或请求资源为目录
- 跳转process_write完成响应报文
NO_RESOURCE
- 请求资源不存在
- 跳转process_write完成响应报文
FORBIDDEN_REQUEST
- 请求资源禁止访问,没有读取权限
- 跳转process_write完成响应报文
FILE_REQUEST
- 请求资源可以正常访问
- 跳转process_write完成响应报文
INTERNAL_ERROR
- 服务器内部错误,该结果在主状态机逻辑switch的default下,一般不会触发
CLOSED_CONNECTION
- 客户端关闭连接
do_request函数
在process_read()中完成请求报文的解析后,状态机调用do_request()函数,该函数负责处理功能逻辑,具体做法为:将网站根目录与url文件拼接,然后通过stat判断文件属性。浏览器网址栏中的字符,即url,可以将其抽象成ip:port/xxx,xxx通过html文件的action属性进行设置。另外为了提高访问速度,通过mmap进行映射,将普通文件映射到内存逻辑地址。
//对客户请求进行响应 http_connect::HTTP_CODE http_connect::do_request() { strcpy(m_real_file, doc_root); //将初始化的m_real_file赋值为网站根目录 int len = strlen(doc_root); //printf("m_url:%s\n", m_url); const char* p = strrchr(m_url, '/'); //找到m_url中“/”的位置 //处理cgi if (cgi == 1 && (*(p + 1) == '2' || *(p + 1) == '3')) { //根据标志判断是登录检测还是注册检测 char flag = m_url[1]; char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/"); strcat(m_url_real, m_url + 2); strncpy(m_real_file + len, m_url_real, FILENAME_LEN - len - 1); free(m_url_real); //将用户名和密码提取出来 //user=123 password=123 char name[100], password[100]; int i; for (i = 5; m_string[i] != '&'; ++i) name[i - 5] = m_string[i]; name[i - 5] = '\0'; int j = 0; for (i = i + 10; m_string[i] != '\0'; ++i, ++j) password[j] = m_string[i]; password[j] = '\0'; if (*(p + 1) == '3') { //如果是注册,先检测数据库中是否有重名的 //没有重名的,进行增加数据 char* sql_insert = (char*)malloc(sizeof(char) * 200); strcpy(sql_insert, "INSERT INTO user(username, passwd) VALUES("); strcat(sql_insert, "'"); strcat(sql_insert, name); strcat(sql_insert, "', '"); strcat(sql_insert, password); strcat(sql_insert, "')"); if (users.find(name) == users.end()) //说明库中没有重名 { m_lock.lock(); int res = mysql_query(mysql, sql_insert); users.insert(pair<string, string>(name, password)); m_lock.unlock(); if (!res) strcpy(m_url, "/log.html"); else strcpy(m_url, "/registerError.html"); } else strcpy(m_url, "/registerError.html"); } //如果是登录,直接判断 //若浏览器端输入的用户名和密码在表中可以查找到,返回1,否则返回0 else if (*(p + 1) == '2') { if (users.find(name) != users.end() && users[name] == password) strcpy(m_url, "/welcome.html"); else strcpy(m_url, "/logError.html"); } } if (*(p + 1) == '0') //如果请求资源为/0,表示跳转注册界面 { char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/register.html"); strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); //将网站目录和/register.html进行拼接,更新到m_real_file中 free(m_url_real); } else if (*(p + 1) == '1') //如果请求资源为/1,表示跳转登录页面 { char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/log.html"); strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); //将网站目录和/log.html进行拼接,更新到m_real_file中 free(m_url_real); } else if (*(p + 1) == '5') { char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/picture.html"); strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '6') { char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/video.html"); strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else if (*(p + 1) == '7') { char* m_url_real = (char*)malloc(sizeof(char) * 200); strcpy(m_url_real, "/fans.html"); strncpy(m_real_file + len, m_url_real, strlen(m_url_real)); free(m_url_real); } else strncpy(m_real_file + len, m_url, FILENAME_LEN - len - 1); if (stat(m_real_file, &m_file_stat) < 0) //通过stat获取请求资源文件信息,成功则将信息更新到m_file_stat结构体;失败返回NO_RESOURCE状态,表示资源不存在 return NO_RESOURCE; if (!(m_file_stat.st_mode & S_IROTH)) //判断文件类型,客户端是否有访问权限 return FORBIDDEN_REQUEST; if (S_ISDIR(m_file_stat.st_mode)) //判断该路径是否为目录 return BAD_REQUEST; //os.open(file,flags[,mode]):打开一个文件 int fd = open(m_real_file, O_RDONLY); //以只读方式获取文件描述符,通过mmap将该文件映射到内存中 m_file_address = (char*)mmap(0, m_file_stat.st_size, PROT_READ, MAP_PRIVATE, fd, 0); close(fd); //避免文件描述符的浪费和占用 return FILE_REQUEST; //表示请求文件存在且可以访问 } /* open(file,flags[,mode])中flags的参数 O_RDONLY: 以只读的方式打开 O_WRONLY: 以只写的方式打开 O_RDWR : 以读写的方式打开 O_NONBLOCK: 打开时不阻塞 O_APPEND: 以追加的方式打开 O_CREAT: 创建并打开一个新文件 O_TRUNC: 打开一个文件并截断它的长度为零(必须有写权限) O_EXCL: 如果指定的文件存在,返回错误 O_SHLOCK: 自动获取共享锁 O_EXLOCK: 自动获取独立锁 O_DIRECT: 消除或减少缓存效果 O_FSYNC : 同步写入 O_NOFOLLOW: 不追踪软链接 */
- stat函数用于取得指定文件的文件属性,并将文件属性存储在结构体stat里
#include <sys/types.h> #include <sys/stat.h> #include <unistd.h> //获取文件属性,存储在statbuf中 int stat(const char *pathname, struct stat *statbuf); struct stat { mode_t st_mode; /* 文件类型和权限 */ off_t st_size; /* 文件大小,字节数*/ };
- mmap用于将文件等映射到内存,提高访问速度
void* mmap(void* start,size_t length,int prot,int flags,int fd,off_t offset); int munmap(void* start,size_t length); /* start:映射区的开始地址,设置为0时表示由系统决定映射区的起始地址 length:映射区的长度,从被映射文件开头offset个字节算起 prot:期望的内存保护标志,不能与文件的打开模式冲突,可取以下几个值的或:PROT_READ(可读), PROT_WRITE(可写), PROT_EXEC(可执行), PROT_NONE(不可访问) PROT_READ表示页内容可以被读取 flags:指定映射对象的类型,映射选项和映射页是否可以共享,可以是以下几个常用值的或:MAP_FIXED(使用指定的映射起始地址), MAP_SHARED(与其它所有映射这个对象的进程共享映射空间), MAP_PRIVATE(建立一个写入时拷贝的私有映射) MAP_PEIVATE建立一个写入时拷贝的私有映射,内存区域的写入不会影响到原文件 fd:有效地文件描述符,一般是由open()函数返回 offset:被映射对象内容的起点 */
- iovec定义向量元素,通常该结构用作一个多元素的数组
struct iovec { void *iov_base; /* starting address of buffer */ size_t iov_len; /* size of buffer */ }; /* iov_base指向数据的地址 iov_len表示数据的长度 */
- writev用于在一次函数调用中写多个非连续缓冲区,有时将该函数称为聚集写。若成功则返回已写的字节数,通常等于所有缓冲区长度之和;否则返回-1
#include <sys/uio.h> ssize_t writev(int filedes, const struct iovec *iov, int iovcnt); /* filedes表示文件描述符 iov为io向量机制结构体iovec iovcnt为结构体的个数 */
特别注意: 循环调用writev时,需要重新处理iovec中的指针和长度,该函数不会对这两个成员做任何处理。writev的返回值为已写的字节数,但这个返回值“实用性”并不高,因为参数传入的是iovec数组,计量单位是iovcnt,而不是字节数,我们仍然需要通过遍历iovec来计算新的基址,另外写入数据的“结束点”可能位于一个iovec的中间某个位置,因此需要调整临界iovec的io_base和io_len
process_write函数
根据do_request的返回状态,服务器子线程调用process_write向m_write_buf中写入响应报文,在生成响应报文的过程中主要调用add_reponse()函数更新m_write_idx和m_write_buf
以下几个函数为内部调用add_response函数更新m_write_idx指针和缓冲区m_write_buf中的内容
bool http_connect::add_response(const char* format, ...) { if (m_write_idx >= WRITE_BUFFER_SIZE) //如果写入内容超出m_write_buf大小则报错 return false; va_list arg_list; //定义可变参数列表 va_start(arg_list, format); //将变量arg_list初始化为传入参数 int len = vsnprintf(m_write_buf + m_write_idx, WRITE_BUFFER_SIZE - 1 - m_write_idx, format, arg_list); //将数据format从可变参数列表写入缓冲区写,返回写入数据的长度 if (len >= (WRITE_BUFFER_SIZE - 1 - m_write_idx)) //如果写入的数据长度超过缓冲区剩余空间,则报错 { va_end(arg_list); return false; } m_write_idx += len; //更新m_write_idx位置 va_end(arg_list); //清空可变参列表 LOG_INFO("request:%s", m_write_buf); return true; }
- add_status_line函数:添加状态行(http/1.1 状态码 状态消息)
//添加状态行 bool http_connect::add_status_line(int status, const char* title) { return add_response("%s %d %s\r\n", "HTTP/1.1", status, title); }
- add_headers函数:添加消息报头,内部调用add_content_length和add_linger函数
- content-length记录响应报文长度,用于浏览器端判断服务器是否发送完数据
- connection记录连接状态,用于告诉浏览器端保持长连接
bool http_connect::add_headers(int content_len) //添加消息报头,具体的添加文本长度、连接状态和空行 { return add_content_length(content_len) && add_linger() && add_blank_line(); } bool http_connect::add_content_length(int content_len) //添加Content-Length,表示响应报文的长度 { return add_response("Content-Length:%d\r\n", content_len); } bool http_connect::add_linger() //添加连接状态,通知浏览器端是保持连接还是关闭 { return add_response("Connection:%s\r\n", (m_linger == true) ? "keep-alive" : "close"); } bool http_connect::add_content_type() //添加文本类型,这里是html { return add_response("Content-Type:%s\r\n", "text/html"); }
- add_blank_line:添加空行
bool http_connect::add_blank_line() //添加空行 { return add_response("%s", "\r\n"); }
- add_content:添加文本content
bool http_connect::add_content(const char* content) { return add_response("%s", content); }
响应报文分为两种,一种是请求文件的存在,通过io向量机制iovec,声明两个iovec,第一个指向m_write_buf,第二个指向mmap的地址m_file_address ;另一种是请求出错,这时候只申请一个iovec,指向m_write_buf
- iovec是一个结构体,里面有两个元素,指针成员iov_base指向一个缓冲区,这个缓冲区是存放的是writev将要发送的数据,成员iov_len表示实际写入的长度(详见do_request函数部分中iovec结构的描述)
往响应报文里写的是服务器中html的文件数据,浏览器端对其进行解析、渲染并显示在浏览器页面上
bool http_connect::process_write(HTTP_CODE ret) { switch (ret) { case INTERNAL_ERROR: //内部错误,500 { add_status_line(500, error_500_title); //状态行 add_headers(strlen(error_500_form)); //消息报头 if (!add_content(error_500_form)) return false; break; } case BAD_REQUEST: //报文语法有误,404 { add_status_line(404, error_404_title); add_headers(strlen(error_404_form)); if (!add_content(error_404_form)) return false; break; } case FORBIDDEN_REQUEST: //资源没有访问权限,403 { add_status_line(403, error_403_title); add_headers(strlen(error_403_form)); if (!add_content(error_403_form)) return false; break; } case FILE_REQUEST: //文件存在,200 { add_status_line(200, ok_200_title); if (m_file_stat.st_size != 0) //请求的资源存在 { add_headers(m_file_stat.st_size); m_iv[0].iov_base = m_write_buf; //第一个iovec指针指向响应报文缓冲区,长度指向m_write_idx m_iv[0].iov_len = m_write_idx; m_iv[1].iov_base = m_file_address; //第二个iovec指针指向mmap返回的文件指针,长度指向文件大小 m_iv[1].iov_len = m_file_stat.st_size; m_iv_count = 2; bytes_to_send = m_write_idx + m_file_stat.st_size; //发送的全部数据为响应报文头部信息和文件大小 return true; } else { const char* ok_string = "<html><body></body></html>"; //请求的资源大小为0,则返回空白html文件 add_headers(strlen(ok_string)); if (!add_content(ok_string)) return false; } } default: return false; } m_iv[0].iov_base = m_write_buf; //除FILE_REQUEST状态外,其余状态只申请一个iovec,指向响应报文缓冲区 m_iv[0].iov_len = m_write_idx; m_iv_count = 1; bytes_to_send = m_write_idx; return true; }
write函数
服务器子线程调用process_write完成响应报文,随后注册epollout事件。服务器主线程检测写事件,并调用http_conn::write函数将响应报文发送给浏览器端
该函数具体逻辑如下:
生成响应报文时初始化byte_to_send(包括头部信息和文件数据),通过writev函数循环发送响应报文数据,根据返回值更新byte_have_send和iovec结构体的指针和长度,并判断响应报文整体是否发送成功
- 若writev单次发送成功,更新byte_to_send和byte_have_send的大小,若响应报文整体发送成功,则取消mmap映射,并判断是否是长连接
- 长连接重置http类实例,注册读事件,不关闭连接
- 短连接直接关闭连接
- 若writev单次发送不成功,判断是否是写缓冲区满了
- 若不是因为缓冲区满了而失败,取消mmap映射,关闭连接
- 若eagain则满了,更新iovec结构体的指针和长度,并注册写事件,等待下一次写事件触发(当写缓冲区从不可写变为可写,触发epollout),因此在此期间无法立即接收到同一用户的下一请求,但可以保证连接的完整性
bool http_connect::write() { int temp = 0; if (bytes_to_send == 0) //要发送的数据长度为0,表示响应报文为空,一般不会出现该情况 { modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); init(); return true; } while (1) { temp = writev(m_sockfd, m_iv, m_iv_count); //将响应报文的状态行、消息头、空行和响应正文发送给浏览器端 if (temp < 0) { if (errno == EAGAIN) //判断缓冲区是否已满 { modfd(m_epollfd, m_sockfd, EPOLLOUT, m_TRIGMode); //重新注册写事件,等待下一次触发 return true; } unmap(); //发送失败,但不是缓冲区问题,取消映射 return false; } bytes_have_send += temp; bytes_to_send -= temp; //更新已发送字节数 if (bytes_have_send >= m_iv[0].iov_len) //第一个iovec头部信息的数据已发送完,发送第二个iovec数据 { m_iv[0].iov_len = 0; //不再继续发送头部信息 m_iv[1].iov_base = m_file_address + (bytes_have_send - m_write_idx); m_iv[1].iov_len = bytes_to_send; } else //继续发送第一个iovec头部信息的数据 { m_iv[0].iov_base = m_write_buf + bytes_have_send; m_iv[0].iov_len = m_iv[0].iov_len - bytes_have_send; } if (bytes_to_send <= 0) //判断条件,数据已全部发送完 { unmap(); modfd(m_epollfd, m_sockfd, EPOLLIN, m_TRIGMode); //在epoll树上重置EPOLLONESHOT事件 if (m_linger) //浏览器的请求为长连接 { init(); //重新初始化HTTP对象 return true; } else { return false; } } } }
ThreadPool——线程池
池式结构
在计算机体系结构中有许多池式结构,例如对象池、数据库连接池、线程池、内存池等等。使用池式结构的主要原因是需要使用池式结构的对象一般存在创建时间长、资源占用高等特点,在对象过多的情况下或导致运行效率低下,因此使用池式结构复用有限的资源,提高运行效率
线程池
线程池其实就是将多个线程对象放到一个容器中,在该项目中使用了一个数组作为容器。在创建线程时一次性创建多个线程存放进线程池中等待执行任务;需要执行任务时从线程池中取出空闲线程执行,执行完后将线程放回线程池中等待下次执行任务;通过复用线程池中的线程来避免多次创建销毁线程导致的运行效率低下问题。
线程池的使用有效降低了多线程操作中任务申请和释放产生的性能消耗,在提高线程利用率、提高线程响应速度、统一管理线程对象以及控制最大并发数等问题上起到了很好的效果
线程池主要适用的场景
由于线程池本就是用于减少线程频繁创建销毁导致的运行效率低下问题,仅当线程本身开销与线程执行任务的开销相比不可忽略的情况下作用明显;而在例如FTP服务器或Telnet服务器上,传输文件时间较长、开销较大、线程本身开销可以忽略不计的情况下,线程池能起到的作用就不够明显,可能不是一种足够理想的方法
线程池通常适用以下几种场合:
1)单位时间内处理任务频繁且任务处理时间短
2)对实时性要求较高
线程池的本质及设计要点
本质上,线程池可以简化为一个“生产者-消费者”模型。线程池执行任务是在“消耗”资源,往线程池中增加任务是“生产”资源。
线程池类的设计主要需要考虑以下几点:
- 定义线程所需要的“任务”
- 线程所需要的“任务”指的是函数,为了让线程工作,需要向线程传递一个预先声明和定义的函数,而如果要让线程执行不同功能的函数,则需要在传递给线程的函数中调用其他的函数以实现正确的任务
- 选择合适的容器
- 对于线程,只需要使用数组就能满足要求;对于工作队列,则可能需要使用队列或者链表等
- 正确的上锁
- 生产者-消费者模型中有包括互斥锁、信号量等多种上锁模式,本项目中使用了Lock模块中的互斥锁、信号量、条件变量等实现了线程同步
proactor模型和reactor模型
Reactor模型
服务器采用线程池的方式进行资源复用,解决了频繁创建销毁线程带来的性能开销和资源浪费问题,同时也引入了一个新问题:线程如何高效处理多个连接的业务?
一个连接对应一个线程时通常采用“read -> 业务处理 -> send”的处理流程,socket默认为阻塞I/O,则当目前连接无数据可读时线程阻塞在“read”操作上,该阻塞方式不影响其他线程。但采用了线程池的话,一个线程如果在处理某个连接的“read”操作时阻塞则无法继续处理其他连接的业务
为了解决该问题,最简单的方式是将socket改为非阻塞,线程不断轮询调用“read”操作来判断有无数据。但该方法线程不知道当前连接是否有数据可读,因此需要每次通过“read”操作判断,需要消耗CPU资源且效率低下。为了解决这个问题,可以使用I/O多路复用技术实现只有在连接上有数据的时候线程才去发出读请求,将I/O多路复用技术封装之后就称为Reactor模型。在Reactor模型中主要有三类处理事件:acceptor——负责连接事件、handler——负责事件读写、reactor——负责事件监听和事件分发
Reactor模型主要由Reactor和处理资源池两个核心部分组成,其中Reactor负责监听和分发事件(包含连接事件、读写事件),处理资源池负责处理事件。Reactor的数量可以是一个或多个,处理资源池可以是单个进程/线程,也可以是多个进程/线程,因此Reactor主要分为三种方案:
- 单Reactor单进程/线程
- 该模型中reactor、acceptor和handler的功能都是由一个线程来执行的
- reactor负责事件的监听和分发
- 有连接事件发生,则reactor分发给accepter建立连接,然后创建一个handler
- 有读写事件发生,则reactor分发给handler进行处理并最终给客户端返回结果
单Reactor单进程模型无法充分利用多核CPU的性能,且handler进行业务处理时进程无法处理其他连接,如果业务处理耗时较长则造成响应延迟,不适用计算机密集型的场景,只适用于业务处理非常快速的场景
单Reactor单进程模型的经典应用场景:Redis、Netty
- 单Reactor多进程/线程
- reactor、acceptor、handler的功能由一个线程来执行
- 引入线程池,handler只负责读取请求和写回结果,具体的业务处理由线程池中的worker线程来完成
单Reactor多线程模型能够充分利用多核CPU的功能,但是因为只有一个Reactor对象承担所有事件的监听和响应,且只在主线程上运行,在面对瞬间高并发的场景时reactor容易成为性能瓶颈
单Reactor多线程模型的经典应用场景:Netty
- 多Reactor多进程/线程
- 一个主reactor线程、多个子reactor线程和多个worker线程组成的一个线程池
- 主reactor负责监听客户端事件,并在同一个线程中让acceptor处理连接事件
- 建立连接后,主reactor把连接分发给子reactor线程由其负责后续事件处理
- 子reactor让同线程的handler读取请求和返回结果,具体业务处理由worker线程完成
在多Reactor多线程模型中,主线程和子线程分工明确,主线程只负责接收新连接,子线程完成后续业务处理,子线程无需返回数据,直接将处理结果发送给客户端
多Reactor多线程模型的经典应用场景:Netty、kafka、Nginx
Proactor模型
Reactor模型是非阻塞同步网络模型,而Proactor模型是异步网络模型
阻塞I/O需要等待“内核数据准备好”和“数据从内核态拷贝到用户态”两个过程;非阻塞I/O需要等待“数据从内核态拷贝到用户态”一个过程;而异步I/O两个过程都不需要等待
Proactor模型将I/O事件的监听、I/O操作的执行、I/O结果的返回统统交给内核来执行,在数据准备阶段和数据拷贝阶段全程无阻塞。在发起异步读写请求时,需要传入数据缓冲区的地址等信息,系统内核自动完成读写工作,读写工作不需要应用进程主动发起read/write来读写数据,而是由操作系统来完成。操作系统完成读写工作后就会通知应用进程直接处理数据
在Linux下的异步I/O是不完善的,aio系列函数是由POSIX定义的异步操作接口,不是真正的操作系统级别支持的,而是在用户空间模拟出来的异步,且仅支持基于本地文件的aio异步操作,网络编程中的socket是不支持的
在windows里实现了一套完整的支持socket的异步编程接口IOCP,是由操作系统级别实现的异步I/O
Reactor模型和Proactor模型的区别
- Reactor模型是非阻塞同步网络模式,感知的是就绪可读写事件,即“来了事件操作系统通知应用进程,由应用进程处理事件”;Proactor模型是异步网络模式,感知的是已完成的读写事件,即“来了事件操作系统来处理,处理完再通知应用进程”
- 在Proactor中,用户程序需要向内核传递用户空间的读缓冲区地址;Reactor则不需要。这导致Proactor中每个并发操作都要求有独立的缓冲区,在内存上有一定的开销
- Proactor的实现逻辑复杂,编码成本相较Reactor高
- Proactor在处理高耗时I/O时性能高于Reactor;对于低耗时I/O的执行效率提升并不明显
线程池的定义
class threadpool { public: /*thread_number是线程池中线程的数量,max_requests是请求队列中最多允许的、等待处理的请求的数量*/ threadpool(int actor_model, connection_pool* connPool, int thread_number = 8, int max_request = 10000); ~threadpool(); bool append(T* request, int state); bool append_p(T* request); private: /*工作线程运行的函数,它不断从工作队列中取出任务并执行之*/ static void* worker(void* arg); void run(); private: int m_thread_number; //线程池中的线程数 int m_max_requests; //请求队列中允许的最大请求数 pthread_t* m_threads; //描述线程池的数组,其大小为m_thread_number std::list<T*> m_workqueue; //请求队列 locker m_queuelocker; //保护请求队列的互斥锁 sem m_queuestat; //是否有任务需要处理 connection_pool* m_connPool; //数据库 int m_actor_model; //模型切换 };
如果把函数看成是一个对象,则在线程池类的接口中传递不同的函数也就是传递不同的对象。如上所述,一个线程如果需要执行不同功能的函数,则需要在函数内调用其他函数来完成功能,在此处的对外接口append()引用了C++的模板编程(template),实现将不同类型的函数添加进工作队列中
template <typename T> //模板编程 bool threadpool<T>::append(T* request, int state) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } request->m_state = state; m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; }
线程池的创建
template <typename T> //线程池构造函数 threadpool<T>::threadpool(int actor_model, connection_pool* connPool, int thread_number, int max_requests) : m_actor_model(actor_model), m_thread_number(thread_number), m_max_requests(max_requests), m_threads(NULL), m_connPool(connPool) { if (thread_number <= 0 || max_requests <= 0) //线程池异常 throw std::exception(); m_threads = new pthread_t[m_thread_number]; if (!m_threads) throw std::exception(); for (int i = 0; i < thread_number; ++i) //循环创建线程,并将工作线程按要求进行运行 { if (pthread_create(m_threads + i, NULL, worker, this) != 0) { delete[] m_threads; throw std::exception(); } if (pthread_detach(m_threads[i])) //将线程设定为分离状态,不用单独对线程进行回收,避免内存泄露 { delete[] m_threads; throw std::exception(); } } }
其中需要注意的函数有
pthread_create(pthread_t * thread, const pthread_attr_t * attr, void* (*start_routine)(void*), void *arg) *pthread:传递一个线程的指针变量或该类型变量的地址 *attr:用于手动设置新建线程的属性,一般设置为NULL由系统默认的属性值创建线程 *(start_routine)(void*):以函数指针的方式指明新建线程需要执行的函数 *arg:指定传递给start_routine函数的实参 返回值:成功创建则返回0;失败则返回非零值,对应不同的宏表明创建失败的原因 pthread_detach(pthread_t *thread) 将已经运行中的线程设定为分离状态,成功返回0,失败返回错误号指明失败原因
linux线程有两种状态:joinable状态和unjoinable状态。如果线程是joinable状态,当线程函数自己退出不会释放线程所占用的堆栈和线程描述符等资源,只有调用了PTHREAD_JOIN()后由主线程阻塞等待子线程结束,然后回收子线程资源;如果线程是unjoinable状态(分离状态),则在子线程结束时会自动回收资源
一般情况下,使用PTHREAD_CREATE()创建线程执行任务后,线程处于joinable状态。如果没有主线程调用PTHREAD_JOIN()来回收线程资源的话,该线程会继续占用系统资源成为“僵尸线程”,为了避免该种情况,项目中使用PTHREAD_DETACH()将线程分离避免后续忘记回收线程资源
PTHREAD_DETACH()用于将运行中的线程设定为分离状态,线程主动与主控线程断开关系,线程结束后,其退出状态不由其他线程获取,而是直接自己自动释放,常应用于网络、多线程服务器
c++11引入了std::thread来创建线程,支持对线程join或者detach
std::thread t(func); if (t.joinable()) { t.detach(); }
待办工作加入请求队列
当epoll检测到端口有事件激活时,即将该事件放入请求队列中,等待工作线程处理
bool threadpool<T>::append(T* request, int state) { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } request->m_state = state; m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; } bool threadpool<T>::append_p(T* request) //proactor模式下的任务请求入队 { m_queuelocker.lock(); if (m_workqueue.size() >= m_max_requests) { m_queuelocker.unlock(); return false; } m_workqueue.push_back(request); m_queuelocker.unlock(); m_queuestat.post(); return true; }
本项目实现的是一个基于半同步/半反应堆式的并发结构(使用同步I/O模拟Proactor事件处理模式),由I/O线程完成读写,I/O线程向工作线程分发的是已接受的数据
以Proactor模式为例的工作流程如下:
- 主线程充当异步线程,负责监听所有socket上的事件
- 有新请求到来,主线程接收以得到新的连接socket,然后往epoll内核事件表中注册该socket上的读写事件
- 如果连接socket上有读写事件发生,主线程从socket上接收数据,并将数据封装成请求对象插入到请求队列中
- 所有工作线程睡眠在请求队列上,当有任务到来时通过竞争获得任务的接管权
线程处理
创建线程池时,调用pthread_create时指向了worker()静态成员函数,worker()内部调用run()
//线程回调函数/工作函数,arg其实是this template <typename T> void* threadpool<T>::worker(void* arg) { threadpool* pool = (threadpool*)arg; //将参数强行转化为线程池类,获取threadpool对象地址 pool->run(); //线程池中每个线程创建都会调用run()睡眠在队列中 return pool; }
run()函数可以看做一个回环事件,一直等待m_queuestat.post(),即新任务进入请求队列,此时从请求队列中取出一个任务进行处理
//工作线程通过run函数不断等待任务队列有新任务,然后 加锁->取任务->解锁->执行任务 template <typename T> void threadpool<T>::run() { while (true) { m_queuestat.wait(); //信号量等待 m_queuelocker.lock(); //工作线程被唤醒后先加互斥锁 if (m_workqueue.empty()) { m_queuelocker.unlock(); continue; } T* request = m_workqueue.front(); //从请求队列中取出第一个任务 m_workqueue.pop_front(); //该任务从请求队列中弹出 m_queuelocker.unlock(); if (!request) continue; if (1 == m_actor_model) //proactor模式和reactor模式切换 { /* reactor模式:只有reactor模式下,标志位improv和timer_flag才会发挥作用 imporv:在read_once和write成功后会置1,对应request完成后置0,用于判断上一个请求是否已处理完毕 timer_flag:当http的读写失败后置1,用于判断用户连接是否异常 */ if (0 == request->m_state) //请求状态:读/写 { if (request->read_once()) { request->improv = 1; connectionRAII mysqlcon(&request->mysql, m_connPool); //从连接池获取连接 request->process(); //工作线程执行任务 } else { request->improv = 1; request->timer_flag = 1; } } else { if (request->write()) { request->improv = 1; } else { request->improv = 1; request->timer_flag = 1; } } } else { connectionRAII mysqlcon(&request->mysql, m_connPool); request->process(); } } }
CGIMysql——数据库连接池
在处理用户注册、登录请求时,我们需要保存用户的用户名和密码用于注册或登录校验。该项目采用的方式是:每一个HTTP连接获取一个数据库连接,获取其中的用户账号密码进行对比,然后再释放该数据库连接。若系统需要频繁访问数据库,则需要频繁创建和断开数据库连接,而创建数据库连接是一个很耗时的操作,也容易对数据库造成安全隐患。
在程序初始化的时候,集中创建多个数据库连接,并把他们集中管理,供程序使用,可以保证较快的数据库读写速度,更加安全可靠。在本项目中,使用单例模式和链表创建数据库连接池,实现对数据库连接资源的复用,并将数据库连接的获取与释放通过RAII机制封装,避免手动释放
项目中的数据库模块分为两部分,其一是数据库连接池的定义,其二是利用连接池完成登录和注册的校验功能。具体,工作线程从数据库连接池取得一个连接,访问数据库中的数据,访问完毕后将连接交还连接池
连接池的功能主要有:初始化、获取连接、释放连接、销毁连接池
单例模式创建
使用局部静态变量创建连接池
class connection_pool { public: MYSQL* GetConnection(); //获取数据库连接 bool ReleaseConnection(MYSQL* conn); //释放连接 int GetFreeConn(); //获取连接 void DestroyPool(); //销毁所有连接 //单例模式 static connection_pool* GetInstance(); void init(string url, string User, string PassWord, string DataBaseName, int Port, int MaxConn, int close_log); void CreateConnection(string url, string User, string PassWord, string DataBaseName, int Port, int close_log); private: connection_pool(); ~connection_pool(); int m_MaxConn; //最大连接数 int m_CurConn; //当前已使用的连接数 int m_FreeConn; //当前空闲的连接数 locker lock; list<MYSQL*> connList; //连接池 sem reserve; public: string m_url; //主机地址 string m_Port; //数据库端口号 string m_User; //登陆数据库用户名 string m_PassWord; //登陆数据库密码 string m_DatabaseName; //使用数据库名 int m_close_log; //日志开关 };
初始化
销毁连接池没有直接被外部调用,而是通过RAII机制来完成自动释放;使用信号量实现多线程争夺连接的同步机制,这里将信号量初始化为数据库的连接总数
//构造初始化 void connection_pool::init(string url, string User, string PassWord, string DBName, int Port, int MaxConn, int close_log) { m_url = url; //初始化数据库信息 m_Port = Port; m_User = User; m_PassWord = PassWord; m_DatabaseName = DBName; m_close_log = close_log; for (int i = 0; i < MaxConn; i++) //创建MaxConn条数据库连接 { MYSQL* con = NULL; con = mysql_init(con); //mysql_init(MYSQL* mysql):初始化或分配与mysql_real_connect()相适应的MYSQL对象 if (con == NULL) //初始化失败 { LOG_ERROR("MySQL Error"); exit(1); } con = mysql_real_connect(con, url.c_str(), User.c_str(), PassWord.c_str(), DBName.c_str(), Port, NULL, 0); if (con == NULL) //建立连接失败 { LOG_ERROR("MySQL Error"); exit(1); } connList.push_back(con); //更新连接池和空闲连接数量 ++m_FreeConn; } reserve = sem(m_FreeConn); //将信号量初始化为最大连接次数 m_MaxConn = m_FreeConn; }
其中值得注意的有
mysql_real_connect(MYSQL* mysql, const char* host, const char* user, const char* passwd, const char* db, unsigned int port, const char* unix_socket, unsigned long client_flag) 数据库引擎建立连接函数 mysql:定义的MYSQL变量 host:MYSQL服务器的地址,决定了连接的类型。如果"host"是NULL或字符串"localhost",连接将被视为与本地主机的连接,如果操作系统支持套接字(Unix)或命名管道(Windows),将使用它们而不是TCP/IP连接到服务器 user:登录用户名,如果“user”是NULL或空字符串"",用户将被视为当前用户,在UNIX环境下,它是当前的登录名 passwd:登录密码 db:要连接的数据库,如果db为NULL,连接会将该值设为默认的数据库 port:MYSQL服务器的TCP服务端口,如果"port"不是0,其值将用作TCP/IP连接的端口号 unix_socket:unix连接方式,如果unix_socket不是NULL,该字符串描述了应使用的套接字或命名管道 clientflag:Mysql运行为ODBC数据库的标记,一般取0 返回值:连接成功,返回连接句柄,即第一个变量mysql;连接失败,返回NULL
获取、释放连接
当线程数量大于数据库连接数量时,使用信号量进行同步,每次取出连接,信号量原子减1,释放连接原子加1,若连接池内没有连接了,则阻塞等待
另外,由于多线程操作连接池,会造成竞争,这里使用互斥锁完成同步,具体的同步机制均使用lock.h中封装好的类
//当有请求时,从数据库连接池中返回一个可用连接,更新使用和空闲连接数 MYSQL* connection_pool::GetConnection() { MYSQL* con = NULL; if (0 == connList.size()) return NULL; reserve.wait(); //取出连接,信号量原子减1,为0则等待 lock.lock(); //lock互斥锁保证同一时间只有一个线程对容器connList进行操作 con = connList.front(); //得到第一个连接 connList.pop_front(); //从连接池中弹出该连接 if (con->isClosed()) //如果连接被关闭,删除后重新建立一个 { delete con; con = this->CreateConnection(this->m_url,this->m_User,this->m_PassWord,this->m_DatabaseName,this->m_Port,this->m_close_log); //CreationConnection()实际是将之前init中for循环中的“创建一条数据库连接”操作重复一遍 } if (con == NULL) { --m_CurConn; } --m_FreeConn; ++m_CurConn; lock.unlock(); return con; } //释放当前使用的连接 bool connection_pool::ReleaseConnection(MYSQL* con) { if (NULL == con) return false; lock.lock(); connList.push_back(con); ++m_FreeConn; --m_CurConn; lock.unlock(); reserve.post(); //释放连接原子加1 return true; }
销毁连接池
通过迭代器遍历连接池链表,关闭对应数据库连接,清空链表并重置空闲连接和现有连接数量
//销毁数据库连接池 void connection_pool::DestroyPool() { lock.lock(); if (connList.size() > 0) { list<MYSQL*>::iterator it; //通过迭代器遍历,关闭数据库连接 for (it = connList.begin(); it != connList.end(); ++it) { MYSQL* con = *it; mysql_close(con); } m_CurConn = 0; m_FreeConn = 0; connList.clear(); //清空连接池 } lock.unlock(); }
RAII机制释放数据库连接
RAII(Resource Acquisition Is Initialization)是由c++之父Bjarne Stroustrup提出的,中文翻译为资源获取即初始化,指使用局部对象来管理资源的技术。这里的资源主要是指操作系统中有限的东西如内存、网络套接字等等,局部对象是指存储在栈的对象,它的生命周期是由操作系统来管理的,无需人工介入
资源的使用一般经历三个步骤a.获取资源 b.使用资源 c.销毁资源,但是资源的销毁往往是程序员经常忘记的一个环节,而C++引入了智能指针(C++11新特性之智能指针)的概念,使用了引用计数的方法,让程序员不需要关系手动释放内存。RAII则是利用了C++中一个对象出了其作用域会被自动析构的特点,在构造函数中申请空间,在析构函数中释放空间来控制资源的生命周期
//定义 class connectionRAII { public: connectionRAII(MYSQL** con, connection_pool* connPool); //双指针对MYSQL *con修改 ~connectionRAII(); private: MYSQL* conRAII; connection_pool* poolRAII; } //实现 connectionRAII::connectionRAII(MYSQL** SQL, connection_pool* connPool) { *SQL = connPool->GetConnection(); conRAII = *SQL; poolRAII = connPool; } connectionRAII::~connectionRAII() { poolRAII->ReleaseConnection(conRAII); }
RAII实际上应该是一种编程思想,将资源申请、释放等成对的操作进行封装,实现在局部域内申请资源并及时销毁,使得程序员不需在局部域中时刻观察资源是否需要释放,避免遗漏
Timer——定时器模块
基础知识
- 非活跃:指客户端与服务器建立连接后,长时间不交换数据,一直占用服务器的文件描述符,造成连接资源的浪费
- 定时事件:指固定一段时间之后触发某段代码,由该段代码处理一个事件(举例:从内核事件表删除事件,并关闭文件描述符,释放连接资源)
- 定时器:指利用结构体或其他形式,将多种定时事件进行封装起来(本项目中只涉及一种定时事件,即定时检查非活跃连接,将定时事件与连接资源封装为一个结构体定时器)
- 定时器容器:指使用某种容器类数据结构,将上述多个定时器组合起来,便于对定时事件统一管理(本项目中使用升序链表将所有定时器串联组织起来)
- 统一事件源:将信号事件与其他事件一起处理。具体地,信号处理函数使用管道将信号传递给主循环,信号处理函数往管道的写端写入信号值,主循环则从管道的读端读出信号值,使用I/O复用系统调用来监听管道读端的可读事件,这样信号事件与其他文件描述符都可以通过epoll来监测,从而实现统一处理
模块功能
本项目中,服务器主循环为每一个连接创建一个定时器,并对每个连接进行定时。另外,利用升序时间链表容器将所有定时器串联起来,若主循环接收到定时通知,则在链表中依次执行定时任务
Linux提供了三种定时的方法:
socket选项SO_RECVTIMEO和SO_SNDTIMEO
- socket选项SO_SNDTIMEO:用来设置socket发送数据的超时时间,若该时间内未接收到任何数据,则socket返回一个错误码
- socket选项SO_SNDTIMEO:用来设置socket接收数据的超时时间,若该时间内无法发送完所有数据,则发送操作失败并返回一个错误
使用SO_SNDTIMEO选项时,需要先将socket设置为非阻塞模式,否则会忽略超时设置并阻塞发送操作
通过函数setsockopt()可以设置SO_SNDTIMEO和SO_RECVTIMEO选项
int setsockopt( int socket, int level, int option_name,const void *option_value, size_t ,ption_len) socket:socket描述符 level:被设置的选项的级别。如果要在socket级别上设置选项,则必须将level设置为SOL_SOCKET
SIGALRM信号
- SIGALRM是一种处理定时器信号的信号,代表在已定义的时间间隔后发生的警告信号,可用于定时器、计时器和轮询器等
- 通常使用系统调用alarm()来设置发生SIGALRM的时间间隔
I/O复用系统调用的超时参数
- I/O复用系统调用自带的超时参数,既能统一处理信号和I/O事件,也能统一处理定时事件
- I/O复用系统调用可能在超时时间到期之前就返回(有I/O事件发生),如果要利用它们来定时,就需要不断更新定时参数以反映剩余的时间
三种方法没有一劳永逸的应用场景,没有绝对的优劣。在本项目中主要使用的是SIGALRM信号
具体地,利用alarm函数周期性地触发SIGALRM信号,信号处理函数利用管道通知主循环,主循环接收到该信号后对升序链表上所有定时器进行处理,若该段时间内没有交换数据,则将该连接关闭,释放所占用的资源。该模块主要分为两部分,其一为定时方法与信号通知流程,其二为定时器及其容器设计与定时任务的处理
基础API
SIGALRM、SIGTERM信号
- SIGALRM:由alarm系统调用产生的timer时钟信号
- SIGTERM:终端发送的终止信号
sigaction结构体、检查或修改与指定信号相关联的处理动作——sigaction()函数
struct sigaction { void (*sa_handler)(int); //函数指针,指向旧的信号处理函数 void (*sa_sigaction)(int, siginfo_t *, void *); //新的信号处理函数 sigset_t sa_mask; //信号阻塞集,指定在信号处理函数执行期间需要被屏蔽的信号 int sa_flags; //信号的处理方式 void (*sa_restorer)(void); //已弃用 } #include <signal.h> int sigaction(int signum, const struct sigaction *act, struct sigaction *oldact) signum:要操作的信号 act:要设置的对信号的新处理方式 oldact:原来对信号的处理方式 return:成功返回0,出现错误则返回-1
sa_flags指定信号的处理方式,可以是以下几种的“按位或”组合 SA_RESTART:使被信号打断的系统调用自动重新发起 SA_NOCLDSTOP:使父进程在它的子进程暂停或继续运行时不会收到 SIGCHLD 信号 SA_NOCLDWAIT:使父进程在它的子进程退出时不会收到 SIGCHLD 信号,这时子进程如果退出也不会成为僵尸进程 SA_NODEFER:使对信号的屏蔽无效,即在信号处理函数执行期间仍能发出这个信号 SA_RESETHAND:信号处理之后重新设置为默认的处理方式 SA_SIGINFO:使用 sa_sigaction 成员而不是 sa_handler 作为信号处理函数
- 初始化自定义信号集——sigfillset()函数
- 实际使用中,常用sigemptyset()将信号集清空,再使用sigaddset()想信号集中添加所需的特定信号以达到更精细的控制信号的目的
#include <signal.h> int sigfillset(sigset_t *set) //将参数set信号集初始化(信号集中所有标志位置1),然后把所有的信号加入到此信号集中 set:指向信号集合的指针 return:成功返回0,出现错误则返回-1
- 设置信号传送闹钟——alarm()函数
- 如果未设置信号SIGALRM的处理函数,则alarm()默认终止进程
#include <unisted.h> unsigned int alarm(unsigned int seconds) 设置信号SIGALRM在经过参数seconds秒数后发送给目前的进程 seconds:要设定的定时时间,以秒为单位。在alarm调用后开始计时,超过该时间将触发SIGALRM信号 return:返回当前进程之前设置的定时器剩余秒数
- 创建套接字对用于通信——socketpair()函数
- 用SOCK_STREAM建立的套接字对是管道流,建立的通道是双向的,支持全双工通信
#include <sys/types.h> #include <sys/socket.h> int socketpair(int domain, int type, int protocol, int sv[2]) //创建一对无名的、相互连接的套接字 domain:协议族,只能为PF_UNIX或者AF_UNIX type:协议,可以是SOCK_STREAM(基于TCP)或SOCK_DGRAM(基于UDP) protocol:类型,只能为0 sv[2]:套接字柄对,该两个句柄作用相同,均能进行读写双向操作 return:创建成功返回0,失败返回-1
信号处理机制
Linux下的信号采用异步处理机制,信号处理函数与当前进程是两条不同的执行路线。当进程收到信号时,操作系统会中断进程当前的正常流程,转而进入信号处理函数执行操作,完成后再返回中断的地方继续执行
为避免信号竞态现象发生,信号处理期间系统不会再次触发它。所以,为确保该信号不被屏蔽太久,信号处理函数需要尽可能快地执行完毕。一般的信号处理函数需要处理该信号对应的逻辑,当该逻辑比较复杂时,信号处理函数执行时间过长,会导致信号屏蔽太久。为了解决该问题,本项目中信号处理函数仅发送信号通知程序主循环,将信号对应的处理逻辑放在程序主循环中,由主循环执行信号对应的逻辑代码。 信号处理的流程如下图所示(图片来自微信公众号两猿社)
信号的接收
接收信号的任务不是由用户进程来完成的,而是由内核代理的。当内核接收到信号后,会将其放到对应进程的信号队列中,同时向进程发送一个中断,使其陷入内核态。注意,此时信号还只是在队列中,对进程来说暂时是不知道有信号到来的
信号的检测
进程陷入内核态后,有两种场景会对信号进行检测:
- 进程从内核态返回到用户态前进行信号检测
- 进程在内核态中,从睡眠状态被唤醒的时候进行信号检测
当发现有新信号时,便会进入下一步,信号的处理
信号的处理
信号处理函数是在用户态上的
- (内核态)调用处理函数前,内核将当前内核栈的内容备份拷贝到用户栈上,并且修改指令寄存器(eip)将其指向信号处理函数
- (用户态)进程返回到用户态中,执行信号处理函数
- (内核态)信号处理函数执行完成后,需要返回内核态检查是否有其他信号未处理
- (用户态)所有信号都处理完成,则将内核栈恢复(从步骤1中的用户栈备份拷贝回来),同时恢复指令寄存器(eip)将其指向中断前的运行位置,最后回到用户态继续执行进程
- 信号处理函数
//信号处理函数中仅仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响 void Utils::sig_handler(int sig) { //为保证函数的可重入性,保留原来的errno //可重入性表示中断后再次进入该函数,环境变量与之前相同,不会丢失数据 int save_errno = errno; int msg = sig; send(u_pipefd[1], (char*)&msg, 1, 0); //将信号值从管道写端写入,传输字符类型,而非整型 errno = save_errno; //将原来的errno赋值为当前的errno }
信号处理函数中仅通过管道发送信号值,不处理信号对应的逻辑,缩短异步执行时间,减少对主程序的影响
//设置信号函数 void Utils::addsig(int sig, void(handler)(int), bool restart) //项目中设置信号函数,仅关注SIGTERM和SIGALRM两个信号 { struct sigaction sa; //创建sigaction结构体变量 memset(&sa, '\0', sizeof(sa)); sa.sa_handler = handler; //信号处理函数中仅仅发送信号值,不做对应逻辑处理 if (restart) sa.sa_flags |= SA_RESTART; sigfillset(&sa.sa_mask); //将所有信号添加到信号集中 assert(sigaction(sig, &sa, NULL) != -1); //执行sigaction函数 } 项目中设置信号函数,仅关注SIGTERM和AIGALRM两个信号
信号通知逻辑
- 创建管道,其中管道写端写入信号值,管道读端通过I/O复用系统监测读事件
- 设置信号处理函数SIGALRM(时间到了触发)和SIGTERM(kill会触发)
- 通过struct sigaction结构体和sigaction函数注册信号捕捉函数
- 在结构体的handler参数设置信号处理函数,具体地,从管道写端写入信号的名字
- 利用I/O复用系统监听管道读端文件描述符的可读事件
- 信息值传递给主循环,主循环再根据接收到的信号值执行目标信号对应的逻辑代码
定时器设计
定时器类的定义
项目中将连接资源、定时事件和超时时间封装为定时类
- 连接资源包括客户端套接字地址、文件描述符和定时器
- 定时事件为回调函数,将其封装起来由用户自定义,这里是删除非活动socket上的注册事件,并关闭
- 定时器超时时间 = 浏览器和服务器连接时刻 + 固定时间(TIMESLOT),这里定时器使用绝对时间作为超时值
//连接资源结构体成员需要用到定时器类,需要前向声明 class util_timer; struct client_data //开辟用户socket结构 对应于最大处理fd { sockaddr_in address; //客户端socket地址 int sockfd; //socket文件描述符 util_timer* timer; //定时器 }; class util_timer //定时器类 { public: util_timer() : prev(NULL), next(NULL) {} public: time_t expire; //超时时间 void (*cb_func)(client_data*); //回调函数 client_data* user_data; //连接资源 util_timer* prev; //前向定时器 util_timer* next; //后继定时器 };
定时事件,具体地,从内核事件表删除事件,关闭文件描述符,释放连接资源
//定时器回调函数 void cb_func(client_data* user_data) { epoll_ctl(Utils::u_epollfd, EPOLL_CTL_DEL, user_data->sockfd, 0); //删除非活动连接在socket上的注册事件 assert(user_data); close(user_data->sockfd); //关闭文件描述符 http_conn::m_user_count--; //减少连接数 }
定时器的创建与销毁
该项目的定时器采用升序双向链表作为容器,具体地,为每个连接创建一个定时器,将其添加到链表中,并按照超时时间升序排序。在本项目中,定时器的创建与销毁采用了RAII思想,在构造函数与析构函数中完成定时器容器的创建与销毁
sort_timer_lst::sort_timer_lst() { head = NULL; tail = NULL; } sort_timer_lst::~sort_timer_lst() //常规销毁链表 { util_timer* tmp = head; while (tmp) { head = tmp->next; delete tmp; tmp = head; } }
添加定时任务
添加定时任务即将新连接的定时器添加到链表中。若当前链表中只有头尾结点(链表为空),则直接使头尾结点指向该定时器
- 若新的定时器(当前正在添加到链表中的定时器)的超时时间小于当前头部结点,则直接将新的定时器结点作为头部结点
void sort_timer_lst::add_timer(util_timer* timer) //添加定时器,内部调用私有成员add_timer { if (!timer) { return; } if (!head) { head = tail = timer; return; } if (timer->expire < head->expire) //如果新的定时器超时时间小于当前头部结点,则直接将当前定时器结点作为头部结点 { timer->next = head; head->prev = timer; head = timer; return; } add_timer(timer, head); //否则调用私有成员,调整内部结点 }
- 若新的定时器结点的超时时间大于等于当前头部结点,则需要插入到链表中,采用双向链表的插入操作
//私有成员,被公有成员add_timer和adjust_time调用,主要用于调整链表内部结点 void sort_timer_lst::add_timer(util_timer* timer, util_timer* lst_head) //主要用于调整链表内部结点 { util_timer* prev = lst_head; util_timer* tmp = prev->next; while (tmp) //遍历当前结点之后的链表,按照超时时间找到目标定时器对应的位置,常规双向链表插入操作 { if (timer->expire < tmp->expire) { prev->next = timer; timer->next = tmp; tmp->prev = timer; timer->prev = prev; break; } prev = tmp; tmp = tmp->next; } if (!tmp) //遍历完发现,目标定时器需要放到尾结点处 { prev->next = timer; timer->prev = prev; timer->next = NULL; tail = timer; } }
任务超时时间调整
当定时任务发生变化,调整对应定时器在链表中的位置
- 客户端在设定时间内有数据收发,则当前时刻对该定时器重新设定时间
- 被调整的目标定时器在尾部,或定时器新的超时时间仍然小于下一个定时器的超时,则不用调整;否则先将定时器从链表取出,重新插入链表
void sort_timer_lst::adjust_timer(util_timer* timer) //调整定时器,任务发生变化时,调整定时器在链表中的位置 { if (!timer) { return; } util_timer* tmp = timer->next; if (!tmp || (timer->expire < tmp->expire)) { return; } if (timer == head) //被调整定时器是链表头结点,将定时器取出,重新插入 { head = head->next; head->prev = NULL; timer->next = NULL; add_timer(timer, head); } else //被调整定时器在内部,将定时器取出,重新插入 { timer->prev->next = timer->next; timer->next->prev = timer->prev; add_timer(timer, timer->next); } }
删除定时任务
定时器超时,则将其从链表中删除
void sort_timer_lst::del_timer(util_timer* timer) //删除定时器 { if (!timer) { return; } if ((timer == head) && (timer == tail)) //链表中只有一个定时器,需要删除该定时器 { delete timer; head = NULL; tail = NULL; return; } if (timer == head) //被删除的定时器为头结点 { head = head->next; head->prev = NULL; delete timer; return; } if (timer == tail) //被删除的定时器为尾结点 { tail = tail->prev; tail->next = NULL; delete timer; return; } timer->prev->next = timer->next; //被删除的定时器在链表内部,常规链表结点删除 timer->next->prev = timer->prev; delete timer; }
定时任务处理函数
使用统一事件源,SIGALRM信号每次被触发,主循环中调用一次定时任务处理函数,处理链表容器中到期的定时器
具体逻辑如下:
- 遍历定时器升序链表容器,从头结点开始依次处理每个定时器,直到遇到尚未到期的定时器
- 若当前时间小于定时器超时时间,跳出循环,即未找到到期的定时器;若当前时间大于定时器超时时间,即找到了到期的定时器,执行回调函数,然后将它从链表中删除,然后继续遍历
void sort_timer_lst::tick() { if (!head) { return; } time_t cur = time(NULL); //获取当前时间 util_timer* tmp = head; while (tmp) //遍历定时器链表 { if (cur < tmp->expire) //链表容器为升序排列,当前时间小于定时器的超时时间,后面的定时器也没有到期 { break; } tmp->cb_func(tmp->user_data); //当前定时器到期,则调用回调函数,执行定时事件 head = tmp->next; //将处理后的定时器从链表容器中删除,并重置头结点 if (head) { head->prev = NULL; } delete tmp; tmp = head; } }
定时器的使用(webserver.cpp)
- 客户端与服务器连接时,创建该连接对应的定时器,并将定时器添加到链表上
void WebServer::timer(int connfd, struct sockaddr_in client_address) { users[connfd].init(connfd, client_address, m_root, m_CONNTrigmode, m_close_log, m_user, m_passWord, m_databaseName); //初始化client_data数据 //创建定时器,设置回调函数和超时时间,绑定用户数据,将定时器添加到链表中 users_timer[connfd].address = client_address; users_timer[connfd].sockfd = connfd; util_timer *timer = new util_timer; //创建定时器 timer->user_data = &users_timer[connfd]; //绑定用户数据 timer->cb_func = cb_func; //设置回调函数 time_t cur = time(NULL); timer->expire = cur + 3 * TIMESLOT; //设置超时时间 users_timer[connfd].timer = timer; utils.m_timer_lst.add_timer(timer); //添加定时器到链表中 }
- 处理异常事件时,执行定时事件,服务器关闭连接,并将定时器从链表中移除
void WebServer::eventLoop() { bool timeout = false; //超时默认为false bool stop_server = false; while (!stop_server) { int number = epoll_wait(m_epollfd, events, MAX_EVENT_NUMBER, -1); //监测发生事件的文件描述符 if (number < 0 && errno != EINTR) { LOG_ERROR("%s", "epoll failure"); break; } for (int i = 0; i < number; i++) //轮询事件描述符 { int sockfd = events[i].data.fd; //处理新到的客户连接 if (sockfd == m_listenfd) { bool flag = dealclinetdata(); if (false == flag) continue; } else if (events[i].events & (EPOLLRDHUP | EPOLLHUP | EPOLLERR)) //处理异常事件 { //服务器端关闭连接,移除对应的定时器 util_timer* timer = users_timer[sockfd].timer; deal_timer(timer, sockfd); } //处理定时器信号 else if ((sockfd == m_pipefd[0]) && (events[i].events & EPOLLIN)) //管道读端对应文件描述符发生读事件 { bool flag = dealwithsignal(timeout, stop_server); if (false == flag) LOG_ERROR("%s", "dealclientdata failure"); } //处理客户连接上接收到的数据 else if (events[i].events & EPOLLIN) { dealwithread(sockfd); } else if (events[i].events & EPOLLOUT) { dealwithwrite(sockfd); } } if (timeout) //处理定时器为非必须事件,收到信号并不是立马处理,而是完成读写事件后再进行处理 { utils.timer_handler(); LOG_INFO("%s", "timer tick"); timeout = false; } } }
其中,处理定时器信号部分中的dealwithsignal()内容如下
bool WebServer::dealwithsignal(bool& timeout, bool& stop_server) { int ret = 0; int sig; char signals[1024]; ret = recv(m_pipefd[0], signals, sizeof(signals), 0); //从管道读端读出信号值,成功返回字节数,失败返回-1 if (ret == -1) { return false; } else if (ret == 0) { return false; } else { for (int i = 0; i < ret; ++i) { switch (signals[i]) { case SIGALRM: //接收到SIGALRM信号,timeout设置为true { timeout = true; break; } case SIGTERM: //接收到SIGTERM信号,终止进程运行,stop_server设置为true { stop_server = true; break; } } } } return true; }
处理客户连接上接收到的数据,其函数内容为
void WebServer::dealwithread(int sockfd) { util_timer* timer = users_timer[sockfd].timer; //reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } //若监测到读事件,将该事件放入请求队列 m_pool->append(users + sockfd, 0); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { //proactor if (users[sockfd].read_once()) { LOG_INFO("deal with the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); //若监测到读事件,将该事件放入请求队列 m_pool->append_p(users + sockfd); if (timer) { adjust_timer(timer); //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整 } } else { deal_timer(timer, sockfd); //服务器端关闭连接,移除对应的定时器 } } } void WebServer::dealwithwrite(int sockfd) { util_timer* timer = users_timer[sockfd].timer; //reactor if (1 == m_actormodel) { if (timer) { adjust_timer(timer); } m_pool->append(users + sockfd, 1); while (true) { if (1 == users[sockfd].improv) { if (1 == users[sockfd].timer_flag) { deal_timer(timer, sockfd); users[sockfd].timer_flag = 0; } users[sockfd].improv = 0; break; } } } else { //proactor if (users[sockfd].write()) { LOG_INFO("send data to the client(%s)", inet_ntoa(users[sockfd].get_address()->sin_addr)); if (timer) { adjust_timer(timer); //有数据传输,则将定时器向后延迟3个单位,并对新的定时器在链表上的位置进行调整 } } else { deal_timer(timer, sockfd); //服务器端关闭连接,移除对应的定时器 } } }
Lock——信号同步机制封装
竟态
竟态指多个进程或线程在对共享资源进行读写时,由于对共享资源的访问顺序不确定或事件差等因素导致的错误行为。具体来说,当两个或多个进程或线程试图同时访问同一共享资源时,它们之间可能会产生竟态,从而导致程序出现不可预测的结果
竟态是一种非常难以调试和解决的问题,因为竟态通常只在特定的时间和特定的运行环境下才会出现,因此可能需要多次运行程序才能重现问题。为避免竟态,需要使用各种同步机制,例如互斥锁、条件变量、信号量等,来保护共享资源的访问,确保只有一个进程或线程能够访问该资源
信号量——sem类
信号量是一种特殊的变量,它只能取自然数并且只支持两种操作,P(wait)和V(signal)
- P 操作会使得信号量的值减 1,如果此时信号量的值小于 0,则调用进程或线程会被阻塞,等待其他进程或线程对信号量进行 V 操作,使得信号量的值大于 0,此时阻塞的进程或线程才能继续执行
- V 操作会使得信号量的值加 1,如果此时信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个进程或线程
信号量的取值可以是任何自然数,根据初始值的不同可以分为两类:
- 二进制信号量:指初始值为 1 的信号量,此类信号量只有 1 和 0 两个值,通常用来替代互斥锁实现线程同步
- 计数信号量:指初始值大于 1 的信号量,当进程中存在多个线程,但某公共资源允许同时访问的线程数量是有限的,这时就可以用计数信号量来限制同时访问资源的线程数量
根据使用场景的不同,信号量也可以分为两类:
- 无名信号量:也被称作基于内存的信号量,只可以在共享内存的情况下,比如实现进程中各个线程之间的互斥和同步
- 命名信号量:通常用于不共享内存的情况下,比如进程间通信
在本项目中主要实现的是线程同步,只需要使用无名信号量,主要使用以下几个函数
- 初始化信号量sem_init()
include <semaphore.h> int sem_init(sem_t *sem,int pshared,unsignedint value) //初始化一个信号量 sem:指向要初始化的信号量的指针 pshared:指定信号量的共享方式。如果值为 0,则信号量将在进程内部共享。如果值为非 0,则信号量可以在不同进程之间共享,需要使用共享内存 value:指定信号量的初值 return:成功则返回0,否则返回-1
- 销毁信号量sem_destory()
include <semaphore.h> int sem_destroy(sem_t *sem) //销毁一个信号量 sem:指向要销毁的信号量的指针 return:成功返回0,否则返回-1
- 对信号量进行P操作sem_wait()
include <semaphore.h> int sem_wait(sem_t *sem) //对信号量进行 P 操作,如果信号量的值小于等于 0,则会阻塞当前线程 sem:指向要操作的信号量的指针 return:成功返回0,否则返回-1
- 对信号进行V操作sem_post()
include <semaphore.h> int sem_post(sem_t *sem) //对信号量进行 V 操作,如果信号量的值小于等于 0,则会唤醒阻塞在该信号量上的某个线程 sem:指向要操作的信号量的指针 return:成功返回0,否则返回-1
互斥锁——locker类
互斥锁,也称互斥量,可以保护关键代码段,以确保独占式访问.当进入关键代码段,获得互斥锁将其加锁;离开关键代码段,唤醒等待该互斥锁的线程
互斥量是一种用于保护临界区的同步机制,可以确保同一时刻只有一个线程访问共享资源。当一个线程访问共享资源时,需要先获取互斥量的锁,其他线程需要等待该锁释放才能继续执行
互斥量不是为了消除竞争,实际上,资源还是共享的,线程间也还是竞争的,只不过通过这种“锁”机制就将共享资源的访问变成互斥操作,也就是说一个线程操作这个资源时,其它线程无法操作它,从而消除与时间有关的错误。但是,这种锁机制不是强制的,互斥锁实质上是操作系统提供的一把“建议锁”(又称“协同锁”),建议程序中有多线程访问共享资源的时候使用该机制
因此,即使有了mutex,其它线程如果不按照这种锁机制来访问共享数据的话,依然会造成数据混乱。所以为了避免这种情况,所有访问该共享资源的线程必须采用相同的锁机制
- 初始化互斥量pthread_mutex_init()
include <pthread.h> int pthread_mutex_init(pthread_mutex_t *mutex, const pthread_mutexattr_t *attr) //初始化互斥量 mutex:指向要初始化的互斥量的指针 attr:指向互斥量属性对象的指针,通常设置为 NULL return:成功则返回0,否则返回一个正整数的错误码
- 销毁互斥量pthread_mutex_destroy()
include <pthread.h> int pthread_mutex_destroy(pthread_mutex_t *mutex) //销毁互斥量 mutex:指向要销毁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
- 给互斥锁加锁pthread_mutex_lock()
include <pthread.h> int pthread_mutex_lock(pthread_mutex_t *mutex) //给互斥量加锁,如果互斥量已经被锁住,则阻塞当前线程,直到互斥量被解锁 mutex:指向要加锁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
- 解锁互斥量pthread_mutex_unlock()
include <pthread.h> int pthread_mutex_unlock(pthread_mutex_t *mutex) //解锁互斥量,如果有等待该互斥量的线程,则唤醒其中的一个线程 mutex:指向要加锁的互斥量的指针 return:成功则返回0,否则返回一个正整数的错误码
条件变量——cond类
条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待这个共享数据的线程
条件变量利用线程间共享的全局变量进行同步,主要包括两个动作:一个线程等待条件变量的条件成立而挂起;另一个线程使条件成立(给出条件成立信号)。为了防止竞争,条件变量的使用总是和一个互斥量结合在一起
- 初始化条件变量pthread_cond_init()
include <pthread.h> int pthread_cond_init(pthread_cond_t *cond, const pthread_condattr_t *attr) //初始化条件变量对象,设置相关属性 cond:指向条件变量对象的指针 attr:指向线程条件属性对象的指针。一般为 NULL return:成功则返回0,失败返回错误号
- 销毁条件变量对象pthread_cond_destory()
include <pthread.h> int pthread_cond_destroy(pthread_cond_t *cond) //销毁条件变量对象,释放资源 cond:指向条件变量对象的指针 return:成功则返回0,失败返回错误号
- 唤醒线程pthread_cond_broadcast()
- 函数以广播的方式唤醒所有等待目标条件变量的线程
include <pthread.h> int pthread_cond_broadcast(pthread_cond_t *cond) //唤醒所有在条件变量上等待的线程 cond:指向条件变量对象的指针 return:成功则返回0,失败返回错误号
- 阻塞线程pthread_cond_wait()
- 函数执行时,先把调用线程放入条件变量的请求队列,然后将互斥锁mutex解锁,当函数成功返回为0时,互斥锁会再次被锁上. 也就是说函数内部会有一次解锁和加锁操作
include <pthread.h> int pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex) //让当前线程阻塞在条件变量上等待唤醒 cond:指向条件变量对象的指针 mutex:指向互斥锁对象的指针,用于保护条件变量 return:成功则返回0,失败返回错误号
- 发送信号通知线程pthread_cond_signal()
- pthread_cond_signal 只会通知一个等待该条件变量的线程,如果有多个线程在等待,则只有一个线程会收到通知,其余线程还会继续等待,直到下一次收到信号
- 必须在已经获得与条件变量相关的互斥锁之后才能调用该函数
- 如果没有等待该条件变量的线程,调用该函数也不会产生任何作用
include <pthread.h> int pthread_cond_signal(pthread_cond_t *cond) cond:指向条件变量的指针 return:成功则返回0,失败返回错误号
锁机制的功能
锁机制用来实现多线程同步,通过锁机制,确保任一时刻只能有一个线程能进入关键代码段
为便于实现同步类的RAII机制,该项目在pthread库的基础上进行了封装,实现了类似于C++11的mutex标准库功能
Log——日志系统
日志系统分为两部分,一部分是单例模式和阻塞队列的定义;另一部分是日志类的定义与使用
基础知识
- 日志
- 由服务器自动创建,并记录运行状态,错误信息,访问数据的文件
- 同步日志与异步日志
同步日志:日志写入函数与工作线程串行执行,由于涉及到I/O操作,当单条日志比较大的时候,同步模式会阻塞整个处理流程,服务器所能处理的并发能力将有所下降,尤其是在峰值的时候,写日志可能成为系统的瓶颈
异步日志:将所写的日志内容先存入阻塞队列中,写线程从阻塞队列中取出内容,写入日志
- 生产者-消费者模型
- 并发编程中的经典模型
- 以多线程为例,为了实现线程间数据同步,生产者线程与消费者线程共享一个缓冲区,其中生产者线程往缓冲区中push消息,消费者线程从缓冲区中pop消息
- 阻塞队列
- 将生产者-消费者模型进行封装,使用循环数组实现队列,作为两者共享的缓冲区
- 单例模式
- 最常用的设计模式之一,保证一个类仅有一个实例,并提供一个访问它的全局访问点,该实例被所有程序模块共享
同步日志与异步日志的比较
- 同步日志
- 优点
- 代码简单 同步日志是在主线程中直接输出日志,代码相对简单。不需要处理线程间同步和异步问题
- 日志输出顺序可控 同步日志是按照输入的顺序直接输出,因此输出的顺序比较可控,可以保证日志的顺序
- 传统且易于调试 同步日志是较传统的方式,易于排错和调试
- 缺点
- 性能问题 由于同步日志是在主线程中直接输出,因此它会阻塞该线程,增加了主线程I/O等待的时间,可能会导致性能问题
- 安全问题 如果在写日志时程序崩溃或被强制关闭,会导致日志数据丢失
- 时间上的依赖 同步日志的输出时间与输入时间相同,这可能会对程序的性能造成负面影响
- 适用范围
- 同步日志适用于需要简单、稳定且扩展性不是很高的场景,如一些小型程序或者进行调试的场景
- 优点
- 异步日志
- 优点
- 时间上的独立 异步日志是把日志数据缓存到内存或磁盘中,由专门的线程来将缓存的数据输出到日志文件或其他存储介质中,这样就可以将日志输入的时间与日志输出时间分开,并且不会被I/O等待时间影响,提高了日志输入的效率
- 数据安全 异步日志通过将日志数据缓存到内存或磁盘中来避免了数据丢失。即使程序突然崩溃或被强制关闭,尚未输出的日志数据也已被保存,避免了数据丢失
- 减少I/O负担 异步日志通过将日志数据缓存到内存或磁盘中来避免了大量的I/O操作,从而减轻了I/O操作对系统的负担,提高了程序的性能
- 缺点
- 多线程同步 异步日志需要多线程共同完成逻辑,如果线程同步不好处理,就可能出现竞争条件和死锁等问题
- 日志顺序问题 由于异步日志是异步输出的,因此无法保证日志输出的顺序与其输入的顺序一致。这可能会在某些情况下引起问题
- 代码复杂性 实现异步日志通常需要使用多线程编程、线程同步等技术,从而增加了代码的复杂性和难度
- 适用范围
- 异步日志适用于需要大量日志输出和较高的日志处理速度的场景,如高并发的网络服务器应用程序
- 优点
单例模式
- 实现思路
- 私有化它的构造函数,以防止外界创建单例类的对象
- 使用类的私有静态指针变量指向类的唯一实例
- 用一个公有的静态方法获取该实例
- 两种实现方式
懒汉模式:在第一次被使用时才初始化(延迟初始化)
class Singleton { private: static Singleton* instance; //私有化静态指针指向唯一实例 private: Singleton() {}; ~Singleton() {}; public: static Singleton* getInstance(); //公有静态方法获取实例 }; Singleton* Singleton::getInstance() { if(instance == NULL) instance = new Singleton(); return instance; } // init static member Singleton* Singleton::instance = NULL;
懒汉模式存在内存泄露的问题,可以通过使用智能指针或使用静态的嵌套类对象来解决。以下是使用静态嵌套类对象的例子
class Singleton { private: static Singleton* instance; private: Singleton() { }; ~Singleton() { }; private: class Deletor { public: ~Deletor() { if(Singleton::instance != NULL) delete Singleton::instance; } }; static Deletor deletor; public: static Singleton* getInstance(); }; Singleton* Singleton::getInstance() { if(instance == NULL) instance = new Singleton(); return instance; } // init static member Singleton::Deletor Singleton::deletor; Singleton* Singleton::instance = NULL;
懒汉模式还有个问题是在多线程环境下第一次初始化过程中会出现线程安全问题,为了解决该问题可以使用互斥锁,同时为了避免每次初始化过程都加锁而使用双检测锁(Double Check Lock,DCL)
class Singleton { private: static Singleton* instance; //私有化静态指针指向唯一实例 static pthread_mutex_t lock; //静态锁,静态函数只能访问静态成员 private: Singleton() {}; ~Singleton() {}; public: static Singleton* getInstance() //公有静态方法获取实例 }; Singleton* Singleton::instance = NULL; Singleton* Singleton::getInstance(){ if(instance == NULL) { pthread_mutex_lock(&lock); if(NULL == instance) { instance == new Singleton(); } pthread_mutex_unlock(&lock); } return instance;
C++中规定了local static在多线程条件下的初始化行为,要求编译器保证了内部静态变量的线程安全性。在C++11标准下,《Effective C++》提出了一种更优雅的单例模式实现,使用函数内的 local static 对象,这种方法不用加锁和解锁操作。这样,只有当第一次访问getInstance()方法时才创建实例。这种方法也被称为Meyers’ Singleton
class Singleton { private: Singleton() { }; ~Singleton() { }; public: static Singleton& getInstance(); }; Singleton::Singleton::getInstance() { static Singleton instance; return instance; }
饿汉模式:在程序运行时立即初始化
饿汉模式不需要用锁,就可以实现线程安全。原因在于,在程序运行时就定义了对象,并对其初始化。之后,不管哪个线程调用成员函数getinstance(),都只不过是返回一个对象的指针而已。所以是线程安全的,不需要在获取实例的成员函数中加锁
class Singleton{ private: static Singleton* instance; single(){} ~single(){} public: static Singleton* getinstance(); }; Singleton* Singleton::instance = new Singleton(); Singleton* Singleton::getInstance(){ return instance; } //测试方法 int main(){ Singleton *p1 = Singleton::getInstance(); Singleton *p2 = Singleton::getInstance(); if (p1 == p2) cout << "same" << endl; system("pause"); return 0; }
但是饿汉模式存在一个潜在问题:no-local static对象(函数外的static对象)在不同编译单元中的初始化顺序是未定义的,即“ static Singleton instance ”和“ static Singleton* getInstance() ”二者的初始化顺序不确定,如果在初始化完成之前调用 getInstance() 方法会返回一个未定义的实例
- 单例模式的优点
- 在内存里只有一个实例,减少了内存的开销,尤其是频繁的创建和销毁实例
- 避免对资源的多重占用
- 单例模式的缺点
- 没有接口,不能继承,与单一职责原则冲突,一个类应该只关心内部逻辑,而不关心外面怎么样来实例化
阻塞队列
条件变量与生产者-消费者模型
在Lock模块中提到,条件变量提供了一种线程间的通知机制,当某个共享数据达到某个值时,唤醒等待该共享数据的线程
其中pthread_cond_wait()的使用方式如下
pthread _mutex_lock(&mutex) //加锁避免因为多线程访问导致的资源竞争 while(线程执行的条件是否成立){ pthread_cond_wait(&cond, &mutex); } pthread_mutex_unlock(&mutex);
pthread_cond_wait执行后的内部操作分为以下几步:
- 将线程放在条件变量的请求队列后,内部解锁(满足执行条件,线程阻塞自身释放公共资源)
- 线程等待被pthread_cond_broadcast信号唤醒或者pthread_cond_signal信号唤醒,唤醒后去竞争锁
- 线程若竞争到互斥锁,内部再次加锁
一般来说,在多线程资源竞争的时候,在一个使用资源的线程里面(消费者)判断资源是否可用,不可用,便调用pthread_cond_wait,在另一个线程里面(生产者)如果判断资源可用的话,则调用pthread_cond_signal发送一个资源可用信号
但wait成功后,有可能多个线程都在等待这个资源可用的信号,信号发出后只有一个资源可用。未竞争到资源的线程应当继续等待资源,故使用while而不是if(如只有一个消费者,则可以使用if)
阻塞队列(block_queue.h)
阻塞队列类中封装了生产者-消费者模型,使用了循环数组实现了队列(也可使用STL中的队列)作为共享缓冲区
比较重要的是以下几个函数
- 往队列添加元素(生产者生产了一个元素)
//往队列添加元素,需要将所有使用队列的线程先唤醒 //当有元素push进队列,相当于生产者生产了一个元素 //若当前没有线程等待条件变量,则唤醒无意义 bool push(const T& item) { m_mutex.lock(); if (m_size >= m_max_size) { m_cond.broadcast(); m_mutex.unlock(); return false; } m_back = (m_back + 1) % m_max_size; 将新增数据放在循环数组的对应位置 m_array[m_back] = item; m_size++; m_cond.broadcast(); m_mutex.unlock(); return true; }
- 从队列中取出元素(消费者消费了一个元素)
//pop时,如果当前队列没有元素,将会等待条件变量 bool pop(T& item) { m_mutex.lock(); while (m_size <= 0) { if (!m_cond.wait(m_mutex.get())) { m_mutex.unlock(); return false; } } m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; m_size--; m_mutex.unlock(); return true; }
- 增加的超时处理
- 线程等待相关函数
- sleep()存在缺陷:在等待期间线程无法唤醒
- pthread_cond_wait()存在缺陷:必须借助别的线程触发信号,否则线程自身无法唤醒
- pthread_cond_timedwait()的优势:超时或有信号触发唤醒线程,较为灵活
- 线程等待相关函数
//在pthread_cond_wait基础上增加了等待的时间,只指定时间内能抢到互斥锁即可 //其他逻辑不变 bool pop(T &item, int ms_timeout) { struct timespec t = {0, 0}; //struct timespec提供秒和纳秒单位,最高精度是纳秒 struct timeval now = {0, 0}; //struct timeval:提供秒和微秒单位,最高精度是微秒 gettimeofday(&now, NULL); //gettimeofday(struct timeval*tv,struct timezone *tz),返回当前时间tv以及当前时区信息tz pthread_mutex_lock(m_mutex); if (m_size <= 0) { t.tv_sec = now.tv_sec + ms_timeout / 1000; t.tv_nsec = (ms_timeout % 1000) * 1000; if (0 != pthread_cond_timedwait(m_cond, m_mutex, &t)) //pthread_cond_timedwait()中的&t需要时间的绝对值 { pthread_mutex_unlock(m_mutex); return false; } } if (m_size <= 0) //唤醒后线程未竞争到资源 { pthread_mutex_unlock(m_mutex); return false; } m_front = (m_front + 1) % m_max_size; item = m_array[m_front]; m_size--; pthread_mutex_unlock(m_mutex); return true; }
日志类
基础API
- fputs():把字符串写入到指定的stream中(不包括空字符)
int fputs(const char *str, FILE *stream) str:一个数组,包含了要写入的以空字符终止的字符序列 stream:指向 FILE 对象的指针,该 FILE 对象标识了要被写入字符串的流 return:成功返回一个非负值,错误返回EOF
- 可变参数宏_VA__ARGS__
__VA_ARGS__是一个可变参数的宏,是新的 C99 规范中新增的,目前似乎只有 gcc 支持( VC 从 VC2005 开始支持)。定义时宏定义中参数列表的最后一个参数为省略号 “…”
例如:
#include <stdio.h> #define myprintf(...) printf(__VA_ARGS__) int main() { myprintf("0123456789\n"); myprintf("www.csdn.net\n"); return 0; } /* 0123456789 www.csdn.net */ //搭配va_list的format使用 #define my_print2(format, ...) printf(format, __VA_ARGS__) #define my_print3(format, ...) printf(format, ##__VA_ARGS__)
__VA_ARGS__宏前面加上##的作用在于,当可变参数的个数为0时,上述例子中printf参数列表中的的##会把前面多余的 “,” 去掉,否则会编译出错,建议使用时加上 “##”,使得程序更加健壮
- fflush()函数:更新缓存区
- 调用fflush()会将缓冲区中的内容写到stream所指的文件中去
- 若stream为NULL,则会将所有打开的文件进行数据更新
int fflush(FILE *stream) stream:指向 FILE 对象的指针 fflush(stdin):刷新缓冲区,将缓冲区内的数据清空并丢弃 fflush(stdout):刷新缓冲区,将缓冲区内的数据强制输出到设备
在使用多个输出函数连续进行多次输出到控制台时,有可能上一个数据还没输出完毕,还在输出缓冲区中时,下一个printf就把另一个数据加入输出缓冲区,覆盖了原来的数据,出现输出错误。为了避免这种错误,可以在prinf()后加上 “fflush(stdout);” 强制马上输出到控制台
printf打印到标准输出时,终端是行缓存, 遇到\n就将缓存输出,如果多次printf中只有最后一次打印带 “\n” ,则只有最后一次碰到 “\n” 时才将所有内容一次打印出来。stdout默认是是行缓冲的,遇到 “\n” 就写内容清缓存,而加上 “fflush(stdout);” 与 有 “\n” 作用一样,只是不换行
int print1() //先进入sleep后打印 “hello world!” { printf("hello"); sleep(5); printf(" world!\n"); return 0; } int print2() //先打印 “hello” 再进入sleep 后打印 “ world!” { printf("hello"); fflush(stdout); sleep(5); printf(" world!\n"); return 0; }
日志类流程
- 日志文件
- 局部变量的懒汉模式获取实例
- 生成日志文件,并判断同步和异步写入方式
- 同步写入
- 判断是否分文件
- 直接格式化输出内容,将信息写入日志文件
- 异步写入
- 判断是否分文件
- 格式化输出内容,将内容写入阻塞队列,创建一个写线程,从阻塞队列取出内容写入日志文件
具体实现
- 日志类定义(log.h)
- 通过局部变量的懒汉单例模式创建日志实例,对其进行初始化生成日志文件后,格式化输出内容,并根据不同的写入方式,完成对应逻辑,写入日志文件
- 日志类中的方法不会被其他程序直接调用,通过四个可变参数宏提供其他程序的调用方法并对日志等级进行分类
#pragma once #ifndef LOG_H #define LOG_H #include <stdio.h> #include <iostream> #include <string> #include <stdarg.h> #include <pthread.h> #include "block_queue.h" using namespace std; class Log { public: //C++11以后,使用局部变量懒汉不用加锁 static Log* get_instance() //采用C++11提供的Meyers' Singleton,无须加锁 { static Log instance; return &instance; } static void* flush_log_thread(void* args) //异步写日志公有方法,调用私有方法async_write_log() { Log::get_instance()->async_write_log(); } //可选择的参数有日志文件、日志缓冲区大小、最大行数以及最长日志条队列 bool init(const char* file_name, int close_log, int log_buf_size = 8192, int split_lines = 5000000, int max_queue_size = 0); void write_log(int level, const char* format, ...); //将输出内容按照标准格式整理 void flush(void); //强制刷新缓冲区 private: Log(); virtual ~Log(); void* async_write_log() { string single_log; //从阻塞队列中取出一个日志string,写入文件 while (m_log_queue->pop(single_log)) { m_mutex.lock(); fputs(single_log.c_str(), m_fp); m_mutex.unlock(); } } private: char dir_name[128]; //路径名 char log_name[128]; //log文件名 int m_split_lines; //日志最大行数 int m_log_buf_size; //日志缓冲区大小 long long m_count; //日志行数记录 int m_today; //因为按天分类,记录当前时间是那一天 FILE* m_fp; //打开log的文件指针 char* m_buf; //要输出的内容 block_queue<string>* m_log_queue; //阻塞队列 bool m_is_async; //是否同步标志位 locker m_mutex; //同步类 int m_close_log; //关闭日志 }; //这四个宏定义在其他文件中使用,主要用于不同类型的日志输出 #define LOG_DEBUG(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(0, format, ##__VA_ARGS__); Log::get_instance()->flush();} #define LOG_INFO(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(1, format, ##__VA_ARGS__); Log::get_instance()->flush();} #define LOG_WARN(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(2, format, ##__VA_ARGS__); Log::get_instance()->flush();} #define LOG_ERROR(format, ...) if(0 == m_close_log) {Log::get_instance()->write_log(3, format, ##__VA_ARGS__); Log::get_instance()->flush();} #endif
- 初始化生成日志文件&判断写入方式
- 通过单例模式获取唯一的日志类,调用init方法,初始化生成日志文件,服务器启动按当前时刻创建日志,前缀为时间,后缀为自定义log文件名,并记录创建日志的时间day和行数count
- 写入方式通过初始化时是否设置阻塞队列大小来判断,若队列大小为0,则为同步,否则为异步
//异步需要设置阻塞队列的长度,同步不需要设置 bool Log::init(const char* file_name, int close_log, int log_buf_size, int split_lines, int max_queue_size) { //如果设置了max_queue_size,则设置为异步 if (max_queue_size >= 1) { m_is_async = true; m_log_queue = new block_queue<string>(max_queue_size); pthread_t tid; //flush_log_thread为回调函数,这里表示创建线程异步写日志 pthread_create(&tid, NULL, flush_log_thread, NULL); } //输出内容的长度 m_close_log = close_log; m_log_buf_size = log_buf_size; m_buf = new char[m_log_buf_size]; memset(m_buf, '\0', m_log_buf_size); m_split_lines = split_lines; //日志的最大行数 time_t t = time(NULL); struct tm* sys_tm = localtime(&t); struct tm my_tm = *sys_tm; const char* p = strrchr(file_name, '/'); //从后往前找到第一个“/”的位置 char log_full_name[256] = { 0 }; //自定义日志名 if (p == NULL) //若输入的文件名没有“/”,则以“时间+文件名”作为日志名 { snprintf(log_full_name, 255, "%d_%02d_%02d_%s", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, file_name); } else { strcpy(log_name, p + 1); strncpy(dir_name, file_name, p - file_name + 1); //p - file_name + 1:文件所在路径文件夹的长度 dir_name相当于“./” snprintf(log_full_name, 255, "%s%d_%02d_%02d_%s", dir_name, my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, log_name); } m_today = my_tm.tm_mday; m_fp = fopen(log_full_name, "a"); if (m_fp == NULL) { return false; } return true; }
- 日志文件分级分文件并写入
- 日志文件分级
- Debug:调试代码时的输出,在系统实际运行时一般不使用
- Warn:这种警告与调试时终端的warning类似,是调试代码时使用
- Info:报告系统当前的状态,当前执行的流程或接收的信息等
- Error和Fatal:输出系统的错误信息
- 日志分文件
- 若当前day为创建日志时间,写入日志,否则按当前时间创建新log,更新创建时间和行数
- 若行数超过最大行限制,在当前日志的末尾加count/max_lines为后缀创建新log
- 日志文件分级
void Log::write_log(int level, const char* format, ...) { struct timeval now = { 0, 0 }; gettimeofday(&now, NULL); time_t t = now.tv_sec; struct tm* sys_tm = localtime(&t); //得到当前时间 struct tm my_tm = *sys_tm; char s[16] = { 0 }; switch (level) //文件分级 { case 0: strcpy(s, "[debug]:"); break; case 1: strcpy(s, "[info]:"); break; case 2: strcpy(s, "[warn]:"); break; case 3: strcpy(s, "[erro]:"); break; default: strcpy(s, "[info]:"); break; } //写入一个log,对m_count++, m_split_lines最大行数 m_mutex.lock(); m_count++; if (m_today != my_tm.tm_mday || m_count % m_split_lines == 0) //日志不是今天或写入的日志行数是最大行的倍数 { char new_log[256] = { 0 }; fflush(m_fp); fclose(m_fp); char tail[16] = { 0 }; //格式化日志中的时间部分 snprintf(tail, 16, "%d_%02d_%02d_", my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday); if (m_today != my_tm.tm_mday) //日期不是今天,创建当天日志,更新m_today和m_count { snprintf(new_log, 255, "%s%s%s", dir_name, tail, log_name); m_today = my_tm.tm_mday; m_count = 0; } else //写入的日志超出最大行,在之前的日志名基础上加后缀“m_count/m_split_lines” { snprintf(new_log, 255, "%s%s%s.%lld", dir_name, tail, log_name, m_count / m_split_lines); } m_fp = fopen(new_log, "a"); } m_mutex.unlock(); va_list valst; va_start(valst, format); //将传入的format参数赋值给valst,便于格式化输出 string log_str; m_mutex.lock(); //写入的具体时间内容格式 int n = snprintf(m_buf, 48, "%d-%02d-%02d %02d:%02d:%02d.%06ld %s ", //时间格式化,snprintf成功返回写字符的总数,其中不包括结尾的null字符 my_tm.tm_year + 1900, my_tm.tm_mon + 1, my_tm.tm_mday, my_tm.tm_hour, my_tm.tm_min, my_tm.tm_sec, now.tv_usec, s); int m = vsnprintf(m_buf + n, m_log_buf_size - n - 1, format, valst); //内容格式化,用于向字符串中打印数据、数据格式用户自定义,返回写入到字符数组str中的字符个数(不包含终止符) m_buf[n + m] = '\n'; m_buf[n + m + 1] = '\0'; log_str = m_buf; m_mutex.unlock(); if (m_is_async && !m_log_queue->full()) //异步且阻塞队列未满,则将日志信息加入阻塞队列 { m_log_queue->push(log_str); } else //同步则加锁向文件中写 { m_mutex.lock(); fputs(log_str.c_str(), m_fp); m_mutex.unlock(); } va_end(valst); }
va_start和va_end
#define va_start(list,param1) ( list = (va_list)¶m1+ sizeof(param1) ) #define va_end(list) ( list = (va_list)0 )
- va_start宏,获取可变参数列表的第一个参数的地址(list是类型为va_list的指针,param1是可变参数最左边的参数)
- va_end宏,清空va_list可变参数列表
某些版本的va_start宏为了方便对va_list的遍历,就给参数列表动态分配内存。这样一种C实现很可能利用va_end宏来释放此前动态分配的内存;
如果忘记调用宏va_end,最后得到的程序可能在某些机型上没有问题,而在另一些机型上则发生“内存泄露”
——《C陷阱与缺陷》
主要参考文章
项目代码
项目整体理解
TinyWebServer—从0到服务器开发
小白视角:一文读懂社长的TinyWebServer
C++网络编程入门:轻量级web并发服务器开发
#Web服务器-原始版本(来自微信公众号:两猿社)
I/O多路复用
一文看懂IO多路复用
IO多路复用的三种机制select、poll、epoll
epoll的理解
epoll中的ET和LT模式区别
http模块理解
线程池web服务器http_conn请求类
深入浅出理解有限状态机
什么是状态机,一篇文章就够了
浅析epoll—epoll函数深入讲解
从零开始,编写一个Web服务器—HTTP部分详细讲解以及代码实现
mmap()函数参数详解
一篇让你彻底了解http请求报文和响应报文的结构
GET和POST两种基本请求方法的区别
线程池模块理解
WEB服务器——TinyWebServer代码详细讲解(threadpool模块)
基于Linux的C++轻量级web服务器webserver/httpserver——线程池
如何深刻理解Reactor和Proactor?
数据库连接池模块理解
基于MysqlConnector/C++的数据库连接池的实现
zwiley的随机——数据库连接池
Mysql接口API详细使用说明
定时器模块理解
Web服务器—TinyWebServer代码详细讲解(timer模块)
如何实现一个定时器?看这一篇就够了
定时器设计(一)
信号同步机制理解
TinyWebServer 相关函数使用与样例 [线程同步机制]
良许Linux——互斥量mutex