基于多反应堆的高并发服务器【C/C++/Reactor】(中)EventLoop初始化和启动

avatar
作者
猴君
阅读量:0

(一)详述EventLoop 

这个Dispatcher是一个事件分发模型,通过这个模型,就能够检测对应的文件描述符的事件的时候,可以使用epoll/poll/select,前面说过三选一。另外不管是哪一个底层的检测模型,它们都需要使用一个数据块,这个数据块就叫做DispatcherData。除此之外,还有另外一个部分,因为在这个反应堆模型里边对应一系列的文件描述符,都属于epoll/poll/select,但是这些文件描述符,它们不是固定的。

  1. 可能我们和客户端新建立了一个连接,那么就需要把某个节点就是某个文件描述符添加到这个Dispatcher模型上边
  2. 或者说服务器和客户端断开连接了,对应的这个文件描述符,就需要从Dispatcher对应的检测集合中删除
  3. 还有一种情况就是要修改Dispatcher检测的这些文件描述符对应的事件

struct EventLoop{     bool isQuit;// 开关     struct Dispatcher* dispatcher;     void* dispatcherData;          // 任务队列     struct ChannelElement* head;     struct ChannelElement* tail;      // 用于存储channel的map     struct ChannelMap* channelMap;          // 线程ID,Name,mutex     pthread_t threadID;     char threadName[32];     pthread_mutex_t mutex;     // 待续写... };

对于刚才描述的这三种情况,不管是哪一个,都可以把它们称为任务。既然是任务,如果产生了多个这样的任务,就需要把这些任务存储起来,所以对应的就需要有一个任务队列。在C语言里边是没有所谓现成的任务队列可直接拿来使用。C++中就有STL,可以直接拿一个队列来直接用,或者说拿一个list来直接用,在C语言里边,我们只能自己写了...

// 处理该节点中的channel的方式 enum ElemType{     ADD,    // 添加     DELETE, // 删除     MODIFY  // 修改 };  // 定义任务队列的节点 struct ChannelElement {     int type;// 如何处理该节点中的channel     struct Channel* channel;     struct ChannelElement* next; };

既然它是一个任务队列,也就意味着这个队列里边的节点的个数是不固定的,所以我们就需要一个动态的模型,可以实现一个链表。这个链表的节点是什么类型的?是ChannelElement类型.所谓ChannelElement类型。它里边主要其实是一个Channel,还有下一个节点的指针。通过指向下一个节点的指针,就可以把每一个节点连接起来了。当这个任务队列里边有了任务之后,我们就需要通过一个循环,把链表里边所有的节点都读出来。

  •     如果这是一个添加事件的节点,那么我们就把这个文件描述符添加到Dispatcher对应的检测集合中。
  •     如果它是删除的,那么我们就把这个文件描述符从Dispatcher的检测集合中删除。
  •     如果它是修改事件,那么我们就把这个节点在Dispatcher中的事件做一个修改。

这个EventLoop里边有一个任务队列,可以说这个EventLoop它是一个生产者和消费者模型

  • 消费者是谁呢?
    • 就是这个Dispatcher
  • 生产者是谁呢?
    • 生产者有可能是其他的线程,比如说主线程和客户端建立了连接,剩下的事就是通信。如果要通信,就对应一个通信的文件描述符。主线程就把这个任务添加到了子线程对应的这个EventLoop里边。此时在taskQueue里边就多出来了一个节点。在遍历这个任务队列的时候,读到这个节点之后,就需要把当前的这个节点添加到Dispatcher对应的检测模型里边。

另外,在这个EventLoop里边,还有一个ChannelMap,这个ChannelMap也是我们实现的,是通过一个数组来实现的。基于这个ChannelMap,就能够通过一个文件描述符得到对应的那个channel,为什么要得到那个channel呢?因为在这个channel里边有文件描述符,它的读事件写事件对应的回调函数,就是事件触发之后,执行什么样的处理动作。

思考】什么时候用到了这个ChannelMap了呢?

在实现epoll/select/poll的时候,分别调用了epoll_wait函数/select函数/poll函数,通过遍历内核传出来的这个集合,就得到了触发对应事件的那个文件描述符。但我们现在处理不了,因为得不到对应的channel,我们可以通过EventLoop里边提供的这个ChannelMap就能够得到对应的Channel指针,这个Channel指针就可以调用事件对应的处理函数了。

这三大块之外,还有一些其他的数据成员,比如threadID。因为在当前的服务器里边有多个EventLoop,每个EventLoop都属于一个线程。所以我们可以记录一下这个EventLoop它所对应的那个子线程的线程ID。关于子线程的这个名字肯定是我们给它起的,因为子线程创建出来之后,它只有一个ID,这是系统分配的。关于它的名字,操作系统是没有告诉我们的。

ThreadCondition是条件变量,可用于阻塞线程

思考ThreadMutex这个互斥锁它保护的是什么?

其实它保护的是这个任务队列。为什么要保护任务队列呢?对于这个EventLoop来说,它能够被多少个线程操作呢?

