【项目】仿muduo库One Thread One Loop式主从Reactor模型实现高并发服务器(TcpServer板块)

avatar
作者
猴君
阅读量:2

【项目】仿muduo库One Thread One Loop式主从Reactor模型实现⾼并发服务器(TcpServer板块)


一、思路图

在这里插入图片描述

二、模式关系图

在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

三、定时器的设计

1、Linux本身给我们的定时器

#include <sys/timerfd.h> // 头文件 int timerfd_create(int clockid, int flags); // 创造定时器  clockid: CLOCK_REALTIME-系统实时时间,如果修改了系统时间就会出问题;   CLOCK_MONOTONIC-从开机到现在的时间是⼀种相对时间;  flags: 0-默认阻塞属性 int timerfd_settime(int fd, int flags, struct itimerspec *new, struct itimerspec *old); // 设置定时时间  fd: timerfd_create返回的⽂件描述符  flags: 0-相对时间, 1-绝对时间;默认设置为0即可.  new: ⽤于设置定时器的新超时时间  old: ⽤于接收原来的超时时间  struct timespec   { 	time_t tv_sec; /* Seconds */  	long tv_nsec; /* Nanoseconds */  };  struct itimerspec   {  	struct timespec it_interval; /* 第⼀次之后的超时间隔时间 */  	struct timespec it_value; /* 第⼀次超时时间 */  }; 

我们利用linux给的定时器来进行代码的书写(简单使用一下):

#include <iostream> #include <cstdio> #include <string> #include <ctime> #include <cstdlib> #include <unistd.h> #include <sys/timerfd.h> #include <sys/select.h>  int main() {   // 创建一个定时器   int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);   struct itimerspec iem; // 使用该结构体   /* struct timespec {   time_t tv_sec; // Seconds    long tv_nsec; // Nanoseconds    };   struct itimerspec {   struct timespec it_interval; // 第 次之后的超时间隔时间    struct timespec it_value; // 第 次超时时间    };*/   iem.it_value.tv_sec = 2; // 第一次的超时时间(秒)   iem.it_value.tv_nsec = 0; // 第一次的超时时间(毫秒)   iem.it_interval.tv_sec = 2; // 这次过后的每隔多长时间超时(秒)   iem.it_interval.tv_nsec = 0; // 这次过后的每隔多长时间超时(毫秒)   // 启动定时器   timerfd_settime(timerfd, 0, &iem, NULL); // 启动定时器   time_t start = time(NULL);   // 每隔三秒进行一次定时器的读取操作   while(1)   {     uint64_t temp;     int fd = read(timerfd, &temp, sizeof(temp));     if (fd < 0)     {       perror("read failed!");       return -1;     }     std::cout << temp << " " << time(NULL) - start << std::endl;   }   return 0; } 

在这里插入图片描述

2、我们自己实现的定时器

(1)代码部分

先上代码:

#include <iostream> #include <unordered_map> #include <vector> #include <cstdint> #include <unistd.h> #include <memory> #include <functional>  using FuncTask = std::function<void()>; // 回调函数,用来回调函数任务的 using ReleaseTask = std::function<void()>; // 回调函数,用来回调释放任务的 class TimerTask {     private:         uint64_t _id; // 定时器任务对象ID         uint32_t _timeout; // 定时器任务超时的时间         bool _cancel;  // 用来取消定时任务, false表示没有被取消,true表示被取消了         FuncTask _taskcb; // 定时器任务要执行的定时任务         ReleaseTask _releasecb; // 定时器任务在时间轮中保存的定时器信息     public:         TimerTask(uint64_t id, uint32_t delay, const FuncTask& cb) // 构造函数             : _id(id)             , _timeout(delay)             , _cancel(false) // 刚开始定为false             , _taskcb(cb)         {}         ~TimerTask() // 析构函数         {             // 在析构函数中进行两个回调函数的创建             if (_cancel == false)                  _taskcb();             _releasecb();         }         void SetRelease(const ReleaseTask& cb) // 设置释放的回调函数         {             _releasecb = cb;         }         uint32_t DelayTimer() // 将延迟时间开放出去         {             return _timeout;         }         void Cancel()         {             _cancel = true;         } }; // 时间轮 class WheelTask {     private:         using SharedPtrTask = std::shared_ptr<TimerTask>; // 用一个shared_ptr,因为有引用计数,当引用计数为0的时候,就是释放资源的时候         using WeakPtrTask = std::weak_ptr<TimerTask>; // weak_ptr用来配合shared_ptr使用         int _tick; // 当前的秒钟,指到哪就将哪释放掉         int _capacity; // 表盘的最大数量(最大延迟时间)         std::vector<std::vector<SharedPtrTask>> _wheel; // 时间轮(二维数组,存放的数据更规整)         std::unordered_map<uint64_t, WeakPtrTask> _timers; // 映射关系     private:         void RemovePtr(uint64_t id) // 移动指针         {             auto it = _timers.find(id); // 先找到id在_timers中             if (it != _timers.end())             {             	_timers.erase(it); // 删除掉it所指向的位置,也就是id的位置             }         }     public:         WheelTask()             : _tick(0)             , _capacity(60)             , _wheel(_capacity)         {}     void TimerAdd(uint64_t id, uint32_t delay, const FuncTask& cb) // 添加到定时任务     {         SharedPtrTask pt(new TimerTask(id, delay, cb)); // 先建立一个shared_ptr         pt->SetRelease(std::bind(&WheelTask::RemovePtr, this, id)); // 有一个this指针         int pos = (_tick + delay) % _capacity;         _wheel[pos].push_back(pt); // 将这个pt先插入到哈希表中         _timers[id] = WeakPtrTask(pt); // 对象中不能用shared_ptr,因为shared_ptr不会减减引用计数,所以用weak_ptr     }     void TimerRefresh(uint64_t id) // 刷新/延迟定时时间     {         auto it = _timers.find(id);         if (it == _timers.end())         {             perror("_timers find error!");             return;         }         SharedPtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptr         int delay = pt->DelayTimer();         int pos = (_tick + delay) % _capacity;         _wheel[pos].push_back(pt);     }     void CancelTimer(uint64_t id) // 取消定时任务     {         auto it = _timers.find(id);         if (it == _timers.end())         {             perror("_timers not find!");             return;         }         SharedPtrTask pt = it->second.lock();         if (pt) pt->Cancel();     }     void RunTimerTask() // 时钟滴答滴答往后走     {         _tick = (_tick + 1) % _capacity;         _wheel[_tick].clear(); // 清空一下     } };  // for test class Task {     public:         Task(){std::cout << "构造函数!~" << std::endl;}         ~Task(){std::cout << "析构函数!~" << std::endl;}    };  void Delete(Task* t) {     delete t; }  int main() {     WheelTask tw;     Task* t = new Task();     tw.TimerAdd(888, 5, std::bind(Delete, t));      for(int i = 5; i > 0; i--)     {         sleep(1);         tw.TimerRefresh(888);//刷新定时任务         tw.RunTimerTask();//向后移动秒针         std::cout << "刷新了一下操作,需要" << i << "秒以后才能进行\n";     }     tw.CancelTimer(888); // 用来测试取消的函数     while(1)      {         std::cout << "-------------------\n";         tw.RunTimerTask();//向后移动秒针         sleep(1);     }     return 0;  } 

(2)思想部分

我们linux本身的定时器是有很大缺陷的,因为我们假如说一个任务超时了以后,要将所有的所有的连接进行遍历一遍,这就效率大大的降低,因为假如说是有成千上万个连接的话,从头到尾变量太不现实了。
所以我们有了另一种方案,这种方案的思路来自于时钟的钟表,我们看时钟的钟表是不是滴答滴答的往后走的?所以我们就可以用的是这种思路来进行设计的,我们定义一个数组,以及一个指针,指针起始位置指向初始位置,指针每秒钟往后走一步,走到哪里象征着哪里的任务开始执行,所以假如我们设置一个3秒的任务,我们只需要用_tick指针往后+3指向数组的相对应的位置即可,然后执行当前的任务。而假如说是我们有成批成批的任务的话,我们设计思路也非常简单,我们在当前位置往下拉一排数组就可以了,也就是我们熟悉的哈希桶底下挂链表,所以也就是在当前的时刻同时执行很多的任务。如下图所示:
在这里插入图片描述

而还有一种条件,假如说我们的超时时间到了1小时的时间的话,这个数组要开3600个,还是太多了,那么我们就有另外一种思路,我们用多个时间轮来进行操作,我们设置一个小时时间轮,一个分钟时间轮,一个秒钟时间轮,当小时时间轮到相对应的位置的时候,分钟时间轮开始运动,到了相对应的位置秒钟时间轮开始运动,一直到对应的位置即可,我们如下图所示:
在这里插入图片描述

但其实,我们平常用的定时任务最多也不过30秒,所以我们只需要设计60秒的就可以了,不用设计多层的时间轮,只需要设计单层时间轮即可。

我们可以去思考一个问题,我们现在实现的想法是当时间片到了,我们主动去执行定时任务,释放连接,那我们可不可以时间片到了后自动取执行定时任务,释放连接。那么我们可以想到析构函数!利用析构函数将一个定时任务作为一个类中的析构函数内进行操作,那么这个定时任务在对象被释放的时候执行析构函数进行释放。
同时还有一个问题:假如说我们当前的任务建立连接成功了,我们给这个连接设置了一个30s的定时销毁的任务,但是在第10秒的时候,突然建立了通信了,我们此时该怎么办?我们还记不记得有一个智能指针叫shared_ptr?我们利用这个智能指针的引用计数来进行解决,简单思路就是第10秒后在第40秒的位置加一个销毁任务,那么引用计数也就为2,到第30秒先-1,到第40秒再-1到0,销毁,图的思路逻辑如下:
在这里插入图片描述

(3)使用实验

在这里插入图片描述

在这里插入图片描述

四、正则表达式

1、描述

正则表达式描述了⼀种字符串匹配的模式,可以⽤来检查⼀个串是否含有某种⼦串、将匹配的⼦串替换或者从某个串中取出符合某个条件的⼦串等。使用正则表达式,使得HTTP协议更加的简单了,因为其只需要用这一个来进行子串的匹配和提取了。

2、前置工作(非常重要)

sudo yum install -y devtoolset-9-gcc devtoolset-9-gcc-c++ 

在这里插入图片描述

scl enable devtoolset-9 bash 

在这里插入图片描述

vim ~/.bash_profile 

在这里插入图片描述

在这里插入图片描述

3、代码与解析

我们的HTTP代码是如下所示的:

"GET /jiangrenhai/login$usr=jrh&password=123456 HTTP/1.1\r\n" 

(1)请求字符

请求的方法有右边五种:GET|HEAD|POST|PUT|DELETE 

(2)[^?]*

表示匹配非问号的字符 后面的*表示匹配并提取0个或多个。

(3)\\?(.*)

\\?表示匹配原始的? (.*)表示匹配问号后的任意字符0次或多次 空格表示到空格为止.

(4)HTTP/1\\.[01]

\\表示匹配最原始的\ [01]表示匹配0或1。

(5)(?:\n|\r\n)?

(?: …) 表示匹配某个格式字符串,但是不提取, 最后的?表示的是匹配前边的表达式0次或1次。

(6)总体代码如下

#include <iostream> #include <regex> #include <string>  int main() {     // std::string str = "GET /jiangrenhai/login?usr=jrh&password=123456 HTTP/1.1\r\n";     std::string str = "GET /jiangrenhai/login$usr=jrh&password=123456 HTTP/1.1\r\n";     std::smatch matches;     // 请求的方法有右边五种:GET|HEAD|POST|PUT|DELETE       std::regex e("(GET|HEAD|POST|PUT|DELETE) ([^?]*)(?:\\?(.*))? (HTTP/1\\.[01])(?:\n|\r\n)?");     // [^?]*      表示匹配非问号的字符 后面的*表示匹配并提取0个或多个     // \\?(.*)    \\?表示匹配原始的? (.*)表示匹配问号后的任意字符0次或多次 空格表示到空格为止     // HTTP/1\\.[01]   \\表示匹配最原始的\  [01]表示匹配0或1     // (?:\n|\r\n)?   (?: ...) 表示匹配某个格式字符串,但是不提取, 最后的?表示的是匹配前边的表达式0次或1次     bool ret = std::regex_match(str, matches, e);     if (ret == false)     {       return -1;     }     for (auto &e : matches)     {       std::cout << "[" << e << "]" << std::endl;     }     return 0; } 

在这里插入图片描述

五、通用类型Any

1、简单介绍

每⼀个Connection对连接进⾏管理,最终都不可避免需要涉及到应⽤层协议的处理,因此在Connection中需要设置协议处理的上下⽂来控制处理节奏。但是应⽤层协议千千万,为了降低耦合度,这个协议接收解析上下⽂就不能有明显的协议倾向,它可以是任意协议的上下⽂信息,因此就需要⼀个通⽤的类型来保存各种不同的数据结构。

简而言之,也就是我们构建一个通用的类型Any,用来实例化近乎所有容器类型。

2、思想

