目录
未完待续......
学习笔记,仅作交流。如有谬误,敬请指正。
1、项目介绍
本项目实现一个基于从属Reactor模式的高性能并发服务器,并且该服务器可以单独作为一个网络库组件,组件使用者可以利用该网络库组件方便地实现各种各样的服务器。
服务器使用到epoll多路转接模型,并且工作在ET模式下。
2、项目部署
操作系统:Ubuntu(只要支持C++11的正则库)
3、项目开发过程
3.1网络库模块开发
3.1.1日志宏
日志信息分级:FATAL(致命错误)、ERROR(一般错误)、WARN(警告)、INFO(一般信息)、DEBUG(调试信息)
server.hpp
#include <stdio.h> #include <stdlib.h> #include <iostream> #define NORMAL 0 // 正常 #define DEBUG 1 // 调试 #define ERROR 2 // 错误 #define LOG_LEVEL DEBUG// 控制输出 #define LOG(level,format,...) do{\ if(level < LOG_LEVEL) break;\ time_t t = time(nullptr);\ struct tm *ltm = localtime(&t);\ char tmp[32] = {0};\ strftime(tmp,sizeof(tmp) - 1,"%H:%M:%S",ltm);\ fprintf(stdout,"[thread:%p]--[%s]--[file:%s|line:%d]=> " format "\n",(void *)pthread_self(),tmp,__FILE__,__LINE__,##__VA_ARGS__);\ }while(0) #define NORMAL_LOG(format,...) LOG(NORMAL,format,##__VA_ARGS__) #define DEBUG_LOG(format,...) LOG(DEBUG,format,##__VA_ARGS__) #define ERROR_LOG(format,...) LOG(ERROR,format,##__VA_ARGS__)
该日志宏使用fprintf,可以将日志输出到文件上。该日志的输出格式为:
[线程地址]–[时:分:秒]–[file:发生日志输出的文件名|line:发生日志输出的行号]=> 输出内容
这段代码的作用在于控制日志的输出,即不符合等级的日志输出统统不输出。
if(level < LOG_LEVEL) break;\
3.1.2Buffer模块
TCP通信的数据都会被放在套接字的缓冲区当中,但是套接字的缓冲区是有大小限制的,尽管开发者可以控制这些缓冲区的大小,但是这样做很没必要。
可以直接在应用层再提供一层缓冲区,这里把它叫做Buffer。。Buffer的作用就是一个处于应用层的缓冲区,它的容量可变,为组件使用者提供一个方便、灵活的缓冲区。
Buffer.hpp
#ifndef BUFFER_HPP #define BUFFER_HPP #include <vector> #include <string> #include <algorithm> #include <cstdint> #define BUFFER_DEFAULT_SIZE 1024 class Buffer { public: Buffer(); char *Begin();//获取缓冲区的起始地址 char *WritePosition();//获取有效数据的结束位置,也就是新数据写入的位置 char *ReadPosition();//有效位置的起始位置,也就是读取数据的起始位置 uint64_t TailFreeSize();/ 获取_writer之后的空闲空间大小 uint64_t HeadFreeSize();// 获取_reader之前的空间空间大小 uint64_t ReadAbleSize();// 获取可读数据大小 void OffsetReader(uint64_t len);// _reader向后移动,说明有数据被读走 void OffsetWriter(uint64_t len);// _writer向后移动,说明有新数据写入 void EnsureWriteSpace(uint64_t len);//确保空间大小足够容纳新数据 void Write(const void *data, uint64_t len);// 向Buffer写入数据 void WriteAndPush(const void *data, uint64_t len);// 向Buffer写入并且造成_wirter偏移 void WriteString(const std::string &data);// 向Buffer写入string对象 void WriteStringAndPush(const std::string &data);// 写入string对象并造成_writer偏移 void WriteBuffer(Buffer &data);// 写入Buffer对象 void WriteBufferAndPush(Buffer &data);// 写入Buffer对象并造成_writer偏移 void Read(void *buf, uint64_t len);// 只能读取有效的数据 void ReadAndPop(void *buf, uint64_t len);// 读取数据并且移动_reader,即从Buffer当中删除数据 std::string ReadAsString(uint64_t len);// 读取len个数据,在该函数内部封装成string对象返回出去 std::string ReadAsStringAndPop(uint64_t len); char *FindEndOfLine();// 寻找一行的结束标志'\n' std::string GetLine();// 获取一行数据 std::string GetLineAndPop(); void Clear(); private: uint67_t _reader;//有效数据的起始位置 uint64_t _writer;//有效数据的结束位置 std::vector<char> _buffer;//使用vector进行空间管理 }; #endif // BUFFER_H
Buffer.cpp
#include "Buffer.hpp" #include <cstdlib> #include <cstring> #include <cstdio> Buffer::Buffer() : _reader(0), _writer(0), _buffer(BUFFER_DEFAULT_SIZE) {}//这个初始化列表的顺序与头文件类中的声明顺序有关,不然会有警告 char *Buffer::Begin() { return &(*(_buffer.begin())); } char *Buffer::WritePosition() { return Begin() + _writer; } char *Buffer::ReadPosition() { return Begin() + _reader; } uint64_t Buffer::TailFreeSize() { return _buffer.size() - _writer; } uint64_t Buffer::HeadFreeSize() { return _reader; } uint64_t Buffer::ReadAbleSize() { return _writer - _reader; } void Buffer::OffsetReader(uint64_t len) { if (len == 0) return; // 最大范围是和_writer处于同一位置,说明Buffer为空;如果超过_writer,就是未定义的行为 if (len > ReadAbleSize()) abort(); _reader += len; } void Buffer::OffsetWriter(uint64_t len) { if (len == 0) return; // 最多移动到当前_buffer的最大容量处,一旦超出就可能造成越界访问 if (len > TailFreeSize()) abort(); _writer += len; } void Buffer::EnsureWriteSpace(uint64_t len) { if (TailFreeSize() >= len) return;//_writer尾部有足够的空间容纳新数据 // _reader之前、_writer之后的空间足够容纳新数据 if (TailFreeSize() + HeadFreeSize() >= len) { uint64_t oldsize = ReadAbleSize();// 保存当前有效数据大小 std::copy(ReadPosition(), ReadPosition() + oldsize, Begin());// 将数据往前挪动 _reader = 0; _writer = oldsize; } else { _buffer.resize(_writer + len); } } void Buffer::Write(const void *data, uint64_t len) { if (len == 0) return; EnsureWriteSpace(len); const char *d = (const char *)data; std::copy(d, d + len, WritePosition());// 将[d,d+len]这段区间的数据拷贝到_writer指向的位置之后 } void Buffer::WriteAndPush(const void *data, uint64_t len) { Write(data, len); OffsetWriter(len); } void Buffer::WriteString(const std::string &data) { Write(data.c_str(), data.size()); } void Buffer::WriteStringAndPush(const std::string &data) { WriteString(data); OffsetWriter(data.size()); } void Buffer::WriteBuffer(Buffer &data) { Write(data.ReadPosition(), data.ReadAbleSize()); } void Buffer::WriteBufferAndPush(Buffer &data) { WriteBuffer(data); OffsetWriter(data.ReadAbleSize()); } void Buffer::Read(void *buf, uint64_t len) { if (len > ReadAbleSize()) abort(); std::copy(ReadPosition(), ReadPosition() + len, (char *)buf); } void Buffer::ReadAndPop(void *buf, uint64_t len) { Read(buf, len); OffsetReader(len); } std::string Buffer::ReadAsString(uint64_t len) { if (len > ReadAbleSize()) abort(); std::string str; str.resize(len); Read(&str[0], len); return str; } std::string Buffer::ReadAsStringAndPop(uint64_t len) { if (len > ReadAbleSize()) abort(); std::string str = ReadAsString(len); OffsetReader(len); return str; //return std::move(str); //std::move是将对象的状态或者所有权从一个对象转移到另一个对象,只是转移,没有内存的搬迁或者内存拷贝。 //在处理大型对象时深拷贝可能非常低效,此时可以用移动语义,即将资源的所有权从一个对象转移给另一个对象,无需进行数据的实际复制 } char *Buffer::FindEndOfLine()// 寻找一行的结束标志'\n' { char *res = (char *)memchr(ReadPosition(), '\n', ReadAbleSize()); return res; } std::string Buffer::GetLine()// 获取一行数据 { char *pos = FindEndOfLine(); if (pos == nullptr) return ""; return ReadAsString(pos - ReadPosition() + 1);/ +1是为了将'\n'一并返回 } std::string Buffer::GetLineAndPop() { std::string str = GetLine(); OffsetReader(str.size()); return str; } void Buffer::Clear()// 清空 { _reader = 0; _writer = 0; }
Buffer扩容机制
3.1.3Socket模块
Socket模块是将套接字的过程封装起来(有阻塞和非阻塞两种模式进行读取和发送数据;增加端口复用功能)。
Socket.hpp
#ifndef SOCKET_HPP #define SOCKET_HPP #include <string> #include <cstring> #include <cstdlib> #include <cstdio> #include <cerrno> #include <fcntl.h> #include <unistd.h> #include <arpa/inet.h> #include <sys/socket.h> #include <sys/types.h> #include <netinet/in.h> #define MAX_LISTEN 64 class Socket { public: Socket(int sockfd = -1); ~Socket(); int GetFd();// 获取套接字文件描述符 bool Create();// 创建套接字 bool Bind(const std::string &ip, uint16_t port);//绑定 bool Listen(int backlog = MAX_LISTEN);//监听 bool Connect(const std::string &ip, uint16_t port);//连接 int Accept();//接收 ssize_t Recv(void *buf, size_t len, int flag = 0);// 默认为阻塞式的读取数据 ssize_t NoBlockRecv(void *buf, size_t len);// 非阻塞式读取 ssize_t Send(const void *buf, size_t len, int flag = 0);// 默认为阻塞式的发送数据 ssize_t NoBlockSend(const void *buf, size_t len);//非阻塞发送 void Close();//关闭 bool CreateServer(uint16_t port, const std::string &ip = "0.0.0.0");// 直接创建一个服务器套接字 bool CreateClient(const std::string &ip, uint16_t port); void ReuseAddr();// 开启端口地址重用 void NonBlock();//设置非阻塞 private: int _sockfd; }; #endif // SOCKET_H
Socket.cpp
#include "Socket.hpp" #include <iostream> #include "server.hpp" Socket::Socket(int sockfd) : _sockfd(sockfd) {} Socket::~Socket() { Close(); } int Socket::GetFd() { return _sockfd; } bool Socket::Create() { _sockfd = socket(AF_INET, SOCK_STREAM, 0);//只支持TCP协议 if (_sockfd < 0) { ERROR_LOG("CREATE SOCKET ERROR: %s", strerror(errno));//错误级日志 return false; } NonBlock();// 任何套接字都设置非阻塞 return true; } bool Socket::Bind(const std::string &ip, uint16_t port) { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(local); int n = bind(_sockfd, (struct sockaddr *)&local, len); if (n < 0) { ERROR_LOG("BIND SOCKET ERROR: %s", strerror(errno)); return false; } return true; } bool Socket::Listen(int backlog) { int n = listen(_sockfd, backlog); if (n < 0) { ERROR_LOG("SOCKET LISTEN ERROR: %s", strerror(errno)); return false; } return true; } bool Socket::Connect(const std::string &ip, uint16_t port) { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = inet_addr(ip.c_str()); socklen_t len = sizeof(local); int n = connect(_sockfd, (struct sockaddr *)&local, len); if (n < 0) { ERROR_LOG("CONNECT SERVER ERROR: %s", strerror(errno)); return false; } return true; } int Socket::Accept() { int connfd = accept(_sockfd, nullptr, nullptr);// 不关心客户端信息 if (connfd < 0) { ERROR_LOG("SOCKET ACCEPT ERROR: %s", strerror(errno)); return -1; } NonBlock(); return connfd; } ssize_t Socket::Recv(void *buf, size_t len, int flag) { ssize_t n = recv(_sockfd, buf, len, flag); if (n <= 0) { if (errno == EAGAIN || errno == EINTR) return 0; ERROR_LOG("SOCKET RECV ERROR: %s", strerror(errno)); return -1; } return n; } ssize_t Socket::NoBlockRecv(void *buf, size_t len) { return Recv(buf, len, MSG_DONTWAIT); } ssize_t Socket::Send(const void *buf, size_t len, int flag) { ssize_t n = send(_sockfd, buf, len, flag); if (n < 0) { if (errno == EAGAIN || errno == EINTR) return 0; ERROR_LOG("SOCKET SEND ERROR: %s", strerror(errno)); return -1; } return n; } ssize_t Socket::NoBlockSend(const void *buf, size_t len) { return Send(buf, len, MSG_DONTWAIT); } void Socket::Close() { if (_sockfd != -1) { close(_sockfd); _sockfd = -1; } } bool Socket::CreateServer(uint16_t port, const std::string &ip) { if (Create() == false) return false; ReuseAddr(); if (Bind(ip, port) == false) return false; if (Listen() == false) return false; return true; } bool Socket::CreateClient(const std::string &ip, uint16_t port) { if (Create() == false) return false;// 创建失败 if (Connect(ip, port) == false) return false;// 连接失败 return true; } void Socket::ReuseAddr() { int val = 1;//缓冲区 //int setsockopt(int sockfd, int level, int optname, const void *optval, socklen_t optlen); //sockfd:套接字文件描述符。level:选项定义的协议层,通常是 SOL_SOCKET。 //optname:选项名称,用于指定要设置的选项。optval:指向包含新选项值的缓冲区的指针。optlen:optval 缓冲区的大小。 //常用选项SO_REUSEADDR:允许在绑定之前关闭处于 TIME_WAIT 状态的套接字,从而允许新的套接字绑定到相同的地址和端口。 setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR, (void *)&val, sizeof(val)); val = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEPORT, (void *)&val, sizeof(val)); } void Socket::NonBlock() { //int fcntl(int fd, int cmd, ... /* arg */ ); //fcntl():对打开的文件描述符fd执行下面描述的操作之一。 //cmd功能:获取/设置文件状态标志(F_GETFL、F_SETFL); int flag = fcntl(_sockfd, F_GETFL, 0);// 获取当前属性 fcntl(_sockfd, F_SETFL, flag | O_NONBLOCK); }
3.1.4Channel模块
Channel模块的工作就是将文件描述符和事件进行一个封装整合
Channel模块涉及到两个大动作:一是事件的设置,事件的设置很简单,直接把事件和套接字绑定即可;二是触发事件之后该做什么。触发事件之后要处理什么动作,这个动作由回调函数决定。
Channel.hpp
#ifndef CHANNEL_HPP #define CHANNEL_HPP #include <functional> #include <sys/epoll.h> #include "EventLoop.hpp" #include <functional> #include <cstdint> class EventLoop; //向前声明 class Channel { public: using EventCallback = std::function<void()>;//触发事件后的回调 Channel(EventLoop* loop, int fd); int GetFd();//获取文件描述符 uint32_t GetEvents();//获取监控事件 uint32_t GetRevents();//获取触发事件 void SetRevents(uint32_t events);//设置就绪事件 //相关的回调函数 void SetReadCallback(const EventCallback& cb); void SetWriteCallback(const EventCallback& cb); void SetErrorCallback(const EventCallback& cb); void SetCloseCallback(const EventCallback& cb); void SetEventCallback(const EventCallback& cb); //判断是否监控了可读和可写的事件 bool ReadAble();//当前文件描述符是否监控了可读事件 bool WriteAble(); //添加读写事件的监控 void EnableRead(); void EnableWrite(); void EnableETMode();//开启边缘触发(ET)模式 //取消读写事件的监控 void DisableRead(); void DisableWrite(); void DisableAll();//取消所有事件的监控 void Update();//更新时间的监控 void Remove();//移除事件的监控 void HandleEvent();//通过判断触发事件判断调用哪个回调函数 private: int _fd; EventLoop* _loop; uint32_t _events;// 需要监控的事件 uint32_t _revents;// 触发的事件 EventCallback _readCallback; EventCallback _read_callback;// 可读事件触发后的回调函数 EventCallback _write_callback; EventCallback _error_callback;// 可读事件触发后的回调函数 EventCallback _close_callback;// 连接断开事件触发后的回调函数 EventCallback _event_callback;// 任意事件触发后的回调函数 }; #endif // CHANNEL_HPP
Channel.cpp
#include "Channel.hpp" #include "EventLoop.hpp" #include <sys/epoll.h> Channel::Channel(EventLoop* loop, int fd) : _fd(fd), _loop(loop) {} int Channel::GetFd() { return _fd; } uint32_t Channel::GetEvents() { return _events; } uint32_t Channel::GetRevents() { return _revents; } void Channel::SetRevents(uint32_t events) { _revents = events; } void Channel::SetReadCallback(const EventCallback& cb) { _read_callback = cb; } void Channel::SetWriteCallback(const EventCallback& cb) { _write_callback = cb; } void Channel::SetErrorCallback(const EventCallback& cb) { _error_callback = cb; } void Channel::SetCloseCallback(const EventCallback& cb) { _close_callback = cb; } void Channel::SetEventCallback(const EventCallback& cb) { _event_callback = cb; } bool Channel::ReadAble() { return (_events & EPOLLIN); } bool Channel::WriteAble() { return (_events & EPOLLOUT); } //socket读触发:socket数据从无到有,会触发epoll_wait EPOLLIN事件,只会触发一次EPOLLIN事件,用户检测到事件后,需一次性把socket接收缓冲区数据全部读取完,读取完的标志为recv返回-1,errno为EAGAIN。 void Channel::EnableRead() { _events |= EPOLLIN; Update(); } //socket写触发:socket可写,会触发一次epoll_wait EPOLLOUT事件。 void Channel::EnableWrite() { _events |= EPOLLOUT; Update(); } void Channel::EnableETMode() { _events |= EPOLLET; Update(); } void Channel::DisableRead() { _events &= ~EPOLLIN; Update(); } void Channel::DisableWrite() { _events &= ~EPOLLOUT; Update(); } void Channel::DisableAll() { _events = 0; Update(); } void Channel::Update() { _loop->UpdateEvent(this); } void Channel::Remove() { _loop->RemoveEvent(this); } void Channel::HandleEvent() //通过判断触发事件判断调用哪个回调函数 { if ((_revents & EPOLLIN) || (_revents & EPOLLRDHUP) || (_revents & EPOLLPRI)) { if (_read_callback) _read_callback(); } if (_revents & EPOLLOUT) { if (_write_callback) _write_callback(); } else if (_revents & EPOLLERR) { if (_error_callback) _error_callback(); } else if (_revents & EPOLLHUP) { if (_close_callback) _close_callback(); } if (_event_callback) _event_callback(); }
3.1.5Poller模块
Poller模块的作用就是进行事件的监控和通知事件触发
Poller.hpp
#ifndef POLLER_HPP #define POLLER_HPP #include <unordered_map> #include <vector> #include <sys/epoll.h> #include <cstring> #include "Channel.hpp" #include <mutex> #define MAX_EPOLLEVENTS 1024 class Channel; class Poller { public: Poller(); void updateEvent(Channel *channel);// 更新事件的监控 void removeEvent(Channel *channel);// 移除事件对某个Channel的事件监控 void poll(std::vector<Channel *> *actives);// 开始监控,并且返回事件触发的Channel private: bool HanChannel(Channel *channel);// 判断Channel对象是否被Poller模块所管理 void Update(Channel *channel, int op);// 更新epoll的监控事件 int _epfd;//epoll文件描述符 struct epoll_event _evs[MAX_EPOLLEVENTS];// 存储触发事件的数组 std::unordered_map<int, Channel *> _channels;/ Poller模块会负责通知事件,通知的对象就是Channel对象 }; #endif // POLLER_HPP
3.1.6Poller.cpp
相关函数
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
函数参数:
- epfd : epoll实例的fd
- op : 操作标志,下文会描述
- fd : 监控对象的fd
- event : 事件的内容,下文描述
op可以有3个值,分别为:
- EPOLL_CTL_ADD : 添加监听的事件
- EPOLL_CTL_DEL : 删除监听的事件
- EPOLL_CTL_MOD : 修改监听的事件
events表示监控的事件的集合,是一个状态值,通过状态位来表示,可以设置如下事件:
- EPOLLIN : 文件可读
- EPOLLOUT : 文件可写
- EPOLLPRI : 文件有紧急数据可读
- EPOLLHUP : 文件被挂断。这个事件是一直监控的,即使没有明确指定
#include "Poller.hpp" #include "server.hpp" #define MAX_EPOLLEVENTS 1024 Poller::Poller() : _channels() { _epfd = epoll_create(20); if (_epfd < 0) { ERROR_LOG("EPOLL CREATE ERROR: %s", strerror(errno)); abort(); } } bool Poller::HanChannel(Channel *channel) { if (channel == nullptr) { ERROR_LOG("Null channel passed to HanChannel"); return false; } auto it = _channels.find(channel->GetFd()); if (it == _channels.end()) { return false; } return true; } void Poller::Update(Channel *channel, int op) { if (channel == nullptr) { ERROR_LOG("Null channel passed to Update"); return; } int fd = channel->GetFd(); struct epoll_event ev; ev.data.fd = fd; ev.events = channel->GetEvents(); int n = epoll_ctl(_epfd, op, fd, &ev); if (n < 0) { ERROR_LOG("EPOLLCTL ERROR: %s", strerror(errno)); } } void Poller::updateEvent(Channel *channel) { if (channel == nullptr) { ERROR_LOG("Null channel passed to updateEvent"); return; } bool ret = HanChannel(channel); if (!ret) { _channels.insert(std::make_pair(channel->GetFd(), channel)); Update(channel, EPOLL_CTL_ADD); return; } Update(channel, EPOLL_CTL_MOD); } //假设某个文件描述符的连接断开了,就需要取消该文件描述符的事件监控,就需要通过RemoveEvent()来完成。 void Poller::removeEvent(Channel *channel) { if (channel == nullptr) { ERROR_LOG("Null channel passed to removeEvent"); return; } auto it = _channels.find(channel->GetFd()); if (it != _channels.end()) { _channels.erase(it); } Update(channel, EPOLL_CTL_DEL); } void Poller::poll(std::vector<Channel *> *actives) { int ret = epoll_wait(_epfd, _evs, MAX_EPOLLEVENTS, -1); if (ret < 0) { if (errno == EINTR) return; ERROR_LOG("EPOLL WAIT ERROR: %s", strerror(errno)); abort(); } for (int i = 0; i < ret; i++) { auto it = _channels.find(_evs[i].data.fd); if (it == _channels.end()) { ERROR_LOG("Poller does not manage channel for fd %d", _evs[i].data.fd); abort(); } it->second->SetRevents(_evs[i].events); actives->push_back(it->second); } }
3.1.6Timerwheel模块
该模块由TimerTask类和TimerWheel类组成
主要任务:定时器可以用来定时处理某些任务,在服务器的典型用处就是定时处理一些非活跃的连接,以释放服务器资源。
TimerTask.hpp
主要对定时任务的处理(如:取消定时)
#ifndef TIMERTASK_HPP #define TIMERTASK_HPP #include <functional> #include <cstdint> using TaskFunc = std::function<void()>; using ReleaseFunc = std::function<void ()>; class TimerTask { public: TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb, int turns); ~TimerTask(); void Cancel(); // 取消定时任务 void SetRelease(const TaskFunc &cb); uint32_t DelayTime();// 返回定时时间 void ReduceTurns();// 减少圈数 int GetTurns()const;// 获得圈数 void SetTurns(int turns); private: uint64_t _id; // 定时任务id,方便定位、查询、管理 uint32_t _timeout; // 定时任务的超时时间,即多久之后执行任务 bool _canceled; // 是否取消定时任务 TaskFunc _task_cb; // 定时器任务 ReleaseFunc _release; // 删除TimerWheel当中保存的TimerTask信息,防止内存泄漏 int _turns; // 圈数 }; #endif // TIMERTASK_HPP
TimerTask.cpp
#include "TimerTask.hpp" TimerTask::TimerTask(uint64_t id, uint32_t timeout, const TaskFunc &cb, int turns) : _id(id), _timeout(timeout), _canceled(false),_task_cb(cb), _turns(turns) { }// 默认不取消定时任务 TimerTask::~TimerTask() { if (_canceled == false) _task_cb(); // 对象析构时执行定时任务 _release(); // 释放TimerWheel中所管理的TimerTask资源 } void TimerTask::Cancel() { _canceled = true; } void TimerTask::SetRelease(const ReleaseFunc &cb) { _release = cb; } uint32_t TimerTask::DelayTime() { return _timeout; } //每当秒针移动到下一个刻度时,检查该刻度上的所有任务。如果任务的圈数 _turns 大于0,则递减 _turns。当 _turns 变为0时,任务才有资格被执行。 void TimerTask::ReduceTurns() { --_turns; } int TimerTask::GetTurns() const{ return _turns; } //为了处理延时大于60秒的定时任务,每个任务有一个 _turns 成员变量。这个变量记录了任务需要经过的完整时间轮圈数。 void TimerTask::SetTurns(int turns) { _turns = turns; }
TimerWheel.hpp
shared_ptr指针:对于开辟在堆区的内存,我可以使用多个指针指向它,就相当于我先在堆区开辟一块内存使用一个指针指向这片内存区域,然后给这个指针取很多个别名;
weak_ptr指针:由shared_ptr不正当使用引发的错误来而引出,weak_ptr只能访问所指向的内存区域,当weak_ptr指针生命结束之时,其所指向的内存依旧完好无损
#ifndef TIMERWHEEL_HPP #define TIMERWHEEL_HPP #include <unordered_map> #include <vector> #include <memory> #include <sys/timerfd.h> #include <unistd.h> #include <cstring> #include "Channel.hpp" #include "EventLoop.hpp" #include "TimerTask.hpp" #include <functional> #include <stdint.h> #include <memory> class EventLoop; class Channel; // 前向声明 using TaskFunc = std::function<void()>; class TimerWheel { private: //weak_ptr只能访问所指向的内存区域,当weak_ptr指针生命结束之时,其所指向的内存依旧完好无损, //由shared_ptr不正当使用引发的错误来引出weak_ptr using WeakTask = std::weak_ptr<TimerTask>; // 指向TimerTask的弱指针 using PtrTask = std::shared_ptr<TimerTask>; // 指向TimerTask的引用计数型指针 static int CreateTimerfd(); int ReadTimerfd(); void RunTimerTask(); void OnTime(); // 超时时间到,读事件触发,读事件触发后的回调函数 void RemoveTimer(uint64_t id); void TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb); void TimerRefreshInLoop(uint64_t id); void TimerCancelInLoop(uint64_t id); public: TimerWheel(EventLoop *loop); ~TimerWheel(); void addTimer(uint64_t id, uint32_t delay, const TaskFunc &callback); //void addTimer(int timeout, std::function<void()> callback); // void removeTimer(uint64_t id); void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); bool HasTimer(uint64_t id); private: int _tick; // 秒针,心博,每秒钟变化一次 int _capacity; // 表盘的最大数量,模拟钟表 std::vector<std::vector<PtrTask>> _wheel; // 时间轮,存放TimerTask的智能指针 std::unordered_map<uint64_t, WeakTask> _timers; // 管理TimerTask对象 EventLoop *_loop; int _timerfd; std::unique_ptr<Channel> _timer_channel; }; #endif // TIMERWHEEL_HPP
TimerWheel.cpp
int timerfd_create(int clockid, int flags);//创建timerfd描述符 //参数: //1、clockid可以填CLOCK_REALTIME,CLOCK_MONOTONIC //CLOCK_REALTIME:系统实时时间,随系统实时时间改变而改变,即从UTC1970-1-1 0:0:0开始计时, //中间时刻如果系统时间被用户改成其他,则对应的时间相应改变 //CLOCK_MONOTONIC:从系统启动这一刻起开始计时,不受系统时间被用户改变的影响 //2、flags可以填0,O_CLOEXEC,O_NONBLOCK
#include "TimerWheel.hpp" #include "TimerTask.hpp" #include "server.hpp" #include "Channel.hpp" #include <vector> #include <sys/timerfd.h> #include <unistd.h> #include <cstring> #include <iostream> TimerWheel::TimerWheel(EventLoop *loop) : _tick(0), _capacity(60), _wheel(_capacity), _loop(loop), _timerfd(CreateTimerfd()), _timer_channel(new Channel(_loop, _timerfd))// 每一个文件描述符都会配备一个Channel对象 { _timer_channel->SetReadCallback(std::bind(&TimerWheel::OnTime, this)); _timer_channel->EnableRead();// 启动读事件监控 } TimerWheel::~TimerWheel() { close(_timerfd); } int TimerWheel::CreateTimerfd()// 创建定时器 { int timerfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK | TFD_CLOEXEC); if (timerfd < 0) { std::cerr << "TIMERFD CREATE ERROR: " << strerror(errno) << std::endl; abort(); } struct itimerspec itime; itime.it_value.tv_sec = 1; itime.it_value.tv_nsec = 0; itime.it_interval.tv_sec = 1; itime.it_interval.tv_nsec = 0; timerfd_settime(timerfd, 0, &itime, nullptr); return timerfd; } int TimerWheel::ReadTimerfd() { uint64_t times; int n = read(_timerfd, ×, 8);// 只能8个字节的读 if (n < 0) { std::cerr << "READ TIMEFD FAILED: " << strerror(errno) << std::endl; abort(); } // 每次从_timerfd当中读取数据后,_timerfd内的内容会被清空,所以读事件不会重复被触发 return times;// 返回值是超时次数 } void TimerWheel::RunTimerTask() { _tick = (_tick + 1) % _capacity;// 秒针转动一次 for (auto it = _wheel[_tick].begin(); it != _wheel[_tick].end();) { if ((*it)->GetTurns() >= 1)// 圈数>=1的定时任务不应该被执行,而是减少圈数 { (*it)->ReduceTurns(); ++it; } else { it = _wheel[_tick].erase(it); } } } void TimerWheel::OnTime()// 超时时间到,读事件触发,读事件触发后的回调函数 { int times = ReadTimerfd(); for (int i = 0; i < times; i++)// 返回的是超时次数,超时几次就处理几次任务 { RunTimerTask(); } } void TimerWheel::RemoveTimer(uint64_t id) { auto it = _timers.find(id); if (it != _timers.end()) { _timers.erase(it); } } void TimerWheel::TimerAddInLoop(uint64_t id, uint32_t delay, const TaskFunc &cb) { int turns = delay / _capacity;// 计算圈数 PtrTask pt(new TimerTask(id, delay, cb, turns));// 创建TimerTask对象 pt->SetRelease(std::bind(&TimerWheel::RemoveTimer, this, id));// 设置TimerTask析构时,取消TimerWheel对其的管理 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt); _timers[id] = WeakTask(pt); } void TimerWheel::TimerRefreshInLoop(uint64_t id) // 真实的刷新定时器 { auto it = _timers.find(id); if (it == _timers.end()) { return; } PtrTask pt = it->second.lock();// 弱指针向shared_ptr转化 int delay = pt->DelayTime(); int turns = delay / _capacity; pt->SetTurns(turns);// 设置圈数 int pos = (_tick + delay) % _capacity; _wheel[pos].push_back(pt);// 重新添加新的定时任务对象 } void TimerWheel::TimerCancelInLoop(uint64_t id) { auto it = _timers.find(id); if (it == _timers.end()) { return; } PtrTask pt = it->second.lock(); if (pt) pt->Cancel(); } void TimerWheel::addTimer(uint64_t id, uint32_t delay, const TaskFunc &callback) { _loop->RunInLoop(std::bind(&TimerWheel::TimerAddInLoop, this, id, delay, callback)); } void TimerWheel::TimerRefresh(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerRefreshInLoop, this, id)); } void TimerWheel::TimerCancel(uint64_t id) { _loop->RunInLoop(std::bind(&TimerWheel::TimerCancelInLoop, this, id)); } bool TimerWheel::HasTimer(uint64_t id)//检查定时任务是否完成 { auto it = _timers.find(id); if (it == _timers.end()) { return false; } return true; }
_tick;// 秒针,钟变化一次
_capacity;// 表盘的最大数量,模拟钟表
_wheel;// 时间轮,存储了指向TimerTask对象的shared_ptr
_timers;// 管理TimerTask对象,存储的Val值是一个weak_ptr,weak_ptr可以升级成为shared_ptr,这样一来,weak_ptr就具有了探测指向的对象是否存在的功能了。
本项目模拟的是一个钟表,每个定时任务放在每个钟表刻度上,秒针指向了哪个刻度,哪个定时任务就执行(或圈数减1)。
为了处理延时大于60秒的定时任务,每个任务有一个 圈数_turns 成员变量。这个变量记录了任务需要经过的完整时间轮圈数。每当秒针移动到下一个刻度时,检查该刻度上的所有任务。如果任务的圈数 _turns
大于0,则递减 _turns
。当 _turns
变为0时,任务才有资格被执行。
3.1.7EvenLoop模块
EventLoop的功能是进行事件循环、事件监听、事件处理和定时任务。
使用者在不同的线程操作了同一个EventLoop对象,这就很容易导致线程安全问题。
使用了一种解决方案,那就是在EventLoop当中放一个任务队列。思路是这样的:在执行任何一个有可能导致线程安全问题的函数时都判断一下执行该函数的线程是否是EventLoop对象构造时的线程,如果是,那么直接执行;如果不是,就将函数封装成一个一个任务对象压入任务队列,待EventLoop处理完所有的触发事件后再统一处理任务队列的所有任务。
EvenLoop.hpp
#ifndef EVENTLOOP_HPP #define EVENTLOOP_HPP #include <functional> #include <vector> #include <thread> #include <mutex> #include <memory> #include <sys/eventfd.h> #include "Channel.hpp" #include "Poller.hpp" #include "TimerWheel.hpp" class Poller; class Channel; // 前向声明 class TimerWheel; // 前向声明 class EventLoop { public: using Functor = std::function<void()>; EventLoop(); ~EventLoop(); void Start(); bool IsInLoop();// 判断当前EventLoop对象是否处于构造线程中 void AssertInLoop(); void RunInLoop(const Functor &cb);// 所有任务的执行都必须经过这个接口 void QueueInLoop(const Functor &cb); void UpdateEvent(Channel *channel); void RemoveEvent(Channel *channel); void TimerAdd(uint64_t id, uint32_t delay, const Functor &cb); void TimerRefresh(uint64_t id); void TimerCancel(uint64_t id); bool HasTimer(uint64_t id); private: void RunAllTask();// 执行任务队列当中的所有任务 static int CreateEventFd(); void ReadEventfd();// 从_event_fd当中读取数据 void WeakUpEventFd();// 向_evenfd_fd写入数据,即触发_event_fd的可读事件 std::thread::id _thread_id; 线程id int _event_fd;// eventfd的返回值,必须要有这个,如果任务队列当中有任务,但是没有IO事件触发,任务队列的任务就一直不会执行 std::unique_ptr<Channel> _event_channel; bool Running; std::unique_ptr<Poller> _poller; std::unique_ptr<TimerWheel> _timer_wheel; // 使用智能指针 std::vector<Functor> _tasks;// 任务队列 std::mutex _mutex;// 保证任务队列的互斥访问 }; #endif // EVENTLOOP_HPP
EvenLoop.cpp
#include "EventLoop.hpp" #include "server.hpp" // 假设有一个日志模块,用于记录错误信息 #include "Channel.hpp" #include <vector> #include <fcntl.h> #include <unistd.h> #include <sys/eventfd.h> EventLoop::EventLoop() : _thread_id(std::this_thread::get_id()), _event_fd(CreateEventFd()), _event_channel(new Channel(this, _event_fd)), _poller(new Poller()), _timer_wheel(new TimerWheel(this)) { // 使用 std::unique_ptr 的构造函数 // _event_fd也需要被监听 _event_channel->SetReadCallback(std::bind(&EventLoop::ReadEventfd, this)); _event_channel->EnableRead(); } EventLoop::~EventLoop() {} void EventLoop::Start() { while (true) { std::vector<Channel *> actives; _poller->poll(&actives); // 调用 poll 方法 for (auto &channel : actives) { channel->HandleEvent(); // 挨个处理事件触发之后的任务 } RunAllTask(); // 最后执行任务队列的所有任务 } } bool EventLoop::IsInLoop()// 判断当前EventLoop对象是否处于构造线程中 { return (_thread_id == std::this_thread::get_id()); } void EventLoop::AssertInLoop() { if (_thread_id != std::this_thread::get_id()) abort(); } void EventLoop::RunInLoop(const Functor &cb) { if (IsInLoop()) { return cb(); // 处于构造线程的任务直接执行 } QueueInLoop(cb); // 否则压入任务队列 } void EventLoop::QueueInLoop(const Functor &cb) { { std::unique_lock<std::mutex> _lock(_mutex); _tasks.push_back(cb); } WeakUpEventFd(); // 任务队列有任务,向_event_fd写入数据,触发读事件,读事件触发后才会执行RunAllTask()继而执行任务队列的任务 } void EventLoop::UpdateEvent(Channel *channel) { _poller->updateEvent(channel); // 调用 updateEvent 方法 } void EventLoop::RemoveEvent(Channel *channel) { _poller->removeEvent(channel); // 调用 removeEvent 方法 } void EventLoop::TimerAdd(uint64_t id, uint32_t delay, const Functor &cb) { _timer_wheel->addTimer(id, delay, cb); // 确保调用方法签名匹配 } void EventLoop::TimerRefresh(uint64_t id) { _timer_wheel->TimerRefresh(id); } void EventLoop::TimerCancel(uint64_t id) { _timer_wheel->TimerCancel(id); } bool EventLoop::HasTimer(uint64_t id) { return _timer_wheel->HasTimer(id); } void EventLoop::RunAllTask() { std::vector<Functor> functor; { std::unique_lock<std::mutex> _lock(_mutex); _tasks.swap(functor); // 交换之后,_tasks就为空了,其他线程就没有任务执行了 } for (auto &f : functor) { f(); // 执行任务 } } int EventLoop::CreateEventFd() { int efd = eventfd(0, EFD_CLOEXEC | EFD_NONBLOCK); if (efd < 0) { ERROR_LOG("CREATE EVENTFD ERROR: %s", strerror(errno)); abort(); } return efd; } void EventLoop::ReadEventfd() { uint64_t res = 0; int ret = read(_event_fd, &res, sizeof(res)); if (ret < 0) { if (errno == EINTR || errno == EAGAIN) { return; } ERROR_LOG("READ EVENTFD ERROR: %s", strerror(errno)); abort(); } } void EventLoop::WeakUpEventFd() { uint64_t val = 1; int ret = write(_event_fd, &val, sizeof(val)); if (ret < 0) { if (errno == EINTR) { return; } ERROR_LOG("WRITE EVENTFD ERROR: %s", strerror(errno)); abort(); } }
整合测试1
test1.cpp
#include <iostream> #include <functional> #include <cstring> #include "server.hpp" // Assuming these are the header files for Socket, Channel, Buffer, and EventLoop #include "Channel.hpp" #include "EventLoop.hpp" #include "TimerTask.hpp" #include "TimerWheel.hpp" #include "Socket.hpp" #include "Buffer.hpp" EventLoop loop; void WriteHandle(Socket *sock, Channel *ch, Buffer *buf) { // Send the data from the buffer to the socket sock->Send(buf->ReadPosition(), buf->ReadAbleSize()); // Disable write events and enable read events for the channel ch->DisableWrite(); ch->EnableRead(); } void ReadHandle(Socket *sock, Channel *ch) { char buffer[1024] = {0}; // Buffer to store received data ssize_t n = sock->Recv(buffer, sizeof(buffer) - 1); // Receive data from the socket if (n > 0) { buffer[n] = 0; // Null-terminate the received data DEBUG_LOG("Received message from connection %d: %s", sock->GetFd(), buffer); // Create a new buffer and write the received data into it Buffer *buf = new Buffer; buf->WriteAndPush(buffer, strlen(buffer)); // Disable read events and set the write callback for the channel ch->DisableRead(); ch->SetWriteCallback(std::bind(WriteHandle, sock, ch, buf)); ch->EnableWrite(); } else { // Handle the case where no data was received or an error occurred ERROR_LOG("Failed to receive data from connection %d", sock->GetFd()); } } void AcceptHandle(Socket *sock) { int connfd = sock->Accept(); // Accept a new connection if (connfd >= 0) { DEBUG_LOG("Accepted new connection: %d", connfd); // Create new socket and channel for the accepted connection Socket *connsock = new Socket(connfd); Channel *connch = new Channel(&loop, connsock->GetFd()); // Set the read callback for the new channel and enable read events connch->SetReadCallback(std::bind(ReadHandle, connsock, connch)); connch->EnableRead(); } else { // Handle the case where accepting the connection failed ERROR_LOG("Failed to accept new connection"); } } int main() { // Create and start the server socket Socket lissock; bool ret = lissock.CreateServer(9090); if (!ret) { ERROR_LOG("Failed to create server"); return -1; } // Create a channel for the listening socket and set the accept callback Channel lisch(&loop, lissock.GetFd()); lisch.SetReadCallback(std::bind(AcceptHandle, &lissock)); lisch.EnableRead(); // Start the event loop loop.Start(); return 0; }
Makefile
# Compiler CXX = g++ # Compiler flags CXXFLAGS = -std=c++11 -Wall -Iinclude # Linker flags LDFLAGS = -pthread # Directories INCLUDE_DIR = include SRC_DIR = src OBJ_DIR = obj # Source files SRCS = $(wildcard $(SRC_DIR)/*.cpp) # Object files OBJS = $(patsubst $(SRC_DIR)/%.cpp,$(OBJ_DIR)/%.o,$(SRCS)) # Executable name TARGET = test1 # Default rule all: $(TARGET) # Link the object files to create the executable $(TARGET): $(OBJS) $(CXX) $(CXXFLAGS) -o $@ $^ $(LDFLAGS) # Compile source files to object files $(OBJ_DIR)/%.o: $(SRC_DIR)/%.cpp | $(OBJ_DIR) $(CXX) $(CXXFLAGS) -c -o $@ $< # Create the object directory if it doesn't exist $(OBJ_DIR): mkdir -p $(OBJ_DIR) # Clean up the build artifacts clean: rm -rf $(OBJ_DIR) $(TARGET) # Phony targets .PHONY: all clean
在项目的路径下运行以下命令
make ./test1
运行结果
未完待续........
(由于篇幅过长,今天就先分享到这,续集见下一篇)