  • 如果是主线程的EventLoop,那就是一个。
  • 如果是子线程的EventLoop,那就有可能是两个,为什么是两个呢?
    • 当前,线程在执行这个EventLoop的时候,它肯定要遍历这个taskQueue吧,也就是当前线程需要读这个任务队列。
    • 除此之外,如果主线程和客户端建立了一个连接,主线程是有可能要把一个任务添加到这个EventLoop,对应的任务队列里边,就是额外的另一个线程了。
    • 如果涉及到两个线程操作,同一块共享资源,那么我们是要通过互斥锁来保护这个共享资源的。如果不保护,肯定就会出现数据混乱。

整个项目的结构,在当前这个多反应堆模型的服务器程序里边,它是有多个EventLoop模型的。首先,在主线程里边就有一个EventLoop主线程的,这个EventLoop去检测客户端有没有新的连接到达。如果有新连接,就建立新连接。之后,主线程把这个通信的任务给到线程池里边,把主线程的那个EventLoop也传进来。一定要注意这个EventLoop和主线程的EventLoop是同一个实例。也就是说,线程池里边的这个MainEventLoop和外边这个MainEventLoop它们对应的是同一块内存地址

另外,在线程池里边还有若干个子线程,每个子线程里边都对应一个EventLoop。每个子线程里边的EventLoop它们主要是处理通信的文件描述符相关的操作。这些都是在子线程里边来完成的:

  • 比如说要把一个通信的文件描述符添加到EventLoop里边。
  • 如果服务器和客户端断开连接了,那么就需要把通信的文件描述符从EventLoop里边删除或者要修改这个通信的文件描述符检测的事件

思考】那么,为什么右侧的TcpConnection里边也有一个EventLoop呢?

关于这个TcpConnection,其实它是封装了用于通信的文件描述符,在这个模块里边,是把子线程里边那个EventLoop的地址传递给了TcpConnection。

在每个线程里边,都有一个EventLoop,也就是说EventLoop是属于线程的,不管是主线程还是子线程,里边都有一个EventLoop然后在这个TcpConnection里边,也有一个EventLoop,但是不是说EventLoop属于TcpConnection,而是TcpConnection属于EventLoop

如果TcpConnection,它属于EventLoop,那么这个TcpConnection就属于对应的某一个子线程。EventLoop属于哪个子线程,这个TcpConnection它就属于哪个子线程。它对应的那些任务处理就是和客户端通信,接收数据以及发送数据的操作就在哪个子线程里边来完成。在线程池里边传进来了一个主线程EventLoop,主线程的EventLoop也是一个反应堆实例。

思考】为什么要把主线程的反应堆实例传递给线程池呢?

是因为我们在给线程池做初始化的时候,如果指定线程池的子线程个数为0,此时线程池就没有了,不能工作。

为了能够保证线程池能够工作,也就传进来了一个主线程的反应堆实例,在没有子线程的情况下,那么就借用主线程的反应堆实例来完成对应的这一系列的任务处理。在此时,客户端和服务器建立连接之后,得到了用于通信的文件描述符,这个通信的文件描述符被TcpConnection封装起来了。我们就需要把这个TcpConnection放到一个反应堆模型里边,就是放到主线程的EventLoop里边,这样客户端和服务器的通信操作也就能实现了。这种比较极端的情况,对于程序来说,它就是一个单反应堆模型。