我们假如说要实现一个通用的类型用来保存Connection的上下文,那么我们不能用模版来进行描写,因为假如我们用的是模版的话,我们需要传Any<int>或者Any<float>等,也就是说在使⽤的时候就要确定其类型。这是⾏不通的,因为保存在Content中的协议上下⽂,我们在定义any对象的时候是不知道他们的协议类型的,因此⽆法传递类型作为模板参数。所以我们考虑在Any类中内部设计⼀个模板容器holder类,可以保存各种类型数据,所以,定义⼀个基类placehoder,让placeholder继承于holder,而Any类保存⽗类指针即可,当需要保存数据时,则new⼀个带有模板参数的子类placeholder对象出来保存数据,然后让Any类中的父类指针,指向这个子类对象就搞定了。

3、代码实现

#include <iostream> #include <cassert> #include <typeinfo> #include <string>  // 先搭个框架--通用容器的函数 class Any {     private:         class Holder         {             public:                 virtual ~Holder(){} // 析构函数                 // 纯虚函数的定义在于只让它的子类对象刻画出对象,                 // 不让父类对象刻画出对象,就像动物包含着老虎狮子等的,老虎狮子是对象,而动物不是对象                 virtual const std::type_info& type() = 0;                  virtual Holder* clone() = 0;         };         template<class T>         class PlaceHolder : public Holder         {             public:                 // 构造函数                 PlaceHolder(const T& val) :_val(val) {}                 // 类型                 virtual const std::type_info& type()                 {                     return typeid(T); // 直接返回我们模版实例化的参数的类型即可                 }                 // 针对当前的函数对象,克隆出一个新的子类对象                 virtual Holder* clone()                 {                     return new PlaceHolder(_val);                 }             public:                 T _val; // 构造一个模版类型的数据         };         Holder *_content; // 父类的指针可以指向任何类型的数据(这个是Any类型的私有函数)     public:         // 空的构造函数         Any():_content(NULL){}         // 模版T类型的构造函数         template<class T>         Any(const T& val):_content(new PlaceHolder<T>(val)){}         // 通过其他的通用容器来构造我们自己的容器         Any(const Any& other):_content(other._content ? other._content->clone() : NULL){}         // 析构函数         ~Any()         {             delete _content; // 释放掉这个_content         }         // 得到子类对象保存的数据的指针         template<class T>         T* get()         {             // 当前对象的类型不能和传进来的实例化模版的类型是一样的             assert(_content->type() == typeid(T));             // 先强转,再指向_val,再取地址             return &((PlaceHolder<T>*)_content)->_val;         }         Any& Swap(Any& other)         {             std::swap(_content, other._content); // 调用算法函数直接交换this和other对象的指针             return *this; // 返回实例化的this解引用         }         // operator=的任意模板类型的运算符重载函数         template<class T>         Any& operator=(const T& val)         {             //为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放             Any(val).Swap(*this);             return *this;         }         // operator=的对象的运算符重载函数         Any& operator=(const Any& other)         {             Any(other).Swap(*this);             return *this;         } };  class Task {     public:         Task(){std::cout << "构造" << std::endl;}         Task(const Task& t){std::cout << "拷贝" << std::endl;}         ~Task(){std::cout << "析构" << std::endl;} };  int main() {     Any a;     {         Task t;         a = t;     }     // a = 10;     // int* pa = a.get<int>();     // std::cout << *pa << std::endl;     // a = std::string("nihaoya");     // std::string* ps = a.get<std::string>();     // std::cout << *ps << std::endl;     return 0; }  

4、效果展示

(1)int、string这种简单类型的

在这里插入图片描述

(2)其他类

在这里插入图片描述

5、C++17中的Any

(1)查阅文档介绍

在这里插入图片描述
我们主要用的是any和any_cast<>。

(2)例子展示

在这里插入图片描述
在这里插入图片描述

六、Buffer缓冲区

1、设计

(1)vector<char>的设计

我们使用的缓冲区中得有一块内存空间,我们这里采用的是vector<char>,为什么不用string呢?原因在于我们结尾又不需要判断\0,根本不需要用string,我们为什么不用vector<std::string>,我们是一个字符一个字符存的,又不是一段字符串一段字符串存的,所以用vector<char>更合适。

(2)要素

首先要有一个默认的空间大小
当前的读取数据的位置
当前的写入数据的位置

我们用代码角度解释为:

class Buffer { 	private: 		std::vector<char>; 		uint64_t ReadData; // 相对读数据的偏移量 		uint64_t WriteData; // 相对写数据的偏移量 	public: 		// ... }; 

(3)操作

i、写入操作

当前的写入位置指向哪里就从哪里开始写入,但我们内存空间总是有大小的吧?所以我们分下面情况进行讨论:

假如说是后续的剩余空间不够了
我们此时考虑一下当前内存的整体空间的大小是否足够,倘若是前面还是有空间的,那么我们将这段数据移到从最前面开始,读指针指向开头位置,写指针指向当前内存往后增加的结尾位置。倘若是前面已经没有空间了或者是前面的空间已经不够再加入我们当前需要插入的空间的时候,我们直接扩容。而当数据一旦写成功了,我们的当前写位置就要往后偏移。

在这里插入图片描述

ii、读取数据

当前的读取数据指向哪里,我们就从哪里开始读取,但其前提是要有数据可读。
可读数据大小=写数据位置-读数据位置。

2、函数接口

我们总共有11种主要的函数接口的定义,其余有其他拓展的函数接口,我们主要的十一种接口如下:
我们用下面这个形象的图来进行写接口。
在这里插入图片描述

        // 1、获取当前写入的起始地址         char* WritePos(); // 写就是数组开始位置+write的相对开头的位置(是一个指针)         // 2、获取当前读入的起始地址         char* ReadPos(); // 读就是数组开始位置+read的相对开头的位置(同样也是一个指针)         // 3、获取缓冲区末尾的空闲的空间大小--写数据往后的空闲空间的大小         uint64_t TailSpace(); // 总空间-写的位置         // 4、获取缓冲区开始的空闲的空间大小--读数据往前的空闲空间的大小         uint64_t HeadSpace(); // 直接就是读的位置,相对于整个缓冲区的开头         // 5、获取可读数据大小         uint64_t ReadAbleSize(); // 写位置-读位置         // 6、将读偏移往后         void ReadOffset(uint64_t len); // 读往后加len(考虑长度不大于可读空间)         // 7、将写偏移往后         void WriteOffset(uint64_t len); // 写往后加len(考虑长度不大于读位置往后的空间)         // 8、确保可写空间足够--整体空闲空间够了就用整体空闲空间,不够就扩容         void EnsureWriteSpace(uint64_t len); // 空间不够的话先看前面的空间和后面剩余空间相加是否够用,够用就往前挪然后后面加入,不够就扩容(扩到刚好的容量)         // 9、写入数据         void WriteData(const void* data, uint64_t len); // 从写的位置往后写len的长度         // 10、读取数据         void ReadData(void* buff, uint64_t len); // 从读位置往后读len个位置         // 11、清空缓冲区         void clear(); // 俩相对位置等于0即可 

后面会加很多的string类的,string类的只需要传入字符串即可,直接用上面的接口就OK了。

3、总体代码

#include <iostream> #include <string> #include <vector> #include <unistd.h> #include <cassert> #include <cstring> #define BufferDefault 1024  class Buffer {     private:         std::vector<char> _buffer;         uint64_t _read_idx; // 相对读数据的偏移量         uint64_t _write_idx; // 相对写数据的偏移量     public:         // 构造函数--俩偏移位置都从0开始,设置_buffer的空间大小为1024         Buffer():_buffer(BufferDefault), _read_idx(0), _write_idx(0){}         // 定义一个Begin()函数用来调用整个数组_buffer的起始位置的         char* Begin() { return &*_buffer.begin(); }         // 1、获取当前写入的起始地址         char* WritePos()         {             return Begin() + _write_idx; // 开始位置加上_write_idx到写的位置         }         // 2、获取当前读入的起始地址         char* ReadPos()         {             return Begin() + _read_idx; // 开始位置加上_read_idx到读的位置         }         // 3、获取缓冲区末尾的空闲的空间大小--写数据往后的空闲空间的大小         uint64_t TailSpace()         {             return _buffer.size() - _write_idx; // 总体的大小减去写的相对偏移量         }         // 4、获取缓冲区开始的空闲的空间大小--读数据往前的空闲空间的大小         uint64_t HeadSpace()         {             return _read_idx; // 直接就是读的相对偏移量         }         // 5、获取可读数据大小         uint64_t ReadAbleSize()         {             return _write_idx - _read_idx; // 写的偏移量-读的偏移量         }         // 6、将读偏移往后         void ReadOffset(uint64_t len)         {             assert(len <= ReadAbleSize()); // 不让往后读到结尾的位置,要读的长度小于等于可读数据大小             _read_idx += len; // 往后加等即可         }         // 7、将写偏移往后         void WriteOffset(uint64_t len)         {             assert(len <= TailSpace()); // 往后写不能到末尾,不能多于后面剩的空间大小             _write_idx += len; // 往后加等即可         }         // 8、确保可写空间足够--整体空闲空间够了就用整体空闲空间,不够就扩容         void EnsureWriteSpace(uint64_t len)         {             // 1、足够的话 直接返回             if (len <= TailSpace()) return;             // 2、末尾空间不够且加上前面剩余空间够的情况下,先将整体数据移动到最前面,然后再在后面进行加入             if (len <= TailSpace() + HeadSpace())             {                 // 先保存一下可读数据的大小                 uint64_t Size = ReadAbleSize();                 // 再将数据头移动到最开始,这里用std::copy函数                 // template<class InputIterator, class OutputIterator>                 // OutputIterator copy (InputIterator first, InputIterator last, OutputIterator result)                 // {                 //     while (first!=last) {                 //         *result = *first;                 //         ++result; ++first;                 //     }                 //     return result;                 // }                 std::copy(ReadPos(), ReadPos() + Size, Begin());                 _read_idx = 0; // 更新可读位置到0                 _write_idx = Size; // 更新可写位置为Size             }             else // 这种情况是需要扩容了             {                 _buffer.resize(len + _write_idx); // 直接扩容到相对应的位置             }         }         // 9、写入数据         void WriteData(const void* data, uint64_t len)         {             // 1、确保有足够的空间             EnsureWriteSpace(len);             // 2、写入数据             const char* data_ = (const char*)data;             std::copy(data_, data_ + len, WritePos());         }         // 写入push的         void WriteAndPush(const void* data, uint64_t len)         {             WriteData(data, len); // 先写进去             WriteOffset(len); // 再偏移         }         // 写入string类型数据         void WriteString(const std::string& data)         {             return WriteData(data.c_str(), data.size()); // 返回上面封装的函数即可         }         // 写入string再push         void WriteStringAndPush(const std::string& data)         {             WriteString(data); // 先写进去             WriteOffset(data.size()); // 再偏移         }         // 写入Buffer类的         void WriteBuffer(Buffer& data)         {             return WriteData(data.ReadPos(), data.ReadAbleSize()); // 先写入         }         // 写入Buffer的string类型         void WriteBufferAndPush(Buffer& data)         {             WriteBuffer(data); // 先写入             WriteOffset(data.ReadAbleSize()); // 再偏移         }         // 10、读取数据         void ReadData(void* buff, uint64_t len)         {             assert(len <= ReadAbleSize()); // 确保不超能读的范围             std::copy(ReadPos(), ReadPos() + len, (char*)buff);         }         // 读取string类         std::string ReadAsString(uint64_t len)         {             // 首先要求len不能比可读数据多             assert(len <= ReadAbleSize());             std::string str;             str.resize(len); // 将str这个string类字符串的空间定义好             ReadData(&str[0], len);             return str;         }         // 读取并往外拿         void ReadAndPop(void* buff, uint64_t len)         {             ReadData(buff, len);             ReadOffset(len);         }         // 读string类并往外拿         std::string ReadAsStringAndPop(uint64_t len) {             // 首先要求len不能比可读数据多             assert(len <= ReadAbleSize());             std::string str = ReadAsString(len);             ReadOffset(len);             return str;         }         // 找换行符         char* FindCRl()         {             // void *memchr(const void *s, int c, size_t n);             char* res = (char*)memchr(ReadPos(), '\n', ReadAbleSize()); // memchr(起始位置,到的字符,大小)             return res;         }         std::string GetLine()         {             char* pos = FindCRl(); // 先找到这一行的位置             if (pos == nullptr) { return ""; }             return ReadAsString(pos - ReadPos() + 1); // +1是为了将\n也加入           }         std::string GetLineAndPop()         {             std::string str = GetLine(); // 先找到             ReadOffset(str.size());             return str;         }         // 11、清空缓冲区         void clear()         {             _read_idx = _write_idx = 0;         } }; 

4、测试实例

(1)测试string类的是否有用

在这里插入图片描述

(2)检查扩容

在这里插入图片描述

(3)测试按行读取

在这里插入图片描述

七、暂停一下:日志宏的玩法

1、日志打印代码

我们通过日志宏的做法可以更快速的定位到错误的地方,将日志宏分等级以后,我们想打印什么等级的日志我们只需要进行打印即可,我们尝试写一下下面的日志宏代码:

#define INFOR 0 #define DEBUG 1 #define ERROR 2 #define LOGLEVEL INFOR     /*struct tm *localtime(const time_t *timep);--localtime的用法*/     // 先定义一个t为当前时间     // 用上面那个样例的结构体     /*size_t strftime(char *s, size_t max, const char *format,     const struct tm *tm);*/     // %H对应小时,%M对应分钟,%S对应秒 #define LOG(level, format/*类型*/, ...)do{\     if (level < LOGLEVEL) break;\     time_t t = time(NULL);\     struct tm *ltm = localtime(&t);\     char tmp[32] = {0};\     strftime(tmp, 31, "%H:%M:%S", ltm);\     fprintf(stdout, "[%s %s:%d]" format "\n", tmp,  __FILE__, __LINE__, ##__VA_ARGS__);\ }while(0)  #define INFLOG(format, ...) LOG(INFOR, format, ##__VA_ARGS__); #define DEBLOG(format, ...) LOG(DEBUG, format, ##__VA_ARGS__); #define ERRLOG(format, ...) LOG(ERROR, format, ##__VA_ARGS__); 

2、效果展示

在这里插入图片描述

八、Socket

1、接口介绍及速览

