文章目录
前言
在上一篇文章当中对本项目的框架做了一个整体的介绍(感兴趣的读者可从本专栏中进行查看),不过并没有进行实现和详细地说明,那么本篇文章将从代码的角度呈现一个更为完整的服务器模块,同时结合上篇文章对服务器模块有一个更加深刻的理解。话不多说,不过还是要说明一点,本项目是一个基于C++代码实现的,如果对C++的基本语法和面向对象的思想,以及系统IO不了解的话,慎入!
代码
说明:如下代码更准确的说应该是伪代码,目的是方便大家阅读,其中省略了一些不必要的代码,因此能看可能不能跑^ _ ^ ,如果需要能跑的可自行在Gitee中进行查看,这样写的目的是避免自己偷懒水博客,不当CV工程师,将思路更好地理顺进而呈现出来^ _ ^。
1. Socket
先交代两个杂七杂八的事情,即信号的设置,所需的头文件。
- 头文件
//套接字相关的定义和函数 #include <sys/types.h> #include <sys/socket.h> //Internet地址族的相关结构 #include <netinet/in.h> //地址转换 #include <arpa/inet.h> //文件描述符的操作 #include <unistd.h> #include <fcntl.h>
- 忽略管道破裂信号,避免服务进程异常退出。
//防止进程异常退出 struct Attribution { Attribution() { //忽略管道破裂信号。 signal(SIGPIPE,SIG_IGN); } }; //在main启动之前进行初始化,类似于饿汉模式。 Attribution sets;
1.1 Socket
Socket子模块,完成简单的网络套接字的创建和关闭。
class Socket { public: Socket(const std::string& ip = "",uint16_t port = -1,int fd = -1) :_ip(ip),_port(port),_fd(fd) {} //创建套接字 void Create() { _fd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP); //AF_INET,使用的ipv4的网络类型。 //SOCK_STREAM,套接字类型,即连接,可靠,全双工,字节流的套接字。 //IPPROTO_TCP,传输层协议,TCP。 } //关闭套接字 void Close(int fd) { if(fd >= 0) { close(fd); } } void Close() { Close(_fd); _fd = -1; } //获取文件描述符 int GetFd() { return _fd; } //设置套接字模式为非阻塞,方便提高IO通信的效率 void SetNoBlock(int fd) { int flag = fcntl(fd,F_GETFL); if(fcntl(fd,F_SETFL,flag | O_NONBLOCK) < 0) { //出错 return; } } //网络序列信息的转换 sockaddr_in MesToInet() { sockaddr_in meg; socklen_t len = sizeof(meg); memset(&meg,0,sizeof(meg)); //网络协议 meg.sin_family = AF_INET; //端口号 meg.sin_port = htons(_port); //ip地址 if(!inet_aton(_ip.c_str(),&(meg.sin_addr))) { //出错 } return meg; } //网络序列转换为当前主机的字节序列 std::pair<uint16_t,std::string> InetToMes(sockaddr_in meg) { uint16_t port = ntohs(meg.sin_port); std::string ip_str = inet_ntoa(meg.sin_addr); return std::make_pair(port,ip_str); } private: int _fd; uint16_t _port; std::string _ip; };
1.2 SSocket
SSocket子模块,服务端监听以及获取连接。
//默认端口号,根据自己的服务器的开放端口自行设置。 const static int default_port = 8000; struct SSocket : public Socket { public: //服务端启动时要绑定端口 SSocket(const std::string& ip = "0.0.0.0",uint16_t port = default_port) :Socket(ip,port) { Bind(); Listen(); } public: //绑定 void Bind() { //创建套接字。 Create(); //设置端口号复用 ReusePort(); //设置地址复用 ReuseAddr(); //获取网络信息 sockaddr_in sver_msg = MesToInet(); socklen_t len = sizeof(sver_msg); //绑定,0 表示成功。 if(!bind(GetFd(),(sockaddr*)&sver_msg,len)) { //绑定成功 } } //监听 void Listen() { if(!listen(GetFd(),backlog)) { //监听成功 } } //接收连接 int Accept() { //获取客户端的ip地址和端口号。 sockaddr_in cmsg; memset(&cmsg,0,sizeof(cmsg)); socklen_t len = sizeof(cmsg); //获取套接字 int client_fd = accept(GetFd(),(sockaddr*)&cmsg,&len); return client_fd; } private: void ReusePort() { int opt = 1; //表示开启。 int ret = setsockopt(GetFd(),SOL_SOCKET,SO_REUSEPORT,&opt,sizeof(opt)); } void ReuseAddr() { int opt = 1; //表示开启。 int ret = setsockopt(GetFd(),SOL_SOCKET,SO_REUSEADDR,&opt,sizeof(opt)); } const static int backlog = 8; };
1.3 CSocket
CSocket子模块,客户端进行连接以及读写数据。
struct CSocket : public Socket { public: //客户端连接服务端锁定唯一一个进程时,需要ip地址和端口号。 CSocket(const std::string& ip,uint16_t port = default_port) :Socket(ip,port) {} //发起连接 bool Connect() { //创建套接字。 Create(); //获取套接字 int fd = GetFd(); //获取网络信息 sockaddr_in sver_msg = MesToInet(); socklen_t len = sizeof(sver_msg); //发起连接 int ret = connect(fd,(sockaddr*)&sver_msg,len); if(-1 == ret) { //连接失败 return false; } return true; } //发送信息 ssize_t Send(int fd,const void* buff,uint64_t len,int flag = 0) { ssize_t n = send(fd,buff,len,flag); if(-1 == n) { if(errno == EINTR || errno == EAGAIN) { //阻断或者无数据可读 //说明:EWOULDBLOCK == EAGAIN; return 0; } } return n; } //接收信息 ssize_t Recv(int fd,void* buff, size_t len,int flag = 0) { ssize_t n = recv(fd,buff,len,flag); if(-1 == n) { if(errno == EINTR || errno == EAGAIN) { //阻断或者无数据可读 //说明:EWOULDBLOCK == EAGAIN; return 0; } return -1; } return n; } //依据于上面两个核心函数实现的函数重载,目的是方便调用,很简单说明一下,,不实现了。。 ssize_t Send(int fd,const std::string& str); ssize_t Send(const std::string& str); ssize_t RecvNoBlock(void* buff,size_t len);//传入flag设置为MSG_DONTWAIT //... size_t Recv(int fd,std::string& str,size_t len); size_t Recv(std::string& str,size_t len); ssize_t SendNoBlock(int fd,void* buff, size_t len);//同上 //... };
SCSocket子模块,服务端接收的客户端的连接以及读写数据。
//服务端接收连接的套接字 struct SCSocket : public Socket { SCSocket(int fd) :Socket(ip,port,fd) {} //发送信息 ssize_t Send(int fd,const void* buff,uint64_t len,int flag = 0) { ssize_t n = send(fd,buff,len,flag); if(-1 == n) { if(errno == EINTR || errno == EAGAIN) { //阻断或者无数据可读 //说明:EWOULDBLOCK == EAGAIN; return 0; } else { return -1; } } return n; } //接收信息 ssize_t Recv(int fd,void* buff, size_t len,int flag = 0) { ssize_t n = recv(fd,buff,len,flag); if(-1 == n) { if(errno == EINTR || errno == EAGAIN) { //阻断或者无数据可读 //说明:EWOULDBLOCK == EAGAIN; return 0; } else { return -1; } } return n; } //依据于上面两个核心函数实现的函数重载,目的是方便调用,很简单说明一下,,不实现了。。 ssize_t Send(int fd,const std::string& str); ssize_t Send(const std::string& str); ssize_t RecvNoBlock(void* buff,size_t len);//传入flag设置为MSG_DONTWAIT //... size_t Recv(int fd,std::string& str,size_t len); size_t Recv(std::string& str,size_t len); ssize_t SendNoBlock(int fd,void* buff, size_t len);//同上 //... };
补充:相关的接口的详细说明可使用man命令进行查看,或者翻博主写的【Linux进阶之路】Socket —— “UDP“ && “TCP“以及【Linux进阶之路】高级IO,电脑浏览器可按下Ctrl + F可快速查阅,因此就不在本文中赘述了。
2. Poller
#pragma once //容器 #include<unordered_map> #include<vector> //智能指针 #include<memory> //文件IO #include<unistd.h> //epoll #include<sys/epoll.h> class Poller { public: Poller() { _efd = epoll_create(DefaultSize); memset(_evs,0,sizeof(_evs)); } //更新 void UpDate(Channel* cel) { if(Is_ADD(cel)) { UpDate(cel,EPOLL_CTL_MOD); } else { //添加到哈希表中 _chs.insert({cel->Fd(),cel->Get()}); UpDate(cel,EPOLL_CTL_ADD); } } //移除 void Remove(Channel* cel) { auto it = _chs.find(cel->Fd()); if(it != _chs.end()) { //将其从epoll中删除 UpDate(cel,EPOLL_CTL_DEL); _chs.erase(it); } } //获取活跃连接 std::vector<std::shared_ptr<Channel>> Wait() { int n = epoll_wait(_efd,_evs,MAX_EPOLLEVENTS,-1); if(-1 == n) { if(errno == EINTR || errno == EPIPE) { //阻断或者非阻塞 return {}; } } std::vector<std::shared_ptr<Channel>> actives; for(int i = 0; i < n; i++) { int fd = _evs[i].data.fd; //就绪的事件 int event = _evs[i].events; if(_chs.count(fd)) { //设置就绪的事件 _chs[fd]->SetRevent(event); actives.push_back(_chs[fd]); } } //右值拷贝 return actives; } private: //直接进行监控并更新操作 void UpDate(Channel* cel,int opt) { struct epoll_event event; //初始化 memset(&event,0,sizeof(event)); event.data.fd = cel->Fd(); event.events = cel->GetMoniter();//要监控的事件 epoll_event* eptr = (opt == EPOLL_CTL_DEL) ? nullptr : &event; //设置进内核的红黑树结构 if(-1 == epoll_ctl(_efd,opt,fd,eptr)) { //错误 } } //检测是否被添加 bool Is_ADD(Channel* cel) { return _chs.count(cel->Fd()); } private: enum { MAX_EPOLLEVENTS = 1024, DefaultSize = 1 }; int _efd; struct epoll_event _evs[MAX_EPOLLEVENTS]; using cel_t = std::shared_ptr<Channel>; std::unordered_map<int,cel_t> _chs; };
说明:在博主的此篇文章中实现过类似的——【Linux进阶之路】高级IO,接口有着详细介绍,此处是一个更加完善,更加贴合实际的版本。
3. Channel
- 说明: Channel在实际使用的时候一般来说是在堆上开辟的,即new出一个Channel对象,因此使用shared_ptr进行管理,但是在使用的过程无法通过原始指针获取到shared_ptr对象,进而再度构造,导致段错误的现象,为了避免这种现象C++使用enable_shared_from_this模版来避免这个问题,原理是内部保存一个weak_ptr,下面给出一段样例代码,方便大家理解。
#include <iostream> #include <memory> class MyClass : public std::enable_shared_from_this<MyClass> { public: void show() { std::cout << "MyClass instance at " << this << std::endl; } std::shared_ptr<MyClass> getSharedPtr() { return shared_from_this(); } }; int main() { std::shared_ptr<MyClass> sptr1(new MyClass); std::shared_ptr<MyClass> sptr2(sptr1.get());//error std::shared_ptr<MyClass> sptr3 = sptr1->getSharedPtr(); std::cout << sptr2.use_count() << std::endl;//1 std::cout << sptr3.use_count() << std::endl;//2 //最后会出现Segmentation fault (core dumped)现象,因为sptr2进行构造导致引用计数不正确,最终导致资源多释放了一回。 return 0; }
#pragma once //bind #include<functional> //智能指针 #include<memory> class EventLoop; class Channel:public std::enable_shared_from_this<Channel> { using evt_cb_t = std::function<void()>; public: //构造 Channel(EventLoop* loop,int fd):_loop(loop),_fd(fd),_revents(0),_mevents(0) {} //获取智能指针 std::shared_ptr<Channel> Get() { return shared_from_this(); } //设置回调 void SetRead(const evt_cb_t& rd){_rd = rd;} void SetWrite(const evt_cb_t& wt){_wt = wt;} void SetError(const evt_cb_t& err){_er = err;} void SetClose(const evt_cb_t& clo){_clo = clo;} void SetNor(const evt_cb_t& nor){_nor = nor;} //清理回调与监控信息 void Clear() { _rd = _wt = _clo = _er = _nor = nullptr; _mevents = _revents = 0; _fd = -1; } //取消 void RemoveRead(){_rd = nullptr;} void RemoveWrite(){_wt = nullptr;} //处理 void Handle() { //算术 位运算 关系 按位 逻辑 赋值 //异常 if(_er != nullptr && (_revents & EPOLLERR)) { _er(); } //连接关闭 else if(_clo != nullptr && (_revents & EPOLLHUP)) { _clo(); } else { //读 if(_rd != nullptr && (IsRead()) && ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI))) { //连接活跃,更新活跃时间 Flush(); _rd(); } //写 else if(_wt != nullptr && (IsWrite()) &&(_revents & EPOLLOUT)) { _wt(); } } } //打开监控,epoll内核结构也要进行打开 void OpenWrite(){_mevents |= (EPOLLOUT),Update(); } void OpenRead(){_mevents |= EPOLLIN,Update(); } //关闭,epoll内核结构也要进行关闭。 void CloseWrite() { _mevents &= (~EPOLLOUT),Update(); } void CloseRead() { _mevents &= ~EPOLLIN,Update(); } void CloseAll(){_mevents = 0,Update()} //检测 bool IsRead(){return _mevents & EPOLLIN;} bool IsWrite(){return _mevents & EPOLLOUT;} //描述符 int Fd(){return _fd;}; //获取监控事件 int GetMoniter(){return _mevents;} //设置已经就绪的事件 void SetRevent(uint32_t revs){_revents = revs;} //更新内核结构的事件结构:EventLoop -> Poller -> Update / Remove void Update(); void Remove(); private: //刷新一下事件 bool Flush() { if(_nor != nullptr) _nor(); return true; } EventLoop* _loop; //绑定EventLoop即绑定线程。 int _fd; uint32_t _mevents; //预期监控事件 uint32_t _revents; //实际就绪事件 //回调函数 evt_cb_t _rd; //读 evt_cb_t _wt; //写 evt_cb_t _er; //错误 evt_cb_t _clo;//关闭 evt_cb_t _nor;//任意事件 };
4. Acceptor
#pragma once //bind #include<functional> //智能指针 #include<memory> class Acceptor { using apt_cb = std::function<void(int)>; //连接处理回调 public: //port:8000是博主的服务器设置的默认端口,自己还需查看开放端口进行设置。 Acceptor(EventLoop* loop,int port = 8000) :_loop(loop),_lskt("0.0.0.0",port) ,_cel(new Channel(loop,_lskt.GetFd())) { //绑定读事件回调 _cel->SetRead(std::bind(&Acceptor::ReadHander,this)); } //设置连接处理函数 void SetAcceptCb(const apt_cb& aptcb) { _accept_cb = aptcb; //只有连接处理回调设置了才能打开读事件,否则会导致连接没有被正常管理,进而连接异常。 _cel->OpenRead(); } private: //获取与处理连接 void ReadHander() { int cfd = _lskt.Accept(); if(_accept_cb) _accept_cb(cfd); } private: EventLoop* _loop; SSocket _lskt;//服务器进行监听的套接字 std::shared_ptr<Channel> _cel; apt_cb _accept_cb = nullptr; };
5. Time
5.1 TimerTask
定时任务模块,即完成对任务的执行,取消,资源的清理功能,是一个由shared_ptr管理的资源对象。
using TaskFunc = std::function<void()>; using DelFunc = std::function<void()>; class TimerTask { public: TimerTask(uint64_t id,uint32_t delay,TaskFunc cb) :_id(id),_timeout(delay),_cancel(true),_task_cb(cb) {} void SetDel(DelFunc cb) { _del_cb = cb; } void Cancel() { _cancel = false; } uint32_t GetDelay() { return _timeout; } ~TimerTask() { if(true == _cancel) { _task_cb(); } _del_cb(); } private: uint64_t _id; //设置id方便进行查找 uint32_t _timeout;//设置超时时间 bool _cancel;//是否取消定时任务 TaskFunc _task_cb;//任务执行的回调 DelFunc _del_cb;//资源释放的回调,从TimerWheel的结构中移除对应的信息。 };
5.1 TimerWheel
时间轮模块,即实现定时功能,进而完成对任务的添加,刷新,取消。
- 技术核心:时间轮 + 智能指针,即vector<shared_ptr<TimerTask>>。
- 原理:在本项目中时间轮设置为60秒,即一个容量为60的vector,当任务活跃时重新计算定时任务在vector中的位置,运行过程中每一秒种定时刷新一次时间轮,同时清空里面的定时任务,最终当shared_ptr的引用计数为0时,调用TimerTask的析构函数,执行定时任务。
- 系统调用接口:timerfd,获取当前距离上一次刷新的秒数。
#include <sys/timerfd.h> int timerfd_create(int clockid, int flags); /* 参数: 1.设置的时间标准,例如CLOCK_REALTIME为实时时钟,易收到系统时间更改的影响,CLOCK_MONOTONIC为单调时钟,不易受到系统时间更改的影响 比较适合测量时间的间隔,还有CLOCK_PROCESS_CPUTIME_ID,CLOCK_THREAD_CPUTIME_ID分别用于测量进程和线程的CPU时间。 2.设置属性,一般设置为0。 返回值: 1.成功返回创建的文件描述符timer_fd。 2.失败返回-1,并设置合适的错误码。 */ int timerfd_settime(int fd, int flags,const struct itimerspec *new_value,struct itimerspec *old_value); /* 参数: 1.创建的timer_fd 2.设置属性,一般设置为0。 3.输入型参数,用于设置间隔时间对象。 4.输出型参数,用于获取上次设置的间隔时间对象。 返回值: 1.成功返回创建的文件描述符。 2.失败返回-1,并设置合适的错误码。 */
- 拓展:本项目还可以设置一个24小时60分钟60秒的时间轮,思路一致,感兴趣的可自行思考实现或者与博主私信交流。
//整形相关的类型 #include<cstdint> //bind #include<functional> //智能指针 #include<memory> //容器 #include<vector> #include<unordered_map> //文件描述符 #include<unistd.h> #include<sys/timerfd.h> //timerfd class EventLoop; //根据TimerTask任务,封装一个时间轮 class TimerWheel { public: TimerWheel(EventLoop *loop):_capacity(60),_tick(0),_wheel(_capacity),_tfd(CreateTFd()),_loop(loop),_tcel(new Channel(_loop,_tfd)) { _tcel->SetRead(std::bind(&TimerWheel::OnTime,this)); _tcel->OpenRead(); } //说明: //1.任务统一是在线程中运行的,为了防止多线程对连接操作导致的线程安全,放到EventLoop中串行化执行可避免此问题。 //2.在实际执行的就绪任务队列时可能由于前面的任务没有及时地执行导致刷新不及时或者没有释放,因此解决此类问题可将时间轮放到与接收连接类似执行 // 轻量化任务的线程中或者自己再创建一个线程专门执行此任务。 //添加任务 void TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb); //取消任务 void CancelTask(uint64_t id); //刷新任务 void TimerRefresh(uint64_t id); //判断任务 bool HasTimer(uint64_t id){return _hash.count(id);} private: //InLoop系列主要是将任务放到指定的线程的任务池中,方便其执行。 //添加 void TimerAddInLoop(uint64_t id,uint32_t delay,TaskFunc cb) { //将任务new出来赋值给 TimerPtr TimerPtr task(new TimerTask(id,delay,cb)); //设置资源释放函数 task->SetDel(std::bind(&TimerWheel::SourceClear,this,id)); //将任务添加到哈希表中 _hash.insert({id,task}); //将任务添加到tick指向的数组当中 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(task); } //取消 void CancelTaskInLoop(uint64_t id) { auto it = _hash.find(id); if(it != _hash.end()) { TimerPtr task = it->second.lock(); if(task.get() != nullptr) { task->Cancel(); } else { _hash.erase(id); } } } //刷新 void TimerRefreshInLoop(uint64_t id) { auto it = _hash.find(id); if(it != _hash.end()) { //获取对象并进行拷贝 TimerPtr task = it->second.lock(); int pos = (task->GetDelay() + _tick) % _capacity; _wheel[pos].push_back(task); return; } } //资源清理 void SourceClear(u_int64_t id) { auto it = _hash.find(id); if(it == _hash.end()) { //说明不存在 return; } else { _hash.erase(it); } } //timerfd系列 //创建timerfd static int CreateTFd() { int tfd = timerfd_create(CLOCK_MONOTONIC,0); //计时标准为相对时间 //CLOCK_REALTIME if(tfd < 0) { //创建失败 } struct itimerspec new_time; memset(&new_time, 0, sizeof(new_time)); //到期时间为1s后 new_time.it_value.tv_sec = 1; //每次时间间隔为1s new_time.it_interval.tv_sec = 1; int ret = timerfd_settime(tfd,0,&new_time,nullptr); return tfd; } //获取到期次数 int TimerRead() { uint64_t val = 0; int ret = read(_tfd,&val,sizeof(val)); return val; } //更新表盘 void TimerUpdate() { _tick = (_tick + 1) % _capacity; _wheel[_tick].clear();//清理调用析构的同时,执行任务或者将引用计数减1。 } //定时刷新 void OnTime() { //可能没有及时地刷新,导致times不为1 int times = TimerRead(); for(int i = 0; i < times; i++) { TimerUpdate(); } } using TimerPtr = std::shared_ptr<TimerTask>; using ValPtr = std::weak_ptr<TimerTask>; uint32_t _capacity; //设置时间轮的最大超时时间,也就是表盘。 uint32_t _tick; //秒针的实时走向 std::vector<std::vector<TimerPtr>> _wheel; //设置为秒级的时间轮 //因为要维护一个非活跃连接的更新,所以要建立一个哈希表方便进行查找 std::unordered_map<uint64_t,ValPtr> _hash; int _tfd; //timerfd,便于计时从而定时更新。 EventLoop* _loop;//回调指针,所有的定时任务都是要在EventLoop下进行的。 std::shared_ptr<Channel> _tcel;//timewheel任务对象 };
6. Buffer
思路图解
- 读偏移向左移动时,即数据写入Buffer时空间不够,将可读数据拷贝到开头,即紫色部分,然后扩容至所需大小;读偏移向右移动时,即从数据Buffer中读取出来,需注意读偏移不能超过写偏移的位置。
- 写偏移向左移动与读偏移向左移动向左移动的情况相同,写偏移向有移动,即向Buffer写入数据。
涉及接口
- copy
#include <algorithm> template <class InputIt, class OutputIt> OutputIt copy(InputIt first, InputIt last, OutputIt d_first); //将[first,last)位置的数据拷贝到以d_first为起始位置的地方,一般来说迭代器类型为char*,即拷贝字符串或者按字节拷贝。
- memchr
#include <string.h> void *memchr(const void *s, int c, size_t n); //在以s位置为起点向后查找长度为n,看是否存在字符c。
//读写 #include<unistd.h> class Buffer { public: //构造 Buffer():_rd_idx(0),_wd_idx(0){} //读 void Read(void* buffer,uint64_t len) { _Read(buffer,len); MoveReadIdx(len); } //读len长度的string std::string Read(uint64_t len) { std::string str; str.resize(len); Read(&str[0],len); return str; } //读一行 std::string GetLine() { char* crlf = (char*)memchr(begin() + _rd_idx,'\n',ReadableSize()); if(nullptr == crlf) return ""; uint64_t len = crlf - (begin() + CurRpos()) + (uint64_t)1; std::string res = Read(len); return res; } //所有数据读出 std::string Read() { return Read(ReadableSize()); } //移动读偏移 void MoveReadIdx(uint64_t len) { if(len <= ReadableSize()) { _rd_idx += len; } } //写 void Write(const void *buffer,uint64_t len) { _Write(buffer,len); MoveWriteIdx(len); } void Write(const std::string& str) { Write(&str[0],str.size()); } //移动写偏移 void MoveWriteIdx(uint64_t len) { if(len <= TailLdleSize()) { _wd_idx += len; } } //获取到可读位置 char *rdpos(){return &_buff[CurRpos()];} //获取可读大小 uint64_t ReadableSize(){return _wd_idx - _rd_idx;} uint64_t Size(){return ReadableSize();} //复位 void Clear(){_wd_idx = _rd_idx = 0;} //检测是否为空 bool Empty(){return 0 == ReadableSize();} private: void _Read(void* buffer,uint64_t len) { if(len <= ReadableSize()) { //无法检测传入的指针是否可靠。 char* beg = begin() + _rd_idx; char* end = beg + len; if(buffer) { std::copy(beg,end,(char*)buffer); } } } void _Write(const void* buffer,uint64_t len) { //确保有足够的位置写 EnsureWriteSpace(len); const char* beg = static_cast<const char*>(buffer); const char* end = beg + len; std::copy(beg,end,begin() + CurWpos()); } //获取读写位置 uint64_t CurRpos(){return _rd_idx;} uint64_t CurWpos(){return _wd_idx;} //获取空闲位置 uint64_t TailLdleSize(){return _buff.size() - _wd_idx;} uint64_t HeadLdleSize(){return _rd_idx;} //扩容,保证有足够的长度放进去数据 void EnsureWriteSpace(uint64_t len) { //空闲空间足够 if(len <= TailLdleSize() + HeadLdleSize()) { //移动 int sz = ReadableSize(); std::copy(_buff.begin() + _rd_idx,_buff.begin() + _wd_idx, begin()); _rd_idx = 0; _wd_idx = sz; } else { _buff.resize(_wd_idx + len); } } char* begin(){return &_buff[0];} private: std::vector<char> _buff; //缓存区 uint64_t _rd_idx; //读偏移 uint64_t _wd_idx; //写偏移 };
7. Any
class Any final { public: //默认构造 Any():_cot(nullptr){} //拷贝构造 Any(const Any& val):_cot(nullptr == val._cot ? nullptr : val._cot->clone()){} //模版构造 template<class T> Any(const T& val):_cot(new placeholer<T>(val)){} //析构 ~Any(){delete _cot;}; //赋值 Any& operator =(const Any& val) { Any(val).swap(*this); return *this; } //模版赋值 template<class T> Any& operator=(const T& val) { Any(val).swap(*this); return *this; } //交换 Any& swap(Any& val) { std::swap(_cot,val._cot); return *this; } //取值函数 template<class T> T* get() { if(typeid(T) != _cot->get_type()) { //错误 } using holer_ptr = placeholer<T>*; holer_ptr to_child = (holer_ptr)(_cot); if(nullptr == to_child) { //错误 } return &(to_child->_val); } private: //父类,接口继承,多态封装 struct holder { //析构函数,虚函数 virtual ~holder(){}; //克隆函数,此处为纯虚函数,目的是子类强制重写 virtual holder* clone() = 0; //获取类型,方便之后的检测 virtual const std::type_info& get_type() = 0; }; //子类,模版实现 template<class T> struct placeholer : public holder { placeholer(const T& val):_val(val){} //重写函数 holder* clone(){return new placeholer(_val);}; const std::type_info& get_type(){return typeid(T);}; T _val; }; //父类指针,指向 holder * _cot; };
- Any类在本项目的用处为当做应用层协议的上下文和进行丝滑地切换应用层协议。
- 实现的核心思想为多态,模版,封装类,即父类为抽象类,模版的子类强制重写父类的接口,在Any类中使用父类指针成员进行封装通过向下转换完成对任意成员的赋值,拷贝,获取功能。
说明:Any类的实现是参考C++17中的any类进行实现的,感兴趣可自行查看文档——https://c-cpp.com/cpp/utility/any.html
8. Connection
//bind #include<functional> //智能指针 #include<memory> //定义状态基,用于判断连接状况,分别为连接关闭,连接待关闭,连接待建立,连接已建立。 typedef enum{DISCONNECTED,DISCONNECTING,CONNECTING,CONNECTED}ConSta; class Connection; //管理连接的智能指针类型 using ConPtr = std::shared_ptr<Connection>; //连接初始化 using con_cb_t = std::function<void(const ConPtr&)>; //消息处理 using msg_cb_t = std::function<void(const ConPtr&,Buffer*)>; //关闭连接 using clo_cb_t = std::function<void(const ConPtr&)>; //记录上下文处理过程 using txt_cb_t = std::function<void(const ConPtr&)>; //服务器组件清理 using svr_cb_t = std::function<void(const ConPtr&)>; //任意事件回调 using evt_cb_t = std::function<void(const ConPtr&)>; class Connection : public std::enable_shared_from_this<Connection> { public: //获取信息接口,比如文件描述符,连接ID,获取上下文,判断连接状态 int Fd(){return _fd;} int Id(){return _conid;} Any* GetTxt(){return &_text;} bool IsConnect(){return _const == CONNECTED;} //回调函数的设置 void SetMsgCb(msg_cb_t msg){_msg = msg;} void SetConCb(con_cb_t con){_con = con;} void SetTxtCb(txt_cb_t txt){_txt = txt;} void SetCloCb(clo_cb_t clo){_clo = clo;} void SetEvtCb(evt_cb_t evt){_evt = evt;} void SetSvrCb(svr_cb_t svr){_svr_clo = svr;} //设置上下文 void SetAnyCb(const Any& text) { _text = text; } //构造函数 Connection(EventLoop* loop,int id,int sockfd):_conid(id),_timid(id)\ ,_fd(sockfd),_const(CONNECTING),_loop(loop),_cel(new Channel(loop,sockfd)),_skt(sockfd) { //设置读,写,关闭,错误,任意事件,但是不打开因为连接还未初始化完毕 _cel->SetRead(std::bind(&Connection::ReadHder,this)); _cel->SetWrite(std::bind(&Connection::WriteHder,this)); _cel->SetClose(std::bind(&Connection::CloseHder,this)); _cel->SetError(std::bind(&Connection::ErrorHder,this)); _cel->SetNor(std::bind(&Connection::EventHder,this)); } private: //事件就绪自动执行,外部不可见。 //Channel类绑定的事件回调,要绑定到Channel内部,比如读,写,关闭,错误,任意事件。 void ReadHder() { char buff[65536] = {0}; int ret = _skt.RecvNoBlock(buff,sizeof(buff) - 1); if(ret < 0) { //进行连接关闭的预处理工作 return ShutDown(); } else if(ret > 0) { //读取到缓存区中 _inbuf.Write(buff,ret); //调用信息处理函数 ConPtr con = shared_from_this(); _msg(con,&_inbuf); } } void WriteHder() { int ret = _skt.SendNoBlock(_outbuf.rdpos(),_outbuf.ReadableSize()); if(ret < 0) { if(_inbuf.ReadableSize() > 0) { //消息处理回调,由程序员自行绑定设置 _msg(shared_from_this(),&_inbuf); } return ShutDown(); } //注意:将读偏移进行移动。 _outbuf.MoveReadIdx(ret); //防止写busy提高效率。 if(0 == _outbuf.ReadableSize()) { _cel->CloseWrite(); } //连接处于关闭状态且输出缓存区的读大小为0,则进行释放 if(_const == DISCONNECTING && _outbuf.ReadableSize() == 0){return Realease();} } void EventHder() { //可能会刷新非活跃连接 if(_entimrse == true) { _loop->TimerRefresh(_timid); } if(_evt)_evt(shared_from_this()); } void ErrorHder() { CloseHder(); } void CloseHder() { //连接处理完毕进行关闭 ShutDown(); } public: //数据发送 void Send(void* buff,int len) { _loop->Run(std::bind(&Connection::SendInLoop,this,buff,len)); } void Send(std::string msg) { Send((void*)msg.c_str(),msg.size()); } //连接建立初始化,由连接的监听线程完成。 void Establish() { if(_const == CONNECTING) { if(_con != nullptr) { _con(shared_from_this()); } //连接建立之后才能进行读事件监控 _const = CONNECTED; _cel->OpenRead(); } } //连接关闭的预处理行为 void ShutDown() { _loop->Run(std::bind(&Connection::ShutDownInLoop,this)); } //释放连接 void Realease() { _loop->Run(std::bind(&Connection::RealeaseInLoop,this)); } //用户切换应用层的处理函数。 void SwitchTrs(con_cb_t con,msg_cb_t msg, clo_cb_t clo,txt_cb_t txt) { _loop->AssertIsIn(); _loop->Run(std::bind(&Connection::SwitchTrsInLoop,this,con,msg,clo,txt)); } //非活跃连接的关闭和启动 void CloTimRse() { _loop->Run(std::bind(&Connection::CloTimRseInLoop,this)); } void TimoutRse(int sec) { _loop->Run(std::bind(&Connection::TimoutRseInLoop,this,sec)); } private: //内部执行,外部不可见,在EventLoop绑定的线程中执行的函数,防止执行流错乱,任务最终都会被Push到指定线程池中运行。 //将数据推送到应用层缓冲区 void SendInLoop(void* bufer,int len) { if(_const == CONNECTED) { _outbuf.Write(bufer,len); if(_cel->IsWrite() == false) { _cel->OpenWrite(); } } } //连接关闭 void ShutDownInLoop() { //ReadHder与WriteHder,CloseHder调用此函数,因此连接必须处于就绪状态。 if(_const != CONNECTED) { return; } //将消息处理完毕 if(_inbuf.ReadableSize() > 0 && _msg) { _msg(shared_from_this(),&_inbuf); } if(_outbuf.ReadableSize() > 0 && false == _cel->IsWrite()) { _const = DISCONNECTING; _cel->OpenWrite(); //WriteHder处理完调用Realease } //消息处理完毕之后调用Realease else { _const = DISCONNECTING; Realease(); } } //连接释放 void RealeaseInLoop() { //此函数由ShutDownInLoop,WriteHder调用,因此必须处于连接的待释放状态 if(_const != DISCONNECTING) return; ConPtr con = shared_from_this(); //用户的关闭连接对于信息处理的回调 if(_clo) { _clo(con);//此项目中最终设置的clo回调为空。 } //服务器组件内部信息清理 if(_svr_clo) _svr_clo(con); //非活跃关闭 if(_entimrse) CloTimRse(); //内部信息处理 _cel->Remove();//移除Poller内核结构 _cel->Clear();//清理属性信息。 _skt.Close();//关闭套接字 _const = DISCONNECTED; } //用于切换应用层协议 void SwitchTrsInLoop(con_cb_t con,msg_cb_t msg, clo_cb_t clo,txt_cb_t txt) { _con = con; _msg = msg; _clo = clo; _txt = txt; } //非活跃连接的关闭和启动 void CloTimRseInLoop() { if(_entimrse) { _entimrse = false; _loop->TimerCancelTask(_timid); } } void TimoutRseInLoop(int sec) { _entimrse = true; if(_loop->HasTimer(_timid)) { _loop->TimerRefresh(_timid); } else { _loop->TimerAdd(_timid,sec,std::bind(&Connection::CloseHder,this)); } } private: int _conid;//连接ID int _timid;//定时任务ID与连接ID一致。 int _fd; bool _entimrse;//是否打开非活跃连接的释放销毁 ConSta _const;//连接状态 //封装类 SCSocket _skt; //服务端接收的客户端的信息 Any _text;//上下文,用于切换应用层协议。 EventLoop* _loop;//线程绑定 std::shared_ptr<Channel> _cel; //Channel对象用于管理事件监控信息。 //应用层的输入输出缓存区 Buffer _inbuf; Buffer _outbuf; //连接的处理方法 con_cb_t _con = nullptr;//连接的初始化 msg_cb_t _msg = nullptr;//消息处理 clo_cb_t _clo = nullptr;//应用层对连接的关闭处理 txt_cb_t _txt = nullptr; //记录协议上下文的处理过程。 svr_cb_t _svr_clo = nullptr; //清理组件内关于连接的信息。 evt_cb_t _evt = nullptr;//对标Channel中的_nor };
说明:
- Connection的回调是用户自行设置的,面向上层,内部Channel的回调是通过Connection内部的接口设置的,面向下层。
- 每个连接都有自己的应用层协议,即对应的回调函数,相应协议的上下文,可随时进行切换。
- 上层连接需要及时地将内核的数据读取,需要接收缓存区;当发送数据内核的缓存区满时,剩下的数据需要存储,需要发送缓冲区。
- 用户调用的发送接口并不是真正的发送,而是将数据写入到发送缓存区,打开读事件监控,通过下层自动发送。
- 事件循环,即Loop与线程绑定,Connection内部存放EventLoop*指针绑定线程,也就是说Connection是与线程绑定的,因此当执行函数时需要将任务放到绑定线程的任务池中,而不能让其它的线程去执行,因此设置Inloop系列的函数。
9. Loop
9.1 EventLoop
说明:
- eventfd,文件描述符,通常用来记录事件发生的次数,此项目用来缓解线程阻塞等待问题。
- 相应的系统调用接口如下。
#include <sys/eventfd.h> int eventfd(unsigned int initval, int flags); /* 参数: 1.初始值,此项目中设置为1,其它项目可按场景设置。 2.文件描述符属性设置,如EFD_CLOEXEC,执行 exec 调用时,子进程对应fd应自动关闭。 EFD_NONBLOCK,fd设置为非阻塞模式 返回值: 1.成功,返回创建的fd。 2.失败,返回-1,设置errno。 */
//bind #include<functional> //容器 #include<vector> //线程 #include<thread> //锁 #include<mutex> //智能指针 #include<memory> //eventfd #include<sys/eventfd.h> //事件处理模块,一个线程一个EventLoop class EventLoop { using Handler = std::function<void()>; public: EventLoop() :_tid(std::this_thread::get_id()) ,_evfd(EventFdCreate()) ,_ev_cl(new Channel(this,_evfd)) ,_twl(this) { //设置读回调,并打开监控。 _ev_cl->SetRead(std::bind(&EventLoop::EventRead,this)); _ev_cl->OpenRead(); } void Start() { while(1) { //1.执行监控(wait) std::vector<std::shared_ptr<Channel>> alreadys = _poller.Wait(); //2.就绪事件处理(获取IO,处理新到来的任务) for(auto& handler : alreadys) Push(std::bind(&Channel::Handle,handler.get())); //3.执行任务(先处理队列中的任务) RunAll(); } } //线程与任务池相关的函数 void Push(const Handler& hder)//将执行对象压入到任务池中 { //{}锁定临界区,同时说明RAII锁的作用范围 { std::unique_lock<std::mutex> lock(_mtx); _tasks.push_back(hder); } //写入一次,触发事件,然后wait就会马上就绪,往下执行,运行任务。 EventWeak(); } void RunAll()//运行任务池中所有的对象 { std::vector<Handler> tsks; { std::unique_lock<std::mutex> lock(_mtx); tsks.swap(_tasks); } for(auto& t : tsks) { t(); } } void Run(const Handler& hder)//执行传入的线程处理对象或者将其放入到线程中 { if(IsIn()) { hder(); } else { Push(hder); } } bool IsIn()//判断当前线程与预期线程是否一致 { return std::this_thread::get_id() == _tid; } //Poller接口的封装 void UpDate(Channel* cel)//更新监控对象 { _poller.UpDate(cel); } void ReMove(Channel* cel)//移除监控对象 { _poller.Remove(cel); } //TimerWheel接口的封装 void TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb)//添加定时任务 { _twl.TimerAdd(id,delay,cb); } void TimerRefresh(uint64_t id)//刷新定时任务 { _twl.TimerRefresh(id); } void TimerCancelTask(uint64_t id)//取消定时任务 { _twl.CancelTask(id); } bool HasTimer(uint64_t id)//判断定时任务 { return _twl.HasTimer(id); } private: //eventfd的实现接口 static int EventFdCreate() { int cnt = 0; int efd = eventfd(cnt,EFD_CLOEXEC | EFD_NONBLOCK); //EFD_CLOEXEC,执行 exec 调用时,子进程对应fd应自动关闭。 //EFD_NONBLOCK,fd设置为非阻塞模式 return efd; } void EventRead() { uint64_t val = 0; //清空读事件 int ret = read(_evfd,&val,sizeof(val)); if(-1 == ret) { if(errno == EINTR || errno == EAGAIN) { return; } } } void EventWeak() { //触发读就绪事件 uint64_t val = 0; int ret = write(_evfd,&val,sizeof(val)); if(-1 == ret) { if(errno == EINTR || errno == EAGAIN) { return; } } } private: int _evfd;//文件描述符管理的一把计数器,用于减缓epoll导致的阻塞。 std::shared_ptr<Channel> _ev_cl;//_evfd的事件管理对象 std::thread::id _tid; //判断运行的线程是否与预期的线程保持一致 std::vector<Handler> _tasks; //任务池,存放任务的执行对象 std::mutex _mtx; //互斥锁保证线程安全。 Poller _poller; //进行对事件的监控。 TimerWheel _twl;//时间轮,管理定时任务 }; //关联实现: //Channel void Channel::Update(){_loop->UpDate(this);} void Channel::Remove(){_loop->ReMove(this);} //TimerWhell void TimerWheel::TimerAdd(uint64_t id,uint32_t delay,TaskFunc cb) { _loop->Run(std::bind(&TimerWheel::TimerAddInLoop,this,id,delay,cb)); } //刷新任务 void TimerWheel::TimerRefresh(uint64_t id) { _loop->Run(std::bind(&TimerWheel::TimerRefreshInLoop,this,id)); } //取消任务 void TimerWheel::CancelTask(uint64_t id) { _loop->Run(std::bind(&TimerWheel::CancelTaskInLoop,this,id)); }
- 事件循环Eventloop主要是通过绑定线程,让线程不断执行任务池中的任务,除此之外此类还完成对Poller,即对事件监控的类和TimerWheel,即管理定时任务的类,两者的整合。
- 通过thread_id使得别的线程进入Eventloop时无法完成对任务的执行,而是将任务推送到任务池当中,且通过互斥锁使得多线程保持互斥,同时在Push到任务池之后,可通过EventWeak使得正在Start中阻塞的线程立马往下执行,及时执行任务池中的任务。
9.2 Thread
#pragma once //条件变量 #include<condition_variable> class LoopThread { public: LoopThread():_loop(nullptr),_thd(std::bind(&LoopThread::ThreadEntry,this)){} //用于实际分配任务,保证同步。 EventLoop* GetLoop() { EventLoop* loop = nullptr; { std::unique_lock<std::mutex> lk(_mtx); _cv.wait(lk,[&](){return _loop != nullptr;}); //为假进入条件变量进行等待。 loop = _loop; } return loop; } private: void ThreadEntry() { EventLoop loop; { std::unique_lock<std::mutex> lk(_mtx); _loop = &loop; } //唤醒所有线程,让其获取锁,进而获取loop的地址。 _cv.notify_all(); loop.Start(); } private: std::mutex _mtx; std::condition_variable _cv; EventLoop* _loop; std::thread _thd; }; class LoopThreadPool { public: //构造 LoopThreadPool(EventLoop* loop):_tcnt(0),_tnxt_id(0),_baseloop(loop) {} //析构 ~LoopThreadPool() { for(int i = 0; i < _tcnt; i++) { delete _ths[i]; } } //初始化设置子线程数量 void Init(int thread_cnt = 0) { _tcnt = thread_cnt; if(_tcnt > 0) { _loops.resize(_tcnt); _ths.resize(_tcnt); for(int i = 0; i < _tcnt; i++) { _ths[i] = new LoopThread(); _loops[i] = _ths[i]->GetLoop(); } } } //用于给连接分配线程,因为EventLoop与线程绑定,因此分配EventLoop,即分配线程。 EventLoop *GetNextLoop() { if(0 == _tcnt) { return _baseloop; } _tnxt_id = (_tnxt_id + 1) % _tcnt; return _loops[_tnxt_id]; } private: int _tcnt; int _tnxt_id; EventLoop * _baseloop; std::vector<EventLoop*> _loops; std::vector<LoopThread*> _ths; };
- LoopThread为一个封装的线程,在调用构造函数时启动,其中EventLoop是在线程执行时初始化进行赋值的,因此其它线程获取EventLoop 必须在线程初始化之后,因此封装了锁和条件变量保证同步和互斥。EventLoop是在线程内部创建的,线程通过EventLoop进而死循环地执行任务池中的任务。
- LoopThreadPool是封装的一个线程池,用于给线程按照轮转的方式分配连接。
10. TcpServer
class TcpServer { public: //初始化和启动 TcpServer(int port = 8000):_nxt_id(0),_tim_out(0),_enrsetim(false) ,_pool(&_baseloop),_aptor(&_baseloop,port) {} void Start(int thread_cnt = 0) { _aptor.SetAcceptCb(std::bind(&TcpServer::ConClient,this,std::placeholders::_1)); _pool.Init(thread_cnt); _baseloop.Start(); } private: //用于Acceptor进行绑定连接处理函数 void ConClient(int cfd) { //将新连接放到Channel进行监控 int newconid = _nxt_id++; ConPtr con(new Connection(_pool.GetNextLoop(),newconid,cfd)); //设置回调函数 con->SetConCb(_con); con->SetCloCb(_clo); con->SetMsgCb(_msg); con->SetEvtCb(_evt); //组件内清理回调 con->SetSvrCb(std::bind(&TcpServer::ConRemove,this,std::placeholders::_1)); con->SetTxtCb(_txt); //启动非活跃销毁 if(true == _enrsetim) con->TimoutRse(_tim_out); //初始化连接 con->Establish(); //插入hash表管理最后进行释放 _cons.insert({newconid,con}); } //移除组件内信息,Connection内部的SetSvrCb绑定,在Realease进行调用。 void ConRemove(const ConPtr& con) { _baseloop.Run(std::bind(&TcpServer::ConRemoveInLoop,this,con)); } void ConRemoveInLoop(const ConPtr& con) { int id = con->Id(); auto it = _cons.find(id); if(it != _cons.end()) { _cons.erase(it); } } public: //定时任务 void EnRseTimout(int tim)//启动非活跃销毁 { _enrsetim = true; _tim_out = tim; } void AddTimer(int delay,const TaskFunc& func)//添加定时任务 { _baseloop.TimerAdd(_nxt_id,delay,func);//TimerAdd保证在同一个线程下串行化运行。 } private: //分配的连接和非活跃id int _nxt_id; //超时处理时间 int _tim_out; bool _enrsetim; //线程 EventLoop _baseloop; LoopThreadPool _pool; //监听套接字 Acceptor _aptor; //连接处理 std::unordered_map<uint64_t,ConPtr> _cons; public: //设置回调函数 void SetAllCb(msg_cb_t msg = nullptr,\ con_cb_t con = nullptr,clo_cb_t clo = nullptr,\ evt_cb_t evt = nullptr,txt_cb_t txt = nullptr) { SetMsgCb(msg);SetConCb(con); SetCloCb(clo);SetEvtCb(evt); SetTxtCb(txt); } void SetMsgCb(msg_cb_t msg){_msg = msg;} void SetConCb(con_cb_t con){_con = con;} void SetTxtCb(txt_cb_t txt){_txt = txt;} void SetCloCb(clo_cb_t clo){_clo = clo;} void SetEvtCb(evt_cb_t evt){_evt = evt;} private: //回调 con_cb_t _con = nullptr; //用户设置的连接初始化回调 msg_cb_t _msg = nullptr; //用户设置的消息处理回调 clo_cb_t _clo = nullptr; //用户设置的关闭事件回调 txt_cb_t _txt = nullptr; //记录协议上下文,本项目的上层协议没有用到。 evt_cb_t _evt = nullptr; //用户设置的任意事件回调 };
TcpServer是一个整合之前所有实现模块的一个综合模块,内部独立实现连接的初始化和移除功能,分别绑定到Acceptor和Connection内部;整合Loop(内部有多线程,定时任务Time模块)进而提供多线程和定时任务的功能;向上还提供设置各种回调函数,本项目中在实现上层的Http协议时只用到了消息处理回调和连接初始化回调,其它应用层可根据需要自行设置。
尾序
在实现地过程中,感觉就像包包子一样,不断地将底层的接口这样"馅",通过封装,继承,多态思想这张"皮",最终呈现出一个完整的"包子",即呈现给上层的对象。对此模块的实现可以更好地体会到多线程的工作方式和执行逻辑,同时可以对系统IO有了更加深刻的理解。希望写这篇文章对各位也能有所收获,我是舜华,期待与你的每一次相遇!
我们下篇再见了!