  • 如果在创建线程池的时候指定这个子线程的个数大于0,那么就是一个多反应堆的服务器模型。
  • 如果在创建线程池的时候指定线程的个数等于0,此时就是一个单反应堆的服务器模型。

>>总结

(1)反应堆模型中的EventLoop介绍 

  • 详细介绍了反应堆模型中的EventLoop,包括它的主要作用、如何被多个线程操作,以及它与任务队列和Dispatcher检测模型的关系。对于理解反应堆模型的工作原理至关重要。

(2)主线程与子线程的交互过程

  • 主线程与子线程通过任务队列进行交互,子线程处理通信相关的文件描述符操作
  • 主线程它只能负责和客户端建立连接,如果这个连接建立好了,剩下的事情都是需要由这个子线程来完成的。所以主线程肯定不会给你去处理任务队列里边的节点。在主线程里边,其实它是有一个反应堆模型的,在当前的这个子线程里边也有一个反应堆模型。每个反应堆模型里边都有一个Dispatcher。关于这个Dispatcher就是epoll、poll、或者select,所以主线程去处理的话,这个任务就放到主线程的那个Dispatcher里边了,这样很显然是不对的。故在子线程的任务队列里边有了任务之后,还需要交给子线程的Dispatcher去处理。
  • 因此这个节点的处理,还需要判断当前线程到底是什么线程。如果它是主线程不能让它去处理,如果是子线程,直接让它去处理

(3)EventLoop与任务队列的关系

  • 当任务队列中有任务时,会通过循环遍历链表,将任务添加到Dispatcher对应的检测模型中。

(4)文件描述符的管理

  • 新连接建立或断开时,或要修改Dispatcher检测的这些文件描述符对应的事件时,文件描述符的添加、删除和修改操作都与EventLoop紧密相关。

 (二)EventLoop 的初始化

  •  EventLoop .h
// 初始化 struct EventLoop* eventLoopInit(); struct EventLoop* eventLoopInitEx(const char* threadName);
  •  EventLoop .c
// 初始化 struct EventLoop* eventLoopInit() {     return eventLoopInitEx(NULL); }  struct EventLoop* eventLoopInitEx(const char* threadName) {     struct EventLoop* evLoop = (struct EventLoop*)malloc(sizeof(struct EventLoop));     evLoop->isQuit = false; // 没有运行     evLoop->dispatcher = &EpollDispatcher;     evLoop->dispatcherData = evLoop->dispatcher->init();           // 任务队列(链表)     evLoop->head = evLoop->tail = NULL;      // 用于存储channel的map     evLoop->channelMap = channelMapInit(128);      evLoop->threadId = pthread_self(); // 当前线程ID     strcpy(evLoop->threadName,threadName == NULL ? "MainThread" : threadName); // 线程的名字     pthread_mutex_init(&evLoop->mutex, NULL);       ...(待续写)     return evLoop; }

 (二)启动 EventLoop 