        // 创建套接字         bool CreateSocket(); // 直接用socket就OK         // 绑定地址信息         bool Bind(const std::string &ip, uint16_t port); // bind         // 开始监听         bool Listen(int blocknum = MAX_BLOCK_NUM); // listen         // 向服务器发起连接         bool Connection(const std::string& ip, uint16_t port); // connect         // 获取新连接         int Accept(); // accept         // 接收数据         ssize_t Recv(void* buff, size_t len, int flag = 0); // recv         // 非阻塞接收数据         ssize_t NonBlockRecv(void* buff, size_t len); // 调用Recv即可,我们最后一个参数设置为非阻塞即可         // 发送数据         ssize_t Send(const void* buff, size_t len, int flag = 0); // send         // 非阻塞发送数据         ssize_t NonBlockSend(void* buff, size_t len); // 调用Send,最后一个参数设置为非阻塞即可         // 关闭套接字         void Close(); // close         // 创建一个监听链接         bool Server(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false); // 1、创建套接字 2、绑定地址 3、监听套接字 4、设置非阻塞 5、地址复用         // 创建一个客户端连接         bool Client(uint16_t port, const std::string& ip); // 1、创建套接字 2、建立连接         // 设计套接字选项--开启地址端口重用         void Reuse(); // 用setsockopt         // 设置套接字阻塞属性--设置为非阻塞         void NonBlock(); // fcntl 

2、代码总览

这里为了方便我们进行测试,我们直接将整个项目的所有头文件都放在下面了:

#include <iostream> #include <string> #include <vector> #include <unistd.h> #include <cassert> #include <cstring> #include <ctime> #include <functional> #include <unordered_map> #include <thread> #include <mutex> #include <condition_variable> #include <memory> #include <typeinfo> #include <fcntl.h> #include <signal.h> #include <netinet/in.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/epoll.h> #include <sys/eventfd.h> #include <sys/timerfd.h> 
#define MAX_BLOCK_NUM 1024 class Socket {     private:         int _sockfd;     public:         Socket():_sockfd(-1){}         Socket(int fd):_sockfd(fd){}         ~Socket(){ Close(); }         int Fd(){ return _sockfd; }         // 创建套接字         bool CreateSocket()         {             /*int socket(int domain, int type, int protocol);*/             // AF_INET             IPv4 Internet protocols          ip(7)             // SOCK_STREAM     Provides sequenced, reliable, two-way, connection-based byte streams.  An out-of-band data  trans‐             //           mission mechanism may be supported.             _sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP); // 创建套接字             if (_sockfd < 0)             {                 ERRLOG("Create socket error!");                 return false;             }             return true;         }         // 绑定地址信息         bool Bind(const std::string &ip, uint16_t port)         {             // int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);             struct sockaddr_in addr; // 先创建这个结构体             addr.sin_family = AF_INET;             addr.sin_port = htons(port);             addr.sin_addr.s_addr = inet_addr(ip.c_str());             socklen_t len = sizeof(struct sockaddr_in);             int ret = bind(_sockfd, (struct sockaddr*)&addr, len);             if (ret < 0)             {                 ERRLOG("BIND FAILED");                 return false;             }             return true;         }         // 开始监听         bool Listen(int blocknum = MAX_BLOCK_NUM)         {             // int listen(int sockfd, int backlog);             int ret = listen(_sockfd, blocknum);             if (ret < 0)             {                 ERRLOG("LISTEN ERROR!");                 return false;             }             return true;         }         // 向服务器发起连接         bool Connection(const std::string& ip, uint16_t port)         {             // int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);             struct sockaddr_in addr; // 先创建这个结构体             addr.sin_family = AF_INET;             addr.sin_port = htons(port);             addr.sin_addr.s_addr = inet_addr(ip.c_str());             socklen_t len = sizeof(struct sockaddr_in);             int ret = connect(_sockfd, (struct sockaddr*)&addr, len);             if (ret < 0)             {                 ERRLOG("CONNECT FAILED");                 return false;             }             return true;         }         // 获取新连接         int Accept()         {             // int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);             int newfd = accept(_sockfd, NULL, NULL);             if (newfd < 0)             {                 ERRLOG("ACCEPT ERROR!");                 return -1;             }             return newfd;         }         // 接收数据         ssize_t Recv(void* buff, size_t len, int flag = 0)         {             // ssize_t recv(int sockfd, void *buf, size_t len, int flags);             ssize_t ret = recv(_sockfd, buff, len, flag);             if (ret < 0)             {                 //EAGAIN 当前socket的接收缓冲区中没有数据了,在非阻塞的情况下才会有这个错误                 //EINTR  表示当前socket的阻塞等待,被信号打断了,                 if (errno == EAGAIN || errno == EINTR)                  {                     INFLOG("JUST NON&&BREAK");                     return 0;//表示这次接收没有接收到数据                 }                 ERRLOG("RECV ERROR!");                 return -1; // 这里-1表示确实是出错了             }             return ret;         }         // 非阻塞接收数据         ssize_t NonBlockRecv(void* buff, size_t len)         {             return Recv(buff, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。         }         // 发送数据         ssize_t Send(const void* buff, size_t len, int flag = 0)         {             // ssize_t send(int sockfd, const void *buf, size_t len, int flags);             ssize_t ret = send(_sockfd, buff, len, flag);             if (ret < 0)             {                 if (errno == EAGAIN || errno == EINTR)                  {                     INFLOG("JUST NON&&BREAK");                     return 0;                 }                 ERRLOG("SEND ERROR!");                 return -1;             }             return ret;         }         // 非阻塞发送数据         ssize_t NonBlockSend(void* buff, size_t len)         {             return Send(buff, len, MSG_DONTWAIT); // MSG_DONTWAIT 表示当前接收为非阻塞。         }         // 关闭套接字         void Close()         {             // int close(int fd);             if (_sockfd != -1)             {                 close(_sockfd);                 _sockfd = -1;             }         }         // 创建一个监听链接         bool Server(uint16_t port, const std::string& ip = "0.0.0.0", bool block_flag = false)         {             // 1、创建套接字 2、绑定地址 3、监听套接字 4、设置非阻塞 5、地址复用             if (CreateSocket() == false) return false;             if (block_flag) NonBlock(); // 设置非阻塞             if (Bind(ip, port) == false) return false;             if (Listen() == false) return false;             Reuse();             return true;         }         // 创建一个客户端连接         bool Client(uint16_t port, const std::string& ip)         {             // 1、创建套接字 2、建立连接             if (CreateSocket() == false) return false;             if (Connection(ip, port) == false) return false;             return true;         }         // 设计套接字选项--开启地址端口重用         void Reuse()         {             // int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen);             int val = 1;             setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void*)&val, sizeof(int)); // 地址进行复用一下             val = 1;             setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void*)&val, sizeof(int)); // 端口也进行复用一下         }         // 设置套接字阻塞属性--设置为非阻塞         void NonBlock()         {             // int fcntl(int fd, int cmd, ... /* arg */ );             int flag = fcntl(_sockfd, F_GETFL, 0);             fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK);         } }; 

3、效果展示

代码展示:

// tcp_svr.cc #include "../source/server.hpp"  // for test int main() {     Socket l_socket;     l_socket.Server(8888);     while (1)     {         int newfd = l_socket.Accept();         if (newfd < 0)         {             continue;         }         Socket cl_socket(newfd); // 重新创建一个         char buff[1024] = {0};         int ret = cl_socket.Recv(buff, 1023);         if (ret < 0)         {             cl_socket.Close();             continue;         }         cl_socket.Send(buff, ret);         cl_socket.Close();     }     l_socket.Close();     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {     Socket clent_socket;     clent_socket.Client(8888, "127.0.0.1");     std::string str = "xiangshuijiaole";     clent_socket.Send(str.c_str(), str.size());     char buff[1024] = {0};     clent_socket.Recv(buff, 1023);     DEBLOG("%s", buff);     return 0; } 

在这里插入图片描述

九、Channel

1、简单介绍思路和用法

这个类设计的目的是对描述符的监控事件管理。
其功能有:描述符是否可读,描述符是否可写,对描述符监控可读,对描述符监控可写,解除可读事件监控,解除可写事件监控,解除所有事件监控。而当事件触发后的处理的管理,需要的处理的事件有:可读,可写,挂断,错误,任意,同时我们需要添加事件处理的回调函数。
在这里插入图片描述

2、接口介绍及速览