  •  EventLoop .h
// 启动反应堆模型 int eventLoopRun(struct EventLoop* evLoop);
  •  EventLoop .c

参数有效性检查 启动函数要求传入一个有效的指针。通过断言,我们可以确保指针的有效性。如果无效,程序将抛出异常。

模型提取与循环EventLoop 中取出模型后,需要让它进行while循环。这个循环中,可调用的是epoll模型epoll_wait函数/poll模型poll函数/select模型select函数。但这个调用不是直接的,而是通过dispatcher指针的dispatch函数。

线程安全检查 在执行dispatch函数时,会检查当前线程的ID与保存的线程ID是否匹配。如果不匹配,则视为异常情况,返回-1 

动态任务函数理解dispatch指向的任务函数是动态的,根据初始化的底层lO模型(如epoll、poll或select)不同,dispatch函数内部调用的函数也不同

文件描述符处理 每次调用 select/poll/epoll_wait 时,会得到一些被激活的文件描述符。通过for循环,对这些文件描述符进行处理

// 启动反应堆模型 int eventLoopRun(struct EventLoop* evLoop) {     assert(evLoop != NULL);     // 取出事件分发和检测模型     struct Dispatcher* dispatcher = evLoop->dispatcher;     // 比较线程ID是否正常     if(evLoop->threadID != pthread_self()) {         return -1;     }     // 循环进行事件处理     while(!evLoop->isQuit) {         /*             dispatch指向的任务函数其实是动态的,由于在初始化的时候指向的是底层的             IO模型用的是epoll模型,那么dispatcher->dispatch(evLoop,2);             就是调用epollDispatch,里头的epoll_wait函数会随之被调用,每             调用一次epoll_wait函数,就会得到一些被激活的文件描述符             然后通过for循环,对被激活的文件描述符做一系列的处理              如果是poll,就是调用pollDispatch,里头的poll函数会随之被调用,每             调用一次poll函数,就会得到一些被激活的文件描述符             然后通过for循环,对被激活的文件描述符做一系列的处理              如果是select,就是调用selectDispatch,里头的select函数会随之被调用,每             调用一次select函数,就会得到一些被激活的文件描述符             然后通过for循环,对被激活的文件描述符做一系列的处理          */         dispatcher->dispatch(evLoop,2); // 超时时长 2s         ...(待续写)     }     return 0;    }

详解注释:

dispatch指向的任务函数其实是动态的:由于在初始化的时候指向的是底层的IO模型用的是epoll模型,那么dispatcher->dispatch(evLoop,2);就是调用epollDispatch,里头的epoll_wait函数会随之被调用,每调用一次epoll_wait函数,就会得到一些被激活的文件描述符然后通过for循环,对被激活的文件描述符做一系列的处理 

static int epollDispatch(struct EventLoop* evLoop,int timeout) {     struct EpollData* data = (struct EpollData*)evLoop->dispatcherData;     int count = epoll_wait(data->epfd,data->events,Max,timeout * 1000);     for(int i=0;i<count;++i) {         int events = data->events[i].events;         int fd = data->events[i].data.fd;         if(events & EPOLLERR || events & EPOLLHUP) {             // 对方断开了连接,删除 fd             // epollRemove(&evLoop->channels[fd],evLoop);             continue;         }         if(events & EPOLLIN) {             ...(待续写)         }         if(events & EPOLLOUT) {             ...(待续写)         }     }     return 0; }

如果是poll,就是调用pollDispatch,里头的poll函数会随之被调用,每调用一次poll函数,就会得到一些被激活的文件描述符,然后通过for循环,对被激活的文件描述符做一系列的处理 

static int pollDispatch(struct EventLoop* evLoop,int timeout) {     struct PollData* data = (struct PollData*)evLoop->dispatcherData;     int count = poll(data->fds,data->maxfd + 1,timeout * 1000);     if(count == -1) {         perror("poll");         exit(0);     }     for(int i=0;i<=data->maxfd;++i) {         if(data->fds[i].fd == -1) {             continue;         }         if(data->fds[i].revents & POLLIN) {             ...(待续写)         }         if(data->fds[i].revents & POLLOUT) {             ...(待续写)         }     }     return 0; }

如果是select,就是调用selectDispatch,里头的select函数会随之被调用,每调用一次select函数,就会得到一些被激活的文件描述符,然后通过for循环,对被激活的文件描述符做一系列的处理 

static int selectDispatch(struct EventLoop* evLoop,int timeout) {     struct SelectData* data = (struct SelectData*)evLoop->dispatcherData;     struct timeval val;     val.tv_sec = timeout;     val.tv_usec = 0;     fd_set rdtmp = data->readSet;     fd_set wrtmp = data->writeSet;     int count = select(Max,&rdtmp,&wrtmp,NUll,&val);     if(count == -1) {         perror("select");         exit(0);     }     for(int i=0;i<Max;++i) {          if(FD_ISSET(i,&rdtmp)) {             ...(待续写)         }         if(FD_ISSET(i,&wrtmp)) {             ...(待续写)         }     }     return 0; }

总结 :反应堆模型的启动流程涉及多个关键环节,从参数有效性检查到线程安全检查,再到动态任务函数的执行。理解这一流程对于维护和优化系统的稳定性至关重要。 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!