		// 获取想要监控的事件         uint32_t Events();         // 设置实际就绪的事件         void SetREvents(uint32_t events);         // 设置可读回调函数         void SetReadCallback(const EventCallback& cb);         // 设置可写回调函数         void SetWriteCallback(const EventCallback& cb);         // 设置错误回调函数         void SetErrorCallback(const EventCallback& cb);         // 设置连接关闭回调函数         void SetCloseCallback(const EventCallback& cb);         // 设置所有回调函数         void SetAlleventCallback(const EventCallback& cb);         // 描述符是否监控了可读         bool ReadAble();         // 描述符是否监控了可写         bool WriteAble();         // 对描述符监控可读         void EnableRead();         // 对描述符监控可写         void EnableWrite();         // 解除可读事件监控         void DisableWrite();         // 解除可写事件监控         void DisableRead();         // 解除所有事件监控         void DisableEvent();         // 移除监控         void Move() { /*这个要到evebtloop往后才能用到*/ }         // 连接事件处理         void Handle(); 

3、代码总览

class Channel {     // 以下是epoll_ctl的使用 man 2 epoll_ctl     /*EPOLLIN               The associated file is available for read(2) operations.         EPOLLOUT               The associated file is available for write(2) operations.         EPOLLRDHUP (since Linux 2.6.17)               Stream socket peer closed connection, or shut down writing half of connection.  (This  flag  is  especially               useful for writing simple code to detect peer shutdown when using Edge Triggered monitoring.)         EPOLLPRI               There is urgent data available for read(2) operations.         EPOLLERR               Error condition happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;               it is not necessary to set it in events.         EPOLLHUP               Hang up happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;  it  is               not necessary to set it in events.         EPOLLET               Sets  the  Edge  Triggered  behavior for the associated file descriptor.  The default behavior for epoll is               Level Triggered.  See epoll(7) for more detailed information about Edge and Level Triggered event distribu‐               tion architectures.        EPOLLONESHOT (since Linux 2.6.2)               Sets  the  one-shot  behavior for the associated file descriptor.  This means that after an event is pulled               out with epoll_wait(2) the associated file descriptor is internally disabled and no other  events  will  be               reported  by  the  epoll  interface.   The  user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file               descriptor with a new event mask.*/     private:         int _fd; // 文件描述符         uint32_t _events; // 当前所监控的事件         uint32_t _revents; // 当前所连接触发的事件         using EventCallback = std::function<void()>; // 包装器包装个回调函数         EventCallback _readcallback; // 可读事件回调         EventCallback _writecallback; // 可写事件回调         EventCallback _errorcallback; // 错误事件回调         EventCallback _closecallback; // 连接失效事件回调         EventCallback _eventcallback; // 任意事件回调     public:         // 构造函数         Channel(int fd): _fd(fd), _events(0), _revents(0) {}         // 返回文件描述符         int Fd()         {             return _fd;         }         // 获取想要监控的事件         uint32_t Events() { return _events; }         // 设置实际就绪的事件         void SetREvents(uint32_t events) { _revents = events; }         // 设置可读回调函数         void SetReadCallback(const EventCallback& cb) { _readcallback = cb; }         // 设置可写回调函数         void SetWriteCallback(const EventCallback& cb) { _writecallback = cb; }         // 设置错误回调函数         void SetErrorCallback(const EventCallback& cb) { _errorcallback = cb; }         // 设置连接关闭回调函数         void SetCloseCallback(const EventCallback& cb) { _closecallback = cb; }         // 设置所有回调函数         void SetAlleventCallback(const EventCallback& cb) { _eventcallback = cb; }         // 描述符是否监控了可读         bool ReadAble() { return (_events & EPOLLIN); }         // 描述符是否监控了可写         bool WriteAble() { return (_events & EPOLLOUT); }         // 对描述符监控可读         void EnableRead() { _events |= EPOLLIN; }         // 对描述符监控可写         void EnableWrite() { _events |= EPOLLOUT; }         // 解除可读事件监控         void DisableWrite() { _events &= ~EPOLLIN; }         // 解除可写事件监控         void DisableRead() { _events &= ~EPOLLOUT; }         // 解除所有事件监控         void DisableEvent() { _events = 0;/*全部清空即可*/ }         // 移除监控         void Move() { /*这个要到evebtloop往后才能用到*/ }         // 连接事件处理         void Handle()         {             if ((_revents & EPOLLIN) || (_revents & EPOLLHUP) || (_revents & EPOLLPRI))             {                 if (_readcallback) _readcallback(); // 上面都成立的情况下如果读的回调函数存在则调用读的回调函数                 if (_eventcallback) _eventcallback(); // 总要有一个任意事件的关闭函数吧!不管怎么样都会有一个回调函数的                          }             if (_revents & EPOLLOUT)             {                 if (_writecallback) _writecallback(); // 写入                 if (_eventcallback) _eventcallback(); // 事件处理完毕后,刷新活跃度             }             else if (_revents & EPOLLERR)             {                 if (_eventcallback) _eventcallback();                 if (_errorcallback) _errorcallback(); // 一旦出了错误,就调用错误的回调函数,但前面是加入了任意事件的回调函数的             }             else if (_revents & EPOLLHUP)             {                 if (_eventcallback) _eventcallback();                 if (_closecallback) _closecallback(); // 关闭函数前调用一下任意事件的回调函数             }         } }; 

4、过编译就行

在这里插入图片描述

十、Poller

1、简单介绍

在这里插入图片描述
在这里插入图片描述

2、使用接口简述

private: 	// 向epoll_ctl提供的添加或更新描述符所监控事件 	void Updata(Channel* channel, int op); 	// 判断一个新的Channel是否链接了监控 	bool HasChannel(Channel* channel); public: 	// 添加或更新描述符所监控的事件     void UpdateEvent(Channel* channel);     // 移除描述符监控     void RemoveFd(Channel* channel);     // 开始监控,获取就绪的Channel     void Poll(std::vector<Channel*>* active) 

3、原码

#define MAX_EPOLL_EVENTS 1024 class Epoll {     private:         // epoll操作尺柄         int _epfd;         // struct结构监控时保存所有的活跃事件         struct epoll_event _evs[MAX_EPOLL_EVENTS];         // 使用hash表智理描述符与描述符对应的事件管理Channel对象         std::unordered_map<int, Channel*> _channels;     private:         // 向epoll_ctl提供的添加或更新描述符所监控事件         void Update(Channel* channel, int op)         {             // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);             int fd = channel->Fd();             struct epoll_event ev;             ev.data.fd = fd;             ev.events = channel->Events();             int epcl = epoll_ctl(_epfd, op, fd, &ev);             if (epcl < 0)             {                 ERRLOG("EPOLL_CTL ERROR!");                 return; // 直接退出程序             }         }         // 判断一个新的Channel是否链接了监控         bool HasChannel(Channel* channel)         {             auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符             if (it == _channels.end())             {                 ERRLOG("CHANNEL FIND FAILED!");                 return false;             }             return true;         }     public:         // 构造函数         Epoll()         {             // int epoll_create(int size);             int epct = epoll_create(MAX_EPOLL_EVENTS);             if (epct < 0)             {                 ERRLOG("EPCT EPOLL_CREAT ERROR!");                 abort(); // 退出程序了             }         }         // 添加或更新描述符所监控的事件         void UpdateEvent(Channel* channel)         {             bool ret = HasChannel(channel);              if (ret == false) // 没有的话就添加             {                 return Update(channel, EPOLL_CTL_ADD); // 添加             }             return Update(channel, EPOLL_CTL_MOD); // 修改         }         // 移除描述符监控         void RemoveFd(Channel* channel)         {             auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符             if (it != _channels.end())             {                 _channels.erase(it);             }             Update(channel, EPOLL_CTL_DEL);         }         // 开始监控,获取就绪的Channel         void Poll(std::vector<Channel*>* active)         {             // int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);             int epwt = epoll_wait(_epfd, _evs, MAX_EPOLL_EVENTS, -1);             if (epwt < 0)             {                 if (errno == EINTR)                 {                     return;                 }                 ERRLOG("EPWT EPOLL_WAIT ERROR:%s\n", strerror(errno));                 abort(); // 退出程序             }              for (int i = 0; i < epwt; i++)             {                 auto it = _channels.find(_evs[i].data.fd); // 继续找文件描述符                 if (it == _channels.end()) { return; }                 // 设置实际就绪事件                 it->second->SetREvents(_evs[i].events);                 active->push_back(it->second);             }         } }; 

十一、通过Poller进行Channel的修改

1、修改内容

在这里插入图片描述

将声明和定义放在一起编译报错:
在这里插入图片描述

将声明和定义分离,定义分离到Epoll类后面的话编译成功:
在这里插入图片描述

2、Poller和Channel联调

// tcp_svr.cc #include "../source/server.hpp"   void HandleClose(Channel* channel)  {     std::cout << "close channel fd is sucessful!" << channel->Fd() << std::endl;     channel->Move();     delete channel; } void HandleRead(Channel* channel) {     int fd = channel->Fd();     char buff[1024] = {0};     // ssize_t recv(int sockfd, void *buf, size_t len, int flags);     int ret = recv(fd, buff, 1023, 0);     if (ret <= 0)     {         return HandleClose(channel); // 关闭释放     }     std::cout << buff << std::endl;     channel->EnableWrite(); // 启动可写事件 } void HandleWrite(Channel* channel)  {     int fd = channel->Fd();     const char *data = "I love you!";     int ret = send(fd, data, strlen(data), 0);     if (ret < 0)     {         return HandleClose(channel); // 关闭释放     }     channel->DisableWrite(); // 关闭释放 } void HandleError(Channel* channel)  {     return HandleClose(channel); } void HandleAllEvent(Channel* channel)  {     std::cout << "have a new channel connection" << std::endl; } void Acceptor(Epoll* poller, Channel* ls_channel) {     int fd = ls_channel->Fd();     int newfd = accept(fd, NULL, NULL);     if (newfd < 0) { return; }     Channel* channel = new Channel(poller, newfd);     channel->SetReadCallback(std::bind(HandleRead, channel)); // 可读事件的回调函数     channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件回调函数     channel->SetCloseCallback(std::bind(HandleClose, channel));  // 关闭事件回调函数     channel->SetErrorCallback(std::bind(HandleError, channel));  // 错误事件回调函数     channel->SetAlleventCallback(std::bind(HandleAllEvent, channel));  // 任意事件回调函数     channel->EnableRead(); }  // for test int main() {     Epoll poller;     Socket l_socket;     l_socket.Server(8888);     Channel channel(&poller, l_socket.Fd()); // 监听套接字的     channel.SetReadCallback(std::bind(Acceptor, &poller, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控     channel.EnableRead(); // 启动可读事件监控     while (1)     {         std::vector<Channel*> actives;         poller.Poll(&actives);         for (auto& a : actives)         {             a->Handle();         }     }     l_socket.Close();     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {         Socket clent_socket;         clent_socket.Client(8888, "127.0.0.1");         while(1)         {         std::string str = "xiangshuijiaole";         clent_socket.Send(str.c_str(), str.size());         char buff[1024] = {0};         clent_socket.Recv(buff, 1023);         DEBLOG("%s", buff);         sleep(1);     }      return 0; } 

在这里插入图片描述
修改代码(前面Channel和Poller代码有很多bug,我们直接在下面展示我们的修改后的代码):

class Epoll; class Channel {     // 以下是epoll_ctl的使用 man 2 epoll_ctl     /*EPOLLIN               The associated file is available for read(2) operations.         EPOLLOUT               The associated file is available for write(2) operations.         EPOLLRDHUP (since Linux 2.6.17)               Stream socket peer closed connection, or shut down writing half of connection.  (This  flag  is  especially               useful for writing simple code to detect peer shutdown when using Edge Triggered monitoring.)         EPOLLPRI               There is urgent data available for read(2) operations.         EPOLLERR               Error condition happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;               it is not necessary to set it in events.         EPOLLHUP               Hang up happened on the associated file descriptor.  epoll_wait(2) will always wait for this event;  it  is               not necessary to set it in events.         EPOLLET               Sets  the  Edge  Triggered  behavior for the associated file descriptor.  The default behavior for epoll is               Level Triggered.  See epoll(7) for more detailed information about Edge and Level Triggered event distribu‐               tion architectures.        EPOLLONESHOT (since Linux 2.6.2)               Sets  the  one-shot  behavior for the associated file descriptor.  This means that after an event is pulled               out with epoll_wait(2) the associated file descriptor is internally disabled and no other  events  will  be               reported  by  the  epoll  interface.   The  user must call epoll_ctl() with EPOLL_CTL_MOD to rearm the file               descriptor with a new event mask.*/     private:         int _fd; // 文件描述符         uint32_t _events; // 当前所监控的事件         Epoll * _poller;         uint32_t _revents; // 当前所连接触发的事件         using EventCallback = std::function<void()>; // 包装器包装个回调函数         EventCallback _readcallback; // 可读事件回调         EventCallback _writecallback; // 可写事件回调         EventCallback _errorcallback; // 错误事件回调         EventCallback _closecallback; // 连接失效事件回调         EventCallback _eventcallback; // 任意事件回调     public:         // 构造函数         Channel(Epoll* poller, int fd): _fd(fd), _events(0), _revents(0), _poller(poller) {}         // 返回文件描述符         int Fd()         {             return _fd;         }         // 获取想要监控的事件         uint32_t Events() { return _events; }         // 设置实际就绪的事件         void SetREvents(uint32_t events) { _revents = events; }         // 设置可读回调函数         void SetReadCallback(const EventCallback& cb) { _readcallback = cb; }         // 设置可写回调函数         void SetWriteCallback(const EventCallback& cb) { _writecallback = cb; }         // 设置错误回调函数         void SetErrorCallback(const EventCallback& cb) { _errorcallback = cb; }         // 设置连接关闭回调函数         void SetCloseCallback(const EventCallback& cb) { _closecallback = cb; }         // 设置所有回调函数         void SetAlleventCallback(const EventCallback& cb) { _eventcallback = cb; }         // 描述符是否监控了可读         bool ReadAble() { return (_events & EPOLLIN); }         // 描述符是否监控了可写         bool WriteAble() { return (_events & EPOLLOUT); }         // 对描述符监控可读         void EnableRead() { _events |= EPOLLIN; Update(); }         // 对描述符监控可写         void EnableWrite() { _events |= EPOLLOUT; Update(); }         // 解除可读事件监控         void DisableRead() { _events &= ~EPOLLIN; Update(); }         // 解除可写事件监控         void DisableWrite() { _events &= ~EPOLLOUT; Update(); }         // 解除所有事件监控         void DisableEvent() { _events = 0; Update();}         // 移除监控         void Move(); // 声明         // 更新监控         void Update(); // 声明         // 连接事件处理         void Handle()         {             if ((_revents & EPOLLIN) || (_revents & EPOLLHUP) || (_revents & EPOLLPRI))             {                 if (_eventcallback) _eventcallback(); // 总要有一个任意事件的关闭函数吧!不管怎么样都会有一个回调函数的                 if (_readcallback) _readcallback(); // 上面都成立的情况下如果读的回调函数存在则调用读的回调函数                         }             if (_revents & EPOLLOUT)             {                 if (_writecallback) _writecallback(); // 写入                 if (_eventcallback) _eventcallback(); // 事件处理完毕后,刷新活跃度             }             else if (_revents & EPOLLERR)             {                 if (_eventcallback) _eventcallback();                 if (_errorcallback) _errorcallback(); // 一旦出了错误,就调用错误的回调函数,但前面是加入了任意事件的回调函数的             }             else if (_revents & EPOLLHUP)             {                 if (_eventcallback) _eventcallback();                 if (_closecallback) _closecallback(); // 关闭函数前调用一下任意事件的回调函数             }         } };  #define MAX_EPOLL_EVENTS 1024 class Epoll {     private:         // epoll操作尺柄         int _epfd;         // struct结构监控时保存所有的活跃事件         struct epoll_event _evs[MAX_EPOLL_EVENTS];         // 使用hash表智理描述符与描述符对应的事件管理Channel对象         std::unordered_map<int, Channel*> _channels;     private:         // 向epoll_ctl提供的添加或更新描述符所监控事件         void Update(Channel* channel, int op)         {             // int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);             int fd = channel->Fd();             struct epoll_event ev;             ev.data.fd = fd;             ev.events = channel->Events();             int epcl = epoll_ctl(_epfd, op, fd, &ev);             if (epcl < 0)             {                 ERRLOG("EPOLL_CTL ERROR!");             }             return;         }         // 判断一个新的Channel是否链接了监控         bool HasChannel(Channel* channel)         {             auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符             if (it == _channels.end())             {                 ERRLOG("CHANNEL FIND FAILED!");                 return false;             }             return true;         }     public:         // 构造函数         Epoll()         {             // int epoll_create(int size);             _epfd = epoll_create(MAX_EPOLL_EVENTS);             if (_epfd < 0)             {                 ERRLOG("EPCT EPOLL_CREAT ERROR!");                 abort(); // 退出程序了             }         }         // 添加或更新描述符所监控的事件         void UpdateEvent(Channel* channel)         {             bool ret = HasChannel(channel);              if (ret == false) // 没有的话就添加             {                 _channels.insert(std::make_pair(channel->Fd(), channel));                 return Update(channel, EPOLL_CTL_ADD); // 添加             }             return Update(channel, EPOLL_CTL_MOD); // 修改         }         // 移除描述符监控         void RemoveFd(Channel* channel)         {             auto it = _channels.find(channel->Fd()); // 用迭代器去获取文件描述符             if (it != _channels.end())             {                 _channels.erase(it);             }             Update(channel, EPOLL_CTL_DEL);         }         // 开始监控,获取就绪的Channel         void Poll(std::vector<Channel*> *active)         {             // int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);             int epwt = epoll_wait(_epfd, _evs, MAX_EPOLL_EVENTS, -1);             if (epwt < 0)             {                 if (errno == EINTR)                 {                     return;                 }                 ERRLOG("EPWT EPOLL_WAIT ERROR:%s\n", strerror(errno));                 abort(); // 退出程序             }              for (int i = 0; i < epwt; i++)             {                 auto it = _channels.find(_evs[i].data.fd); // 继续找文件描述符                 if (it == _channels.end()) { return; }                 // 设置实际就绪事件                 it->second->SetREvents(_evs[i].events);                 active->push_back(it->second);             }             return;         } };  // 移除监控 void Channel::Move() { return _poller->RemoveFd(this); } // 更新监控 void Channel::Update() { return _poller->UpdateEvent(this); } 

3、回调关系图

在这里插入图片描述

十二、EventLoop

1、eventfd的妙用

在这里插入图片描述
在这里插入图片描述

(1)代码展示

#include <stdio.h> #include <fcntl.h> #include <unistd.h> #include <sys/eventfd.h>  int main() {     int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);     if (efd < 0) { perror("eventfd created error!"); return; }      uint64_t val = 1;     write(efd, &val, sizeof(val));     write(efd, &val, sizeof(val));     write(efd, &val, sizeof(val));     uint64_t res = 0;     read(efd, &res, sizeof(res));     printf("%ld\n", res);     return 0; } 

(2)运行结果

在这里插入图片描述

2、EventLoop设计思路

我们首先要了解,这个模块是与线程是一一对应关联的,这是因为线程安全的问题,因为当一个线程中跑的资源只有这一个,不让其他资源抢占这个线程的话,实现了线程安全的问题。

而当我们监控了一个连接,这个连接一旦就绪了,就要进行事件处理,但是如果这个描述符在多个线程中都出发了事件,都进行处理的话必然会出现线程安全的问题。因此我们需要将一个连接的事件监控以及连接事件处理以及其他操作都放在同一个线程中进行,这样就能实现线程安全了。

那么我们如何能够保证一个连接的所有操作都在EventLoop对应的线程中呢?其解决方案就是给EventLoop模块中添加一个任务队列,并且对连接的所有操作,都进行一次封装,将对连接的操作并不直接执行,而是当做任务添加到任务队列中。

EventLoop的处理流程:
1、在线程中对描述符进行事件监控
2、有描述符就绪则描述符进行事件处理(保证处理回调函数中的操作都在线程中)
3、所有的就绪事件处理完了,这时候再去将任务队列中的任务一一执行。

我们用图看一下:
在这里插入图片描述

介绍一下epoll和task:
1、事件监控:使用Poller模块,有事件就绪则进行事件处理
2、执行任务队列中的任务:一个线程安全的任务队列
注意点:有可能因为等待描述符IO事件就绪导致的执行流流程阻塞,这时候任务队列中的任务将得不到执行,因此得有一个事件通知的东西能够唤醒事件监控的阻塞。
当事件就绪,需要进行处理的时候,处理过程中,如果对连接要进行某些操作,这些操作必须在EventLoop对应的线程中执行,保证对连接的各项操作都是线程安全的。
1、如果执行的操作本就在线程中,不需要将操作压入队列,可以直接执行
2、如果执行的操作不在线程中,才需要加入任务池,等到事件处理完了再执行任务。

3、EventLoop代码

class EventLoop {     private:         using Functor = std::function<void()>;         std::thread::id _threadid; // 线程ID,用于判断是否是同一个线程         int _eventfd; // eventfd唤醒IO操作中有可能引起的阻塞         std::unique_ptr<Channel> _event_channel; // 监控         Epoll _poller; // 进行所有描述符事件的监控         std::vector<Functor> _taskpool; // 任务池         std::mutex _mutex; // 对任务池中的任务进行加锁操作     public:         // 启动任务池所有任务           void RunAllTask()         {             std::vector<Functor> functor;             // 限定作用域,用来实现加锁的操作             {                 std::unique_lock<std::mutex> _lock(_mutex);                 _taskpool.swap(functor); // 交换             }             for (auto& e : functor)             {                 e(); // 回调执行任务             }             return;         }         static int GetEventFd()         {             int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK);             if (efd < 0)              {                 ERRLOG("EVENT CREATE ERROR!!");                 abort();             }             return efd;         }         void ReadEventFd()         {             uint64_t res = 0;             int ret = read(_eventfd, &res, sizeof(res));             if (ret < 0)             {                 //EINTR -- 被信号打断;   EAGAIN -- 表示无数据可读                 if (errno == EINTR || errno == EAGAIN)                 {                     ERRLOG("MEI SHI!!!");                     return;                 }                 ERRLOG("RET READ ERROR!!!");                 abort();             }             return;         }         // 用来唤醒         void WakeUpEventFd()         {             uint64_t val = 1;             int ret = write(_eventfd, &val, sizeof(val));             if (ret < 0)             {                 if (errno == EINTR)                  {                     return;                 }                 ERRLOG("RET WRITE ERROR!!!");                 abort();             }             return;         }     public:         // 构造函数          EventLoop()             : _threadid(std::this_thread::get_id())             , _eventfd(GetEventFd())             , _event_channel(new Channel(this, _eventfd))         {             _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventFd, this)); // 调用设置的回调读函数,读取Eventfd事件的次数             _event_channel->EnableRead(); // 启动EventFd读事件监控         }         // 判断将要执行的任务是否处于当前线程中,如果是则执行,不是则压入队列         void RunInLoop(const Functor& cb)         {             if (IsInLoop())             {                 return cb();             }             return QueueInTask(cb);         }         void QueueInTask(const Functor& cb) // 将操作压入到任务队列中         {             // 限定作用域,对其进行加锁             {                 std::unique_lock<std::mutex> _lock(_mutex);                 _taskpool.push_back(cb);             }             //唤醒有可能因为没有事件就绪,而导致的epoll阻塞;             //其实就是给eventfd写入一个数据,eventfd就会触发可读事件             WakeUpEventFd();         }         // 判断当前线程是否是EventLoop中的线程         bool IsInLoop()         {             return (_threadid == std::this_thread::get_id());         }         // 添加/修改描述符事件监控         void UpdateEvent(Channel *channel)         {             return _poller.UpdateEvent(channel);         }         // 移除描述符事件监控         void RemoveEvent(Channel *channel)         {             return _poller.RemoveFd(channel);         }         // 三步走:事件监控->就绪事件处理->执行任务         void Start()         {             // 1、事件监控             std::vector<Channel*> actives;             _poller.Poll(&actives);             // 2、就绪事件处理             for (auto &a : actives)             {                 a->Handle(); // 处理             }             // 3、执行任务             RunAllTask();         } }; 

4、Channel的修改

在这里插入图片描述
在这里插入图片描述

5、运行代码及结果

tcp_server.cc:
在这里插入图片描述

tcp_client.cc代码不变。

测试结果:
在这里插入图片描述

十三、TimerWheel

1、TimerWheel思路

timerfd:实现内核每隔一段时间,给进程一次超时事件,我们用RefreshEvent接口实现。
timerwheel:实现每次执行Runtimetask,都可以执行一波到期的定时任务
要实现一个完整的秒级定时器,就要把这两个功能整合到一起
timerfd设置为每秒钟触发一次定时事件,当事件被触发,则运行一次timerwheel和runtimetask,执行一下所有的过期的定时任务。

2、代码

using FuncTask = std::function<void()>; // 回调函数,用来回调函数任务的 using ReleaseTask = std::function<void()>; // 回调函数,用来回调释放任务的 class TimerTask {     private:         uint64_t _id; // 定时器任务对象ID         uint32_t _timeout; // 定时器任务超时的时间         bool _cancel;  // 用来取消定时任务, false表示没有被取消,true表示被取消了         FuncTask _taskcb; // 定时器任务要执行的定时任务         ReleaseTask _releasecb; // 定时器任务在时间轮中保存的定时器信息     public:         TimerTask(uint64_t id, uint32_t delay, const FuncTask& cb) // 构造函数             : _id(id)             , _timeout(delay)             , _cancel(false) // 刚开始定为false             , _taskcb(cb)         {}         ~TimerTask() // 析构函数         {             // 在析构函数中进行两个回调函数的创建             if (_cancel == false)                  _taskcb();             _releasecb();         }         void SetRelease(const ReleaseTask& cb) // 设置释放的回调函数         {             _releasecb = cb;         }         uint32_t DelayTimer() // 将延迟时间开放出去         {             return _timeout;         }         void Cancel()         {             _cancel = true;         } }; // 时间轮 class WheelTask {     private:         using SharedPtrTask = std::shared_ptr<TimerTask>; // 用一个shared_ptr,因为有引用计数,当引用计数为0的时候,就是释放资源的时候         using WeakPtrTask = std::weak_ptr<TimerTask>; // weak_ptr用来配合shared_ptr使用         int _tick; // 当前的秒钟,指到哪就将哪释放掉         int _capacity; // 表盘的最大数量(最大延迟时间)         std::vector<std::vector<SharedPtrTask>> _wheel; // 时间轮(二维数组,存放的数据更规整)         std::unordered_map<uint64_t, WeakPtrTask> _timers; // 映射关系                  EventLoop* _loop; // 定时器描述符         int _timerfd; // 定时器描述符--可读事件回调,就是读取计数器,执行定时任务         std::unique_ptr<Channel> _timerchannel;     private:         void RemovePtr(uint64_t id) // 移动指针         {             auto it = _timers.find(id); // 先找到id在_timers中             if (it != _timers.end())             {                 _timers.erase(it); // 删除掉it所指向的位置,也就是id的位置             }         }         static int CreateTimerFd()         {             // 创建一个定时器             int timerfd = timerfd_create(CLOCK_MONOTONIC, 0);             if (timerfd < 0)             {                 ERRLOG("CREATE TIMERFD ERROR!!");                 abort();             }             struct itimerspec iem; // 使用该结构体             /* struct timespec {             time_t tv_sec; // Seconds             long tv_nsec; // Nanoseconds             };             struct itimerspec {             struct timespec it_interval; // 第 次之后的超时间隔时间              struct timespec it_value; // 第 次超时时间              };*/             iem.it_value.tv_sec = 2; // 第一次的超时时间(秒)             iem.it_value.tv_nsec = 0; // 第一次的超时时间(毫秒)             iem.it_interval.tv_sec = 2; // 这次过后的每隔多长时间超时(秒)             iem.it_interval.tv_nsec = 0; // 这次过后的每隔多长时间超时(毫秒)             // 启动定时器             timerfd_settime(timerfd, 0, &iem, NULL); // 启动定时器             return timerfd;         }         void ReadTimerFd()         {             uint64_t temp;             int fd = read(_timerfd, &temp, 8);             if (fd < 0)             {                 ERRLOG("READ FAILED!!");                 abort();             }             return;         }         void RunTimerTask() // 时钟滴答滴答往后走         {             _tick = (_tick + 1) % _capacity;             _wheel[_tick].clear(); // 清空一下         }         void OnTimer() // 时间到了的函数         {             ReadTimerFd(); // 读取一下             RunTimerTask(); // 任务运行起来         }         void TimerAddInLoop(uint64_t id, uint32_t delay, const FuncTask& cb) // 添加到定时任务         {             SharedPtrTask pt(new TimerTask(id, delay, cb)); // 先建立一个shared_ptr             pt->SetRelease(std::bind(&WheelTask::RemovePtr, this, id)); // 有一个this指针             int pos = (_tick + delay) % _capacity;             _wheel[pos].push_back(pt); // 将这个pt先插入到哈希表中             _timers[id] = WeakPtrTask(pt); // 对象中不能用shared_ptr,因为shared_ptr不会减减引用计数,所以用weak_ptr         }         void TimerRefreshInLoop(uint64_t id) // 刷新/延迟定时时间         {             auto it = _timers.find(id);             if (it == _timers.end())             {                 //perror("_timers find error!");                 ERRLOG("TIMERS FREASH ERROR!!");                 return;             }             SharedPtrTask pt = it->second.lock();//lock获取weak_ptr管理的对象对应的shared_ptr             int delay = pt->DelayTimer();             int pos = (_tick + delay) % _capacity;             _wheel[pos].push_back(pt);         }         void CancelTimerInloop(uint64_t id) // 取消定时任务         {             auto it = _timers.find(id);             if (it == _timers.end())             {                 //perror("_timers not find!");                 ERRLOG("CANCELTIMER ERROR!!!");                 return;             }             SharedPtrTask pt = it->second.lock();             if (pt) pt->Cancel();         }     public:         WheelTask(EventLoop* loop)             : _tick(0)             , _capacity(60)             , _wheel(_capacity)             , _loop(loop)             , _timerfd(CreateTimerFd())             , _timerchannel(new Channel(_loop, _timerfd))         {             _timerchannel->SetReadCallback(std::bind(&WheelTask::OnTimer, this));             _timerchannel->EnableRead(); // 启动可读事件         }         // 定时器中有个_timers的成员,定时器信息的操作是有可能在多个线程中进行,因此需要考虑线程安全的问题         // 如果不想加锁,那么就将定时器的所有操作都放在同一个线程中进行,放到EventLoop中即可         void TimerAdd(uint64_t id, uint32_t delay, const FuncTask& cb);         void TimerRefresh(uint64_t id);         void CancelTimer(uint64_t id);         // 这个接口是有线程安全的问题,不能外界使用者调用,只能在内部,在EventLoop线程内执行         bool HasTimer(uint64_t id)         {             auto ret = _timers.find(id);             if (ret == _timers.end())             {                 ERRLOG("HASTIMER FIND FAILED!!");                 return false;             }             return true;         } }; 

3、修改EventLoop

在这里插入图片描述

4、运行代码及结果

// tcp_svr.cc: #include "../source/server.hpp"   void HandleClose(Channel* channel)  {     //std::cout << "close channel fd is sucessful!" << channel->Fd() << std::endl;     DEBLOG("%d", channel->Fd());     channel->Move();     delete channel; } void HandleRead(Channel* channel) {     int fd = channel->Fd();     char buff[1024] = {0};     // ssize_t recv(int sockfd, void *buf, size_t len, int flags);     int ret = recv(fd, buff, 1023, 0);     if (ret <= 0)     {         return HandleClose(channel); // 关闭释放     }     DEBLOG("%s", buff);     //std::cout << buff << std::endl;     channel->EnableWrite(); // 启动可写事件 } void HandleWrite(Channel* channel)  {     int fd = channel->Fd();     const char *data = "I love you!";     int ret = send(fd, data, strlen(data), 0);     if (ret < 0)     {         return HandleClose(channel); // 关闭释放     }     channel->DisableWrite(); // 关闭释放 } void HandleError(Channel* channel)  {     return HandleClose(channel); } void HandleAllEvent(EventLoop* loop, Channel* channel, uint64_t timerid)  {     loop->TimerRefresh(timerid); // 刷新     //std::cout << "have a new channel connection" << std::endl; } void Acceptor(EventLoop* loop, Channel* ls_channel) {     int fd = ls_channel->Fd();     int newfd = accept(fd, NULL, NULL);     if (newfd < 0) { return; }     uint64_t timerid = rand() % 10000;     Channel* channel = new Channel(loop, newfd);     channel->SetReadCallback(std::bind(HandleRead, channel)); // 可读事件的回调函数     channel->SetWriteCallback(std::bind(HandleWrite, channel)); // 可写事件回调函数     channel->SetCloseCallback(std::bind(HandleClose, channel));  // 关闭事件回调函数     channel->SetErrorCallback(std::bind(HandleError, channel));  // 错误事件回调函数     channel->SetAlleventCallback(std::bind(HandleAllEvent, loop, channel, timerid));  // 任意事件回调函数     // 非活跃连接超时释放操作     // 注意的是:定时任务一定在读事件启动之前,因为有可能启动事件监控后,立即就有了事件,但是这时候还没有任务产生     loop->TimerAdd(timerid, 10, std::bind(HandleClose, channel));     channel->EnableRead(); }  // for test int main() {     srand(time(NULL)); // 生成一个随机数种子     // Epoll poller;     EventLoop loop;     Socket l_socket;     l_socket.Server(8889);     Channel channel(&loop, l_socket.Fd()); // 监听套接字的     channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控     channel.EnableRead(); // 启动可读事件监控     while (1)     {         loop.Start();     }     l_socket.Close();     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {         Socket clent_socket;         clent_socket.Client(8889, "127.0.0.1");         //while(1)         for (int i = 0; i < 5; i++)         {         std::string str = "xiangshuijiaole";         clent_socket.Send(str.c_str(), str.size());         char buff[1024] = {0};         clent_socket.Recv(buff, 1023);         DEBLOG("%s", buff);         sleep(1);     }     while(1)     {         sleep(1);     }     return 0; } 

在这里插入图片描述

十四、Eventloop的模块流程图

在这里插入图片描述

十五、Connection

1、Connection设计思路

(1)目的

对连接进行全方位的管理,对通信连接的所有操作都是通过这个模块提供的功能完成的。

(2)管理

套接字的管理,能够进行套接字的操作,socket
连接事件的管理,可读、可写、错误、挂断、任意
缓冲区的管理,便于socket数据的接收和发送
协议上下文的管理,记录请求数据的处理过程
回调函数的管理
因为连接接收到数据之后该如何处理,都要由用户决定,因此必须有业务处理回调函数
一个连接建立成立后,该如何处理,由用户决定,因此必须有连接建立成功的回调函数。
一个连接关闭前,该如何处理,由用户决定,因此必须由关闭连接回调函数。
任意事件的产生,有没有某些处理,由用户决定,因此必须有任意事件的回调函数。

(3)功能

发送数据:给用户提供的发送数据接口,并不是真正的发送接口,而只是把数据放到发送缓冲区,然后启动写事件监控。
关闭连接:给用户提供的关闭连接接口,应该在实际释放连接之前,看看输入输出缓冲区是否有数据待处理
启动非活跃连接的超时销毁功能
取消非活跃连接的超时功能
协议切换:一个连接接收到数据后如何进行业务处理,取决于上下文以及数据的业务处理回调功能。

(4)场景

Connection模块是对连接的管理模块,对于连接的所有操作都是通过这个模块完成的场景:对连接进行操作的时候,但是连接已经被释放,导致内存访问错误,最终程序崩溃解决方案:使用智能指针shared ptr对Connection对象进行管理,这样就能保证任意一个地方对Connection对象进行操作的时候保存了一份shared ptr,因此就算其他地方进行释放操作,也只是对shared ptr的计数器-1,而不会导致Connection的实际释放

2、预备工作:加上Any类

// 先搭个框架--通用容器的函数 class Any {     private:         class Holder         {             public:                 virtual ~Holder(){} // 析构函数                 // 纯虚函数的定义在于只让它的子类对象刻画出对象,                 // 不让父类对象刻画出对象,就像动物包含着老虎狮子等的,老虎狮子是对象,而动物不是对象                 virtual const std::type_info& type() = 0;                  virtual Holder* clone() = 0;         };         template<class T>         class PlaceHolder : public Holder         {             public:                 // 构造函数                 PlaceHolder(const T& val) :_val(val) {}                 // 类型                 virtual const std::type_info& type()                 {                     return typeid(T); // 直接返回我们模版实例化的参数的类型即可                 }                 // 针对当前的函数对象,克隆出一个新的子类对象                 virtual Holder* clone()                 {                     return new PlaceHolder(_val);                 }             public:                 T _val; // 构造一个模版类型的数据         };         Holder *_content; // 父类的指针可以指向任何类型的数据(这个是Any类型的私有函数)     public:         // 空的构造函数         Any():_content(NULL){}         // 模版T类型的构造函数         template<class T>         Any(const T& val):_content(new PlaceHolder<T>(val)){}         // 通过其他的通用容器来构造我们自己的容器         Any(const Any& other):_content(other._content ? other._content->clone() : NULL){}         // 析构函数         ~Any()         {             delete _content; // 释放掉这个_content         }         // 得到子类对象保存的数据的指针         template<class T>         T* get()         {             // 当前对象的类型不能和传进来的实例化模版的类型是一样的             assert(_content->type() == typeid(T));             // 先强转,再指向_val,再取地址             return &((PlaceHolder<T>*)_content)->_val;         }         Any& Swap(Any& other)         {             std::swap(_content, other._content); // 调用算法函数直接交换this和other对象的指针             return *this; // 返回实例化的this解引用         }         // operator=的任意模板类型的运算符重载函数         template<class T>         Any& operator=(const T& val)         {             //为val构造一个临时的通用容器,然后与当前容器自身进行指针交换,临时对象释放的时候,原先保存的数据也就被释放             Any(val).Swap(*this);             return *this;         }         // operator=的对象的运算符重载函数         Any& operator=(const Any& other)         {             Any(other).Swap(*this);             return *this;         } }; 

3、Connection代码

class Connection; // DISCONNECTED -- 连接关闭状态;   CONNECTING -- 连接建立成功-待处理状态; // CONNECTED -- 连接建立完成,各种设置已完成,可以通信的状态;  DISCONNECTING -- 待关闭状态; typedef enum { DISCONNECTED, CONNECTING, CONNECTED, DISCONNECTING }Constatus; using PtrConnection = std::shared_ptr<Connection>; // 用智能指针管理Connection class Connection : public std::enable_shared_from_this<Connection> // 为了给下面用 {     private:         uint64_t _con_id; // 连接的唯一ID,便于连接的管理和查找,同时也是定时器的ID,因为连接的唯一ID是唯一的,可以作为定时器唯一ID处理         int _sockfd; // 连接关联的文件描述符         bool _is_enable_released; // 连接是否启动非活跃销毁的判断标志,默认为false         EventLoop* _loop; // 连接所关联的EventLoop,也就是只在同一个线程中跑         Constatus _constatus; // 连接状态         Socket _socket; // 套接字的操作管理         Channel _channel; // 连接的事件管理         Buffer _buffer_in; // 输入缓冲区---存放从socket中读取到的数据         Buffer _buffer_out; // 输出缓冲区---存放要发送给对端的数据         Any _context; // 请求连接的上下文(这里需要保存的)          // 下面四个回调函数是由组件者(我们使用者)来完成的         using ConnectedCallback = std::function<void(const PtrConnection&)>;         using MessageCallback = std::function<void(const PtrConnection&, Buffer*)>;         using ClosedCallback = std::function<void(const PtrConnection&)>;         using AnyCallback = std::function<void(const PtrConnection&)>;         ConnectedCallback _conn_callback;         MessageCallback _msg_callback;         ClosedCallback _clo_callback;         AnyCallback _any_callback;         // 组件内的连接关闭回调--组件内设置的,因为服务器组件内会把所有的连接管理起来,一旦某个连接要关闭         // 就应该从管理的地方移除掉自己的信息         ClosedCallback _server_clo_callback;     private:         // 五个channel的事件回调函数         // 描述符可读事件触发后调用的函数,接收socket数据放到接收缓冲区中,然后调用_message_callback         void HandleRead()         {             // 1、接收socket数据放到接收缓冲区             char buff[65536];             ssize_t ret = _socket.NonBlockRecv(buff, 65535);             if (ret < 0)             {                 // 此时是出错了,我们不能调用实际的关闭连接的操作,而应当调用Shutdown的操作                 return ShutdownInLoop();             }             // 将数据放入缓冲区,顺便将写偏移向后移动             _buffer_in.WriteAndPush(buff, ret);             // 2、调用_message_callback             if (_buffer_in.ReadAbleSize() > 0)             {                 // shared_from_this--从当前对象自身获取自身的shared_ptr管理对象                 return _msg_callback(shared_from_this(), &_buffer_in);             }         }         // 描述符可写事件触发后调用的函数,将发送缓冲区中的数据进行发送         void HandleWrite()         {             // _out_buffer中保存的数据就是要发送的数据             ssize_t ret = _socket.NonBlockSend(_buffer_out.ReadPos(), _buffer_out.ReadAbleSize());             if (ret < 0)             {                 // 发送错误就需要实际意义的关闭了                 // 有数据先读出去                 if (_buffer_in.ReadAbleSize() > 0)                 {                     _msg_callback(shared_from_this(), &_buffer_in);                 }                 return Release();             }             // 将读事件的偏移往后移动             _buffer_out.ReadOffset(ret);             if (_buffer_out.ReadAbleSize() == 0)             {                 _channel.DisableWrite(); // 没有事件发送了,关闭写事件                 if (_constatus == DISCONNECTING)                 {                     return Release();                 }             }             return;         }         // 描述符触发关闭         void HandleClose()         {             // 一旦连接挂断了,套接字就什么都干不了了,因此有数据待处理就处理一下,完毕关闭连接             if (_buffer_in.ReadAbleSize() > 0)             {                 _msg_callback(shared_from_this(), &_buffer_in);             }             return Release();         }         // 描述符触发出错         void HandleError()         {             return HandleClose();         }         // 描述符触发任意事件: 1. 刷新连接的活跃度--延迟定时销毁任务;  2. 调用组件使用者的任意事件回调         void HandleAllEvent()         {             if (_is_enable_released == true)  {  _loop->TimerRefresh(_con_id); }             if (_any_callback)  {  _any_callback(shared_from_this()); }         }         // 连接获取之后,所处的状态下要进行各种设置(启动读监控, 调用回调函数)         void EstablishedInloop()         {             // 1、修改连接状态 2、启动读事件监控 3、调用回调函数             assert(_constatus == CONNECTING); // 当前的状态必须一定是上层的半连接状态             _constatus = CONNECTED;             // 一旦启动读事件监控就有可能会立即触发读事件,如果这时候启动了非活跃连接销毁             _channel.EnableRead();             if (_conn_callback)              {                 _conn_callback(shared_from_this());             }         }         // 发送数据到缓冲区,这个接口并不是实际要发送的接口,而是把数据放到了发送缓冲区并启动可写事件监控         void SendInLoop(Buffer &buff)         {             if (_constatus == DISCONNECTED) return;             _buffer_out.WriteBufferAndPush(buff);             if (_channel.WriteAble() == false)             {                 _channel.EnableWrite();             }         }         // 实际释放接口         void ReleaseInLoop()         {             // 1. 修改连接状态,将其置为DISCONNECTED             _constatus = DISCONNECTED;             // 2. 移除连接的事件监控             _channel.Move();             // 3. 关闭描述符             _socket.Close();             // 4. 如果当前定时器队列中还有定时销毁任务,则取消任务             if (_loop->HasTimer(_con_id)) CancelInactiveReleaseInLoop();             // 5. 调用关闭回调函数,避免先移除服务器管理的连接信息导致Connection被释放,再去处理会出错,因此先调用用户的回调函数             if (_clo_callback) _clo_callback(shared_from_this());             // 移除服务器内部管理的连接信息             if (_server_clo_callback) _server_clo_callback(shared_from_this());         }         // 关闭接口的操作并不是实际的连接释放的操作,我们需要判断是否还有数据待处理、待发送         void ShutdownInLoop()         {             _constatus = DISCONNECTING; // 设置连接为半关闭状态             if (_buffer_in.ReadAbleSize() > 0)             {                 if (_msg_callback) _msg_callback(shared_from_this(), &_buffer_in);             }             // 写入数据出错或者没有发送数据导致的关闭             if (_buffer_out.ReadAbleSize() > 0)             {                 if (_channel.WriteAble() == false)                 {                     _channel.EnableWrite();                 }             }             if (_buffer_out.ReadAbleSize() == 0)             {                 Release();             }         }         // 启动非活跃任务销毁,启动定时任务,定义多长时间无通信就是非活跃并销毁         void SetInactiveReleaseInLoop(int sec)         {             // 1. 将判断标志 _is_enable_released 置为true             _is_enable_released = true;             // 2. 如果当前定时销毁任务已经存在,那就刷新延迟一下即可             if (_loop->HasTimer(_con_id))             {                 return _loop->TimerRefresh(_con_id);             }             // 3. 如果不存在定时销毁任务,则新增             _loop->TimerAdd(_con_id, sec, std::bind(&Connection::Release, this));         }         // 取消非活跃任务的销毁         void CancelInactiveReleaseInLoop()         {             _is_enable_released = false;             if (_loop->HasTimer(_con_id))             {                 _loop->CancelTimer(_con_id);             }         }         // 切换协议的接口         void UpgradeInLoop(const Any& content, const ConnectedCallback& conn, const MessageCallback& msg,                          const ClosedCallback& clo, const AnyCallback& any)         {             _context = content;             _conn_callback = conn;             _msg_callback = msg;             _clo_callback = clo;             _any_callback = any;         }     public:         // 构造函数         Connection(EventLoop *loop, uint64_t conn_id, int sockfd)             : _con_id(conn_id)             , _sockfd(sockfd)             , _is_enable_released(false)             , _loop(loop)             , _constatus(CONNECTING)             , _socket(_sockfd)             , _channel(loop, _sockfd)         {             _channel.SetCloseCallback(std::bind(&Connection::HandleClose, this));             _channel.SetAlleventCallback(std::bind(&Connection::HandleAllEvent, this));             _channel.SetReadCallback(std::bind(&Connection::HandleRead, this));             _channel.SetWriteCallback(std::bind(&Connection::HandleWrite, this));             _channel.SetErrorCallback(std::bind(&Connection::HandleError, this));         }         // 析构函数         ~Connection()          {              std::cout << "~Connection" << std::endl;             DEBLOG("RELEASE CONNECTION:%p", this);         }         // 获取管理文件描述符         int Fd() { return _sockfd; }         // 获取连接ID         int Id() { return _con_id; }         // 判断是否处于CONNECTED状态         bool IsConnected() { return (_constatus == CONNECTED); }         // 设置上下文--连接建立完成时进行调用         void SetContext(const Any& context) { _context = context; }         // 获取上下文,返回指针         Any* GetContext() { return &_context; }         // 以下是五个回调函数(四个组件使用者使用的回调函数+一个组件内的连接关闭回调)--设置         void SetConnectedCallback(const ConnectedCallback& cb) { _conn_callback = cb; }         void SetMessageCallback(const MessageCallback& cb) { _msg_callback = cb; }         void SetClosedCallback(const ClosedCallback& cb) { _clo_callback = cb; }         void SetAnyCallback(const AnyCallback& cb) { _any_callback = cb; }         void SerServerClosedCallback(const ClosedCallback& cb) { _server_clo_callback = cb; }         // 连接建立就绪后,进行channel回调设置,启动读监控,调用_connected_callback         void Established()         {             _loop->RunInLoop(std::bind(&Connection::EstablishedInloop, this));         }         // Send,发送数据到缓冲区,启动写事件监控         void Send(const char* data, size_t len)         {             // 外界传入的data,可能是个临时的空间,我们现在只是把发送操作压入了任务池,有可能并没有被立即执行             // 因此有可能执行的时候,data指向的空间有可能已经被释放了             Buffer buff;             buff.WriteAndPush(data, len);             _loop->RunInLoop(std::bind(&Connection::SendInLoop, this, std::move(buff)));         }         // 提供给组件使用者的关闭接口--并不实际关闭,需要判断有没有数据待处理         void Shutdown()         {             _loop->RunInLoop(std::bind(&Connection::ShutdownInLoop, this));         }         void Release()         {             _loop->QueueInTask(std::bind(&Connection::ReleaseInLoop, this));         }         // 启动非活跃任务销毁,启动定时任务,定义多长时间无通信就是非活跃并销毁         void SetInactiveRelease(int sec)         {             _loop->RunInLoop(std::bind(&Connection::SetInactiveReleaseInLoop, this, sec));         }         // 取消非活跃任务的销毁         void CancelInactiveRelease()         {             _loop->RunInLoop(std::bind(&Connection::CancelInactiveReleaseInLoop, this));         }         // 切换协议,重置上下文以及阶段性的函数--用四个回调函数(因为是有线程安全,所以必须放到线程中执行)         void Upgrade(const Any& content, const ConnectedCallback& conn, const MessageCallback& msg,                          const ClosedCallback& clo, const AnyCallback& any)         {             // 防备新的事件触发后,处理的时候,切换任务还没有被执行--会导致数据使用原协议处理了。             _loop->AssertLoop(); // 必须在EventLoop中立即执行             _loop->RunInLoop(std::bind(&Connection::UpgradeInLoop, this, content, conn, msg, clo, any));         } }; 

4、纠错修改(必看)

大家看一下下面的Socket代码中的Recv函数,是不是就缺了个小于等于号???这有什么问题???其实问题大着呢!我们在客户端ctrl+c中断程序时候,如果对方正在发送数据,并且此时recv函数正处于阻塞状态等待数据到达,而你又中断了程序,导致链接关闭,那么recv函数会返回0,表示连接已经关闭,接收端不再接收数据,所以所以,切记我们在客户端ctrl+c的时候,recv返回的是0,而不是负数!!!所以这里必须将前面的Socket的值改成<=!!!

在这里插入图片描述
在这里插入图片描述

5、运行代码及结果

// tcp_svr.cc #include "../source/server.hpp"  std::unordered_map<uint64_t, PtrConnection> _conns; uint64_t con_id = 0; void DestroyConnection(const PtrConnection& ptrc) {     _conns.erase(ptrc->Id()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     // conn->Shutdown(); }  void Acceptor(EventLoop* loop, Channel* ls_channel) {     int fd = ls_channel->Fd();     int newfd = accept(fd, NULL, NULL);     if (newfd < 0) { return; }     con_id++;     PtrConnection conn(new Connection(loop, con_id, newfd));     conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));     conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));     conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1));      conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁     conn->Established(); // 就绪初始化     _conns.insert(std::make_pair(con_id, conn)); }  // for test int main() {     //srand(time(NULL)); // 生成一个随机数种子     // Epoll poller;     EventLoop loop;     Socket l_socket;     l_socket.Server(8889);     Channel channel(&loop, l_socket.Fd()); // 监听套接字的     channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控     channel.EnableRead(); // 启动可读事件监控     while (1)     {         loop.Start();     }     l_socket.Close();     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {         Socket clent_socket;         clent_socket.Client(8889, "127.0.0.1");         //while(1)         for (int i = 0; i < 5; i++)         {         std::string str = "xiangshuijiaole";         clent_socket.Send(str.c_str(), str.size());         char buff[1024] = {0};         clent_socket.Recv(buff, 1023);         DEBLOG("%s", buff);         sleep(1);     }     while(1)     {         sleep(1);     }     return 0; } 

在这里插入图片描述

在这里插入图片描述

// tcp_svr.cc #include "../source/server.hpp"  std::unordered_map<uint64_t, PtrConnection> _conns; uint64_t con_id = 0; void DestroyConnection(const PtrConnection& ptrc) {     _conns.erase(ptrc->Id()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     conn->Shutdown(); }  void Acceptor(EventLoop* loop, Channel* ls_channel) {     int fd = ls_channel->Fd();     int newfd = accept(fd, NULL, NULL);     if (newfd < 0) { return; }     con_id++;     PtrConnection conn(new Connection(loop, con_id, newfd));     conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));     conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));     conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1));      conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁     conn->Established(); // 就绪初始化     _conns.insert(std::make_pair(con_id, conn)); }  // for test int main() {     //srand(time(NULL)); // 生成一个随机数种子     // Epoll poller;     EventLoop loop;     Socket l_socket;     l_socket.Server(8889);     Channel channel(&loop, l_socket.Fd()); // 监听套接字的     channel.SetReadCallback(std::bind(Acceptor, &loop, &channel)); // 回调中,获取新连接,为新连接创建Channel并添加监控     channel.EnableRead(); // 启动可读事件监控     while (1)     {         loop.Start();     }     l_socket.Close();     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {         Socket clent_socket;         clent_socket.Client(8889, "127.0.0.1");         //while(1)         for (int i = 0; i < 5; i++)         {         std::string str = "xiangshuijiaole";         clent_socket.Send(str.c_str(), str.size());         char buff[1024] = {0};         clent_socket.Recv(buff, 1023);         DEBLOG("%s", buff);         sleep(1);     }     while(1)     {         sleep(1);     }     return 0; } 

在这里插入图片描述

十六、Acceptor

1、Acceptor设计思路

Acceptor模块说实在的就是对监听套接字进行管理。我们这样就不用在服务端用listen_socket套接字了,而只需要用这一个Acceptor模块进行操作就可以了。

(1)创建一个监听套接字

用Socket _socket:
在这里插入图片描述

(2)启动读事件监控

在这里插入图片描述

(3)事件触发后,获取新连接

(4)调用新连接获取成功后的回调函数

  也就是为新连接创建Connection进行管理,而这个创建Connection是服务器模块的操作,并不是Acceptor模块的操作。因为Acceptor模块只进行监听链接的管理,因此获取到新连接的描述符后,对于新连接如何处理根本不关心,你交给服务器模块处理就可以了。 

而服务器模块实现了一个对于新连接描述符处理的函数,将这个函数设置给Acceptor模块中的回调函数。
在这里插入图片描述

2、代码

class Acceptor {     private:         Socket _socket; // 创建套接字         EventLoop* _loop; // 对_socket套接字进行事件监控         Channel _channel; // 对_socket套接字进行事件管理          using AcceptorCallback = std::function<void(int)>;         AcceptorCallback _acceptor_callback;     private:         // 监听套接字的读事件回调处理函数--获取新连接,调用_acceptor_callback函数进行新连接处理         void HandleRead()         {             int newfd = _socket.Accept();             if (newfd < 0)             {                 ERRLOG("NEWFD ACCEPT ERROR!");                 return;             }             if (_acceptor_callback) _acceptor_callback(newfd);         }         int CreateServerFd(int port)         {             bool ret = _socket.Server(port);             if (ret == false)             {                 ERRLOG("RET SERVER ERROR!!");                 abort();             }             return _socket.Fd(); // 返回文件描述符         }     public:         // 构造函数         // 不能将启动读事件监控,放到构造函数中,必须在设置回调函数后,再去启动         // 否则有可能造成启动监控后,立即有事件,处理的时候,回调函数还没设置:新连接得不到处理,且资源泄漏         Acceptor(EventLoop* loop, int port)              :_socket(CreateServerFd(port))             , _loop(loop)             , _channel(_loop, _socket.Fd())         {             _channel.SetReadCallback(std::bind(&Acceptor::HandleRead, this));         }         void SetAcceptorCallback(const AcceptorCallback& cb) { _acceptor_callback = cb;  }         void Listen()         {             _channel.EnableRead();         } }; 

3、运行代码及结果

// tcp_svr.cc #include "../source/server.hpp"  std::unordered_map<uint64_t, PtrConnection> _conns; uint64_t con_id = 0; EventLoop loop;  void DestroyConnection(const PtrConnection& ptrc) {     _conns.erase(ptrc->Id()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     conn->Shutdown(); }  void NewAcceptor(int fd) {     con_id++;     PtrConnection conn(new Connection(&loop, con_id, fd));     conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));     conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));     conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1));      conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁     conn->Established(); // 就绪初始化     _conns.insert(std::make_pair(con_id, conn)); }  // for test int main() {     srand(time(NULL)); // 生成一个随机数种子     Acceptor acceptor(&loop, 8889);     acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控     acceptor.Listen();     while (1)     {         loop.Start();     }     return 0; } 
// tcp_cli.cc #include "../source/server.hpp"  int main() {         Socket clent_socket;         clent_socket.Client(8889, "127.0.0.1");         //while(1)         for (int i = 0; i < 5; i++)         {         std::string str = "xiangshuijiaole";         clent_socket.Send(str.c_str(), str.size());         char buff[1024] = {0};         clent_socket.Recv(buff, 1023);         DEBLOG("%s", buff);         sleep(1);     }     while(1)     {         sleep(1);     }     return 0; } 

在这里插入图片描述

十七、LoopThread

1、设计思路

在这里插入图片描述

2、代码

顺带先把EventLoop中的Start()中先进行循环打印。
在这里插入图片描述

class LoopThread {     private:         // 用于实现_1oop获取的同步关系,避免线程创建了,但是_1oop还没有实例化之前去获取_1oop         // 因为此时_loop为空         std::mutex _mutex; // 互斥锁         std::condition_variable _con; // 条件变量         EventLoop* _loop; // EventLoop指针变量,这个对象需要在线程内实例化         std::thread _thread; // EventLoop对应的线程     private:         // 这个函数就是EventLoop模块中的Start,三步走:事件监控->就绪事件处理->执行任务         void ThreadEntry()         {             EventLoop loop; // 实例化对象             // 规定作用域             {                 std::unique_lock<std::mutex> lock(_mutex); // 加锁                 _loop = &loop;                 _con.notify_all(); // 唤醒loop阻塞的线程             }             loop.Start(); // 一直循环打印         }     public:         // 构造函数,创建线程,设置函数入口即ThreadEntry()         LoopThread()             : _loop(NULL)             , _thread(std::thread(&LoopThread::ThreadEntry, this))         {}         // 返回当前线程关联的EventLoop对象的指针         EventLoop* GetLoop()         {             EventLoop* loop = NULL;             // 规定作用域             {                 std::unique_lock<std::mutex> lock(_mutex); // 加锁                 _con.wait(lock, [&](){ return _loop != NULL; }); // loop为NULL就一直循环阻塞                 loop = _loop; // 赋值             }             return loop;         } }; 

3、运行结果

// tcp_svr.cc // tcp_svr.cc #include "../source/server.hpp"  std::unordered_map<uint64_t, PtrConnection> _conns; uint64_t con_id = 0; EventLoop base_loop; std::vector<LoopThread> threads(2); int next_loop = 0;  void DestroyConnection(const PtrConnection& ptrc) {     _conns.erase(ptrc->Id()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     conn->Shutdown(); }  void NewAcceptor(int fd) {     con_id++;     next_loop = (next_loop + 1) % 2;     PtrConnection conn(new Connection(threads[next_loop].GetLoop(), con_id, fd));     conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));     conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));     conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1));      conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁     conn->Established(); // 就绪初始化     _conns.insert(std::make_pair(con_id, conn)); }  // for test int main() {     srand(time(NULL)); // 生成一个随机数种子     Acceptor acceptor(&base_loop, 8888);     acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控     acceptor.Listen();     while (1)     {         base_loop.Start();     }     return 0; } 

tcp_cli和之前一样。

在这里插入图片描述

十八、LoopThreadPool

1、设计思路

在这里插入图片描述

2、代码

class LoopThreadPool {     private:         int _reactor_count; // 从属线程数量         int _next_loop_idx; // 下一个从属线程的id         EventLoop* _base_loop; // 如果从属线程为0的时候,就用这个EventLoop的单个线程咯         std::vector<LoopThread*> _threads; // 保存所有的EventLoop对象         std::vector<EventLoop*> _loops; // 从属线程大于0则从_loops中进行EventLoop分配     public:         // 构造函数         LoopThreadPool(EventLoop* baseloop)             : _reactor_count(0)             , _next_loop_idx(0)             , _base_loop(baseloop)         {}         // 设置从属线程数量         void SetThreadCount(int count)         {             _reactor_count = count;         }         // 创建从属线程         void Create()         {             // 只有在主线程有的情况下并且从属线程也有的情况下,也就是数量大于0的时候             if (_reactor_count > 0)             {                 _threads.resize(_reactor_count);                 _loops.resize(_reactor_count);                 for (int i = 0; i < _reactor_count; i++)                 {                     _threads[i] = new LoopThread();                     _loops[i] = _threads[i]->GetLoop();                 }             }             return;         }         // 下一个从属线程         EventLoop* NextLoop()         {             // 只有一个主线程的情况下,就直接返回主线程即可             if (_reactor_count == 0)             {                 return _base_loop;             }             // 轮转             _next_loop_idx = (_next_loop_idx + 1) % _reactor_count;             return _loops[_next_loop_idx];         } }; 

3、运行结果

// tcp_svr.cc // tcp_svr.cc #include "../source/server.hpp"  std::unordered_map<uint64_t, PtrConnection> _conns; uint64_t con_id = 0; EventLoop base_loop; LoopThreadPool* loop_pool;  void DestroyConnection(const PtrConnection& ptrc) {     _conns.erase(ptrc->Id()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     conn->Shutdown(); }  void NewAcceptor(int fd) {     con_id++;     PtrConnection conn(new Connection(loop_pool->NextLoop(), con_id, fd));     conn->SetMessageCallback(std::bind(OnMessage, std::placeholders::_1, std::placeholders::_2));     conn->SerServerClosedCallback(std::bind(DestroyConnection, std::placeholders::_1));     conn->SetConnectedCallback(std::bind(OnConnection, std::placeholders::_1));      conn->SetInactiveRelease(5); // 非活跃连接10秒后销毁     conn->Established(); // 就绪初始化     _conns.insert(std::make_pair(con_id, conn));     DEBLOG("New-------------------"); }  // for test int main() {     loop_pool = new LoopThreadPool(&base_loop);     loop_pool->SetThreadCount(2);     loop_pool->Create();     srand(time(NULL)); // 生成一个随机数种子     Acceptor acceptor(&base_loop, 8888);     acceptor.SetAcceptorCallback(std::bind(NewAcceptor, std::placeholders::_1)); // 回调中,获取新连接,为新连接创建Channel并添加监控     acceptor.Listen();     base_loop.Start();     return 0; } 

tcp_cli.cc不变
在这里插入图片描述

十九、TcpServer

用来整合上面十八个操作。

1、设计思路

在这里插入图片描述

2、代码

class TcpServer {     private:         uint64_t _next_id; // 自动增加的ID         int _port; // 端口         int _timeout; // 非活跃连接的存在时长,也就是超过这个非活跃的时间就销毁         bool _enable_inactive_release;// 是否启动非活跃连接释放         EventLoop _baseloop;    // 主线程的EventLoop,如果没有从属线程的话就单线程EventLoop         Acceptor _acceptor;    // 监听套接字的管理对象         LoopThreadPool _pool;   // 从属EventLoop线程池         std::unordered_map<uint64_t, PtrConnection> _conns; // 保存管理所有连接对应的shared_ptr对象         // 回调函数         using ConnectedCallback = std::function<void(const PtrConnection&)>;         using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>;         using ClosedCallback = std::function<void(const PtrConnection&)>;         using AnyEventCallback = std::function<void(const PtrConnection&)>;         using Functor = std::function<void()>;         ConnectedCallback _connected_callback;         MessageCallback _message_callback;         ClosedCallback _closed_callback;         AnyEventCallback _event_callback;     private:         // 线程后再跑,因为不能让信息先进来线程再跑吧         void RunAfterInLoop(const Functor &task, int delay)          {             _next_id++;             _baseloop.TimerAdd(_next_id, delay, task);         }         // 为新连接构造一个Connection进行管理(与我们Connection模块测试的tcp_svr.cc一样)         void NewConnection(int fd)          {             _next_id++;             PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd));             conn->SetMessageCallback(_message_callback);             conn->SetClosedCallback(_closed_callback);             conn->SetConnectedCallback(_connected_callback);             conn->SetAnyCallback(_event_callback);             conn->SerServerClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1));             if (_enable_inactive_release) conn->SetInactiveRelease(_timeout); // 启动非活跃超时销毁             conn->Established();             _conns.insert(std::make_pair(_next_id, conn));         }         // 将连接从线程池中移除出来         void RemoveConnectionInLoop(const PtrConnection &conn)          {             int id = conn->Id();             auto it = _conns.find(id);             if (it != _conns.end())              {                 _conns.erase(it);             }         }         // 从管理Connection的_conns中移除连接信息         void RemoveConnection(const PtrConnection &conn)          {             _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn));         }     public:         // 构造函数         TcpServer(int port):             _port(port),              _next_id(0),              _enable_inactive_release(false),              _acceptor(&_baseloop, port),             _pool(&_baseloop)          {             _acceptor.SetAcceptorCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1));             _acceptor.Listen();//将监听套接字挂到baseloop上         }         // 设置线程个数         void SetThreadCount(int count)          {             return _pool.SetThreadCount(count);         }         // 设置四个回调函数         void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; }         void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; }         void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; }         void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; }         // 启动非活跃释放连接         void EnableInactiveRelease(int timeout)          {             _timeout = timeout;             _enable_inactive_release = true;          }         //用于添加一个定时任务         void RunAfter(const Functor &task, int delay)          {             _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay));         }         // 启动         void Start()          {              _pool.Create();               _baseloop.Start();          } }; 

3、运行代码及结果

(1)运行结果1

// server.cc #include "../source/server.hpp"  void OnClosed(const PtrConnection& ptrc) {     DEBLOG("CLOSED CONNECTION:%p", ptrc.get()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     conn->Shutdown(); }  int main() {     TcpServer server(8888);     server.SetThreadCount(2);     server.EnableInactiveRelease(10);     server.SetClosedCallback(OnClosed);     server.SetConnectedCallback(OnConnection);     server.SetMessageCallback(OnMessage);     server.Start();     return 0; } 

tcp_cli.cc不变
在这里插入图片描述

(2)运行结果2

// server.cc #include "../source/server.hpp"  void OnClosed(const PtrConnection& ptrc) {     DEBLOG("CLOSED CONNECTION:%p", ptrc.get()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     //conn->Shutdown(); }  int main() {     TcpServer server(8888);     server.SetThreadCount(2);     server.EnableInactiveRelease(10);     server.SetClosedCallback(OnClosed);     server.SetConnectedCallback(OnConnection);     server.SetMessageCallback(OnMessage);     server.Start();     return 0; } 

在这里插入图片描述

(3)运行结果3

// server.cc #include "../source/server.hpp"  void OnClosed(const PtrConnection& ptrc) {     DEBLOG("CLOSED CONNECTION:%p", ptrc.get()); } void OnConnection(const PtrConnection& ptrc) {     DEBLOG("NEW CONNECTION:%p", ptrc.get()); } void OnMessage(const PtrConnection& conn, Buffer* buff) {     DEBLOG("%s", buff->ReadPos());     buff->ReadOffset(buff->ReadAbleSize());     std::string str = "hello linux";     conn->Send(str.c_str(), str.size());     //conn->Shutdown(); }  int main() {     TcpServer server(8888);     server.SetThreadCount(2);     //server.EnableInactiveRelease(10);     server.SetClosedCallback(OnClosed);     server.SetConnectedCallback(OnConnection);     server.SetMessageCallback(OnMessage);     server.Start();     return 0; } 

在这里插入图片描述

二十、NetWork

class NetWork  {     public:         NetWork()          {             DEBLOG("SIGPIPE INIT");             signal(SIGPIPE, SIG_IGN);         } }; static NetWork nw; 

二十一、EchoServer

1、直接上代码(TcpServer二次封装)

// echo.hpp #include "../server.hpp" class EchoServer {     private:         TcpServer _server;     private:         void OnClosed(const PtrConnection& ptrc)         {             DEBLOG("CLOSED CONNECTION:%p", ptrc.get());         }         void OnConnection(const PtrConnection& ptrc)         {             DEBLOG("NEW CONNECTION:%p", ptrc.get());         }         void OnMessage(const PtrConnection& conn, Buffer* buff)         {             conn->Send(buff->ReadPos(), buff->ReadAbleSize());             buff->ReadOffset(buff->ReadAbleSize());             conn->Shutdown();         }     public:         EchoServer(int port)             : _server(port)         {             _server.SetThreadCount(2);             _server.EnableInactiveRelease(10);             _server.SetClosedCallback(std::bind(&EchoServer::OnClosed, this, std::placeholders::_1));             _server.SetConnectedCallback(std::bind(&EchoServer::OnConnection, this, std::placeholders::_1));             _server.SetMessageCallback(std::bind(&EchoServer::OnMessage, this, std::placeholders::_1, std::placeholders::_2));             _server.Start();         }         void Start()         {             _server.Start();         } }; 
//main.cc #include "echo.hpp"  int main() {     EchoServer server(8888);     server.Start();     return 0; } 

2、测试结果

在这里插入图片描述

3、简单的EchoServer压力测试

(1)找测试文件

github上找一个叫WebBench的项目并压缩下来
在这里插入图片描述

(2)unzip

在这里插入图片描述

(3)make

在这里插入图片描述

(4)./webbench

在这里插入图片描述

(5)./webbench -c 500 -t 60 http://127.0.0.1:8888/hello

在这里插入图片描述

(6)跑完以后的结果:

在这里插入图片描述

二十二、EchoServer关系图

在这里插入图片描述

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!