63 epoll服务器 (ET模式)

avatar
作者
筋斗云
阅读量:0

基于LT模式修改,并加入前面的应用层计算器,实现稍完整的服务器功能
1.修改tcp_socket.hpp,新增非阻塞读和非阻塞写接口
2.对于accept返回的new_sock加上EPOLLET这样的选项

注意:此代码暂时未考虑listen_sock ET的情况,如果将listen_sock设为ET,则需要非阻塞轮询的方式accept,否则会导致同一时刻大量的客户端同时连接的情况,只能accept一次的问题

目录

  1. 整体结构
  2. 流程
  3. 运行示例
  4. 改进
  5. Reactor的理论

1. 整体结构

在这里插入图片描述
TpcServer服务器类,维护_listen套接字,用来获取连接和监听读写事件,map用套接字做键值,rev数组作为epoll_wait的输出参数
在这里插入图片描述

每一个连接都是一个session结构,包含读写缓冲区,ip和端口方便调试,读写错误的回调处理函数,指针回指服务器
在这里插入图片描述
nocopy类用来给某些类提供无法拷贝的功能
socket类,提供套接字的创建,监听等功能
协议类和计算类为前面章节的内容,用来对收到的数据处理返回结果
Epoll类提供epoll多路转接功能
comm类单独拎出来的设置非阻塞功能,因为很多地方都要用到
在这里插入图片描述

2. 流程

服务端

TpcServer.hpp
继承enable_shared_from_this类可以解决智能指针不能用this构造的问题,使用智能指针对象需要用shared_from_this()功能来获取
在这里插入图片描述
定义两个函数模板,构造时传入报文处理回调
在这里插入图片描述

在这里插入图片描述
Init函数初始化套接字,设置非阻塞,AddConnection函数添加listen套接字到关注事件中,绑定事件分配函数Accepter
在这里插入图片描述

AddConnection函数参数传入要设置的套接字,事件,三个函数,ip和端口用来调试

在这里插入图片描述

Connection
构造时传入sock初始化成员变量
在这里插入图片描述
作为连接管理类,需要管理每个连接的发送和接收缓冲区,所以提供存入缓冲区数据的和返回缓冲区内容的功能,再提供初始化自己成员函数的功能
在这里插入图片描述在这里插入图片描述在这里插入图片描述
TpcServer.hpp
继续说明AddConnection函数,这个函数的作用为每个连接初始化session,添加关注事件和管理,后面每个新链接都要用这个函数

构造一个Connection的临时对象
设置成员TpcServer和回调函数,ip和port
添加对象到map结构里,添加listen的事件,listen关注读

Accpeter连接管理器函数,参数是事件就绪的会话
在这里插入图片描述

不一定只有一个连接到来,所以需要循环读取。用accept获取就绪连接,设置非阻塞后,调用AddConnection函数加入会话管理,作为连接会话三个回调函数分别是读写错误

当错误码是EWOULDBLOCK的时候,说明已经获取完,退出循环,EINTR表示系统调用被信号中断,所以继续读取,其他情况退出

Recver数据读取函数,用来提供读取数据添加到Connection缓冲区的功能
在这里插入图片描述在这里插入图片描述

首先判断了连接的生命周期,如果消亡就退出。通过lock获取一个shared指针对象。
因为是ET模式,所以一次性需要读完所有数据,用recv函数,返回值n大于0表示读取到数据,添加到接收缓冲区中,等于0对方客户度退出,调用错误处理函数,小于0和上面一样判断是否读完,不是就走错误处理

最后将读取到的数据交给处理函数,所有报文情况都由它处理

Sender函数,获取连接的发送缓冲区发送,一次性将数据都发送,返回值大于0发送成功,将发送了的内容删除,判断如果发送缓冲区为空就退出。0表示没发送任何内容也退出,其他情况判断是否走错误处理
在这里插入图片描述

epoll/select/poll,因为写事件经常都是就绪的,发送缓冲区基本会有空间,如果设置了写关心,每次都会就绪,经常返回浪费cpu资源。所以对于读,需要设置常关心,写,按需求设置
当发送完后,检查缓冲区不为空,没发送完就对写事件开启关心,发送完将事件关闭

EnableEvent函数,设置套接字的读和写,根据传入的参数,判断有没有读和写,通过三木运算符,有就加入event,最后修改套接字的事件
在这里插入图片描述
Excepter函数,错误处理函数,遇到错误就是关闭这个链接。如果连接在读和写时发生错误,用这个函数。取消这个套接字的所有关心,关闭文件,map中移除
在这里插入图片描述

IsSafeConnection函数,检查链接是否合法,遍历map,是否存在
在这里插入图片描述

主逻辑
Loop函数,服务器的运行循环,传入超时时间,不断调用事件分配函数和打印连接函数
在这里插入图片描述

PrintConnection函数,打印出map中所有的fd,用来调试
在这里插入图片描述

Dispatcher函数,timeout等待时间是上一个函数传入。不断wait监听revs数组添加了的套接字,n会返回就绪的个数,取到套接字和事件,将异常转为读写统一处理。如果是读事件就绪,并且连接合法,就调用读取函数,写事件调用写函数

在这里插入图片描述

TpcServer.cc
全局的计算类对象,DefaultOnMessage函数是默认的报文处理函数,对报文的完整性判断,计算返回结果并发送
调用计算类的函数,判断返回的字符串是否为空,为空说明报文不完整或有错误。如果处理完成,将结果加入到发送缓冲区,用tcpserver对象发送

在这里插入图片描述
main函数创建svr对象,传入报文处理函数,启动服务器
在这里插入图片描述

客户端

客户端链接服务器,生成5个随机报文发送接收结果打印
是前面章节的网络计算器
网络计算器

3. 运行示例

在这里插入图片描述

4. 全

TcpServer.hpp

#pragma ocne #include <iostream> #include <memory> #include <functional> #include <unordered_map> #include "Comm.hpp" #include "log.hpp" #include "Epoll.hpp" #include "Socket.hpp"  class Connection; class TpcServer; using func_t = std::function<void(std::weak_ptr<Connection>)>; // 用户缓冲区处理函数模板 using except_func_t = std::function<void(std::weak_ptr<Connection>)>; static const uint16_t port = 8000; static const int g_buff_size = 128; // 设置et uint32_t EVENT_IN = (EPOLLIN | EPOLLET); uint32_t EVENt_OUT = (EPOLLOUT | EPOLLET);  class Connection { public:     Connection(int sock)     {         _sock = sock;     }      ~Connection()     {      }      void AppendInbuff(const std::string& message)     {         _inbuff += message;     }      void AppendOutbuff(const std::string& message)     {         _outbuff += message;     }      int Fd()     {         return _sock;     }      std::string& Inbuffer()  // for debug     {         return _inbuff;     }      std::string& Outbuffer()  // for debug     {         return _outbuff;     }      void SetHandler(func_t recv_cb, func_t send_cb, except_func_t except_cb)     {         _recv_cb = recv_cb;         _send_cb = send_cb;         _except_cb = except_cb;     }      void SetWeakPtr(std::weak_ptr<TpcServer> tcp_setver_ptr)     {         _tcp_server_ptr = tcp_setver_ptr;     }  private:     int _sock;     std::string _inbuff;  // string不能二进制, 需要vector     std::string _outbuff; public:     func_t _recv_cb;     func_t _send_cb;     except_func_t _except_cb;      std::weak_ptr<TpcServer> _tcp_server_ptr;  // 回指向服务器     std::string _ip;     uint16_t _port; };  class TpcServer :public std::enable_shared_from_this<TpcServer>, public nocopy {     static const int num = 64;  public:     TpcServer(func_t OnMessage)         : _listensocket_ptr(new Sock())         , _epoll_ptr(new Epoll())         , _OnMessage(OnMessage)         , _quit(true)     {     }      void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, \                         except_func_t except_cb, const std::string& ip = "0.0.0.0",                          uint16_t port = 0)     {         // 1. 给sock创建connection对象, 将lstensock添加到connection中         // 同时,listeinsock和connection放入_connections         std::shared_ptr<Connection> new_con(new Connection(sock));         new_con->SetWeakPtr(shared_from_this());  // 返回当前对象的shared_ptr         new_con->SetHandler(recv_cb, send_cb, except_cb);         new_con->_ip = ip;         new_con->_port = port;         // 2. 添加到map         _connections.insert(std::make_pair(sock, new_con));         // 3. 添加对应事件         _epoll_ptr->EpollUpdate(EPOLL_CTL_ADD, sock, event);      }      void Init()     {         _listensocket_ptr->Socket();         int opt = 1;         setsockopt(_listensocket_ptr->Fd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT,                     &opt, sizeof(opt));         // et模式需要非阻塞         SetNonBlock(_listensocket_ptr->Fd());         lg.logmessage(info, "listensock create success:%d", _listensocket_ptr->Fd());          _listensocket_ptr->Bind(port);         _listensocket_ptr->Listen();         // 关联connection         AddConnection(_listensocket_ptr->Fd(), EVENT_IN,                       std::bind(&TpcServer::Accepter, this, std::placeholders::_1),                        nullptr, nullptr);     }      void Accepter(std::weak_ptr<Connection> con)     {         // 获取强引用对象, 检查是否销毁         auto connection = con.lock();         // 获取新链接         while (true)         {             struct sockaddr_in peer;             socklen_t len = sizeof(peer);             // ::调用原生函数             int sock = ::accept(connection->Fd(), (struct sockaddr *)&peer, &len);             if (sock > 0)             {                 char ipbuf[128];                 inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));                 uint16_t port = ntohs(peer.sin_port);                 lg.logmessage(info, "get a new clinet[%s:%d]:%d", ipbuf, port, sock);                 // 设置非阻塞                 SetNonBlock(sock);                 // 添加连接事件                 AddConnection(sock, EVENT_IN,\                               std::bind(&TpcServer::Recver, this, std::placeholders::_1),\                               std::bind(&TpcServer::Sender, this, std::placeholders::_1),\                               std::bind(&TpcServer::Excepter, this, std::placeholders::_1),\                               ipbuf, port);             }             else              {                 if (errno == EWOULDBLOCK)                 {                     break;                 }                 else if (errno == EINTR)  // 信号中断                 {                     continue;                 }                 else                 {                     break;                 }             }         }     }      void Recver(std::weak_ptr<Connection> con)     {         if (con.expired())             return;         auto connec = con.lock();         int sock = connec->Fd();          while (true)         {             char buff[g_buff_size];             memset(buff, 0, sizeof(buff));             ssize_t n = recv(sock, buff, sizeof(buff) - 1, 0);  // 非阻塞读取             if (n > 0)             {                 connec->AppendInbuff(buff);             }             else if (n == 0)  // 错误处理             {                 lg.logmessage(info, "sockfd:%d, client[%s:%d] quit", sock, connec->_ip.c_str(), connec->_port);                 connec->_except_cb(connec);                 return;             }             else             {                 if (errno == EWOULDBLOCK)  // 读完                 {                     break;                 }                 else if (errno == EINTR)                 {                     continue;                 }                 else                 {                     lg.logmessage(warning, "sockfd:%d, client[%s:%d] recv error", sock, connec->_ip.c_str(), connec->_port);                     connec->_except_cb(connec);                     return;                 }             }         }          // 数据有了, 不一定安全 1.检测 2.如果有完整报文,处理         _OnMessage(connec);     }      void Sender(std::weak_ptr<Connection> con)     {         if (con.expired())             return;         auto connection = con.lock();         auto &outbuff = connection->Outbuffer();         while (true)         {             ssize_t n = send(connection->Fd(), outbuff.c_str(), outbuff.size(), 0);             if (n > 0)             {                 outbuff.erase(0, n);                 if (outbuff.empty())                 {                     break;                 }             }             else if (n == 0)             {                 return;  // 没有发             }             else             {                 if (errno == EWOULDBLOCK)                 {                     break;                 }                 else if (errno == EINTR)                 {                     continue;                 }                 else                 {                     lg.logmessage(info, "sockfd:%d, client[%s:%d] recv error", connection->Fd(), connection->_ip.c_str(), connection->_port);                     connection->_except_cb(connection);                     return;                 }             }         }          // 没发完, 开启对写事件关心         if (!outbuff.empty())         {             EnableEvent(connection->Fd(), true, true);         }         else         {             EnableEvent(connection->Fd(), true, false);         }     }      void EnableEvent(int sock, bool readable, bool writeable)     {         uint32_t evnet = 0;         evnet |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);         _epoll_ptr->EpollUpdate(EPOLL_CTL_MOD, sock, evnet);     }      void Excepter(std::weak_ptr<Connection> con)     {         if (con.expired())             return;         auto connection = con.lock();         int fd = connection->Fd();         lg.logmessage(warning, "Excepter handler:%d, client[%s:%d] excepter error", connection->Fd(), connection->_ip.c_str(), connection->_port);          // 1. 移除关心         _epoll_ptr->EpollUpdate(EPOLL_CTL_DEL, fd, 0);         // 2. 关闭文件         lg.logmessage(debug, "close %d...", fd);         close(fd);         // 3. unordered_map中移除         lg.logmessage(debug, "remove connection %d", fd);         _connections.erase(fd);     }      bool IsSafeConnection(int sock)     {         auto it = _connections.find(sock);         if (it == _connections.end())         {             return false;         }         else         {             return true;         }       }      void Dispatcher(int timeout)     {         int n = _epoll_ptr->EpollWait(_revs, num, timeout);         for (int i = 0; i < n; i++)         {             int sock = _revs[i].data.fd;             uint32_t event = _revs[i].events;              // 统一将异常转换为读写问题             if (event & EPOLLERR)             {                 event |= (EPOLLIN | EPOLLOUT);             }              if (event & EPOLLHUP)             {                 event |= (EPOLLIN | EPOLLOUT);             }              if ((event & EPOLLIN) && IsSafeConnection(sock))             {                 if (_connections[sock]->_recv_cb)                 {                     _connections[sock]->_recv_cb(_connections[sock]);                 }             }              if ((event & EPOLLOUT) && IsSafeConnection(sock))             {                 if (_connections[sock]->_send_cb)                 {                     _connections[sock]->_send_cb(_connections[sock]);                 }             }         }     }      void Loop()     {         _quit = false;          while (!_quit)         {             Dispatcher(3000);             PrintConnection();         }          _quit = true;     }      void PrintConnection()     {         std::cout << "_connection list: ";         for (auto &con: _connections)         {             std::cout << con.second->Fd() << ",";         }          std::cout << std::endl;     }  private:     std::shared_ptr<Sock> _listensocket_ptr;  // 监听socket, 可以移到外部     std::shared_ptr<Epoll> _epoll_ptr;  // 内核     std::unordered_map<int, std::shared_ptr<Connection>> _connections;     struct epoll_event _revs[num];     func_t _OnMessage;     bool _quit; }; 

TcpServer.cc

#include <memory> #include "TpcServer.hpp" #include "Calculator.hpp"  Calculator calculator; void DefaultOnMessage(std::weak_ptr<Connection> con) {      if(con.expired()) return;      auto connection_ptr = con.lock();      std::cout << connection_ptr->Inbuffer() << std::endl;      std::string response_str = calculator.Handler(connection_ptr->Inbuffer());  // 业务逻辑简单,如果复杂,需要拿到结果单独线程处理      if (response_str.empty())      {          return;      }       lg.logmessage(debug, "%s", response_str.c_str());      connection_ptr->AppendOutbuff(response_str);      //connection_ptr->_send_cb(connection_ptr);       auto tcpserver = connection_ptr->_tcp_server_ptr.lock();        tcpserver->Sender(connection_ptr); }  int main() {     std::shared_ptr<TpcServer> svr(new TpcServer(DefaultOnMessage));     svr->Init();     svr->Loop();     return 0; } 

Clinet.cc

#include <time.h> #include <unistd.h> #include <assert.h> #include "Socket.hpp" #include "Protocol.hpp"  int main() {     srand(time(NULL));     std::cout << "准备连接" << std::endl;     uint16_t serverport = 8000;     string serverip = "106.54.46.147";     struct sockaddr_in server;     bzero(&server, sizeof(server));     server.sin_family = AF_INET;     server.sin_addr.s_addr = inet_addr(serverip.c_str());     server.sin_port = htons(serverport);      const string opers = "+-*/%=^";      Sock socket;     socket.Socket();     bool r = socket.Connect(serverip, serverport);     if (!r)         return 1;      std::cout << "连接成功, 开始发送数据" << std::endl;     int cnt = 1;     while (cnt <= 5)     {        std::cout << "=============第" << cnt << "次测试...." << "============" << std::endl;        string package;        int x = rand() % 100;        int y = rand() % 100 + 1;        char op = opers[rand() % opers.size()];         Request req(x, y, op);        req.DebugPrint();         req.Serialize(&package);        package = Encode(package);        std::cout << package << std::endl;        write(socket._sockfd, package.c_str(), package.size());         char buff[1024];        int n = read(socket._sockfd, buff, sizeof(buff));         string inbuff_stream;        if (n > 0)        {            buff[n] = 0;            inbuff_stream += buff;            std::cout << inbuff_stream << std::endl;             string content;            bool r = Decode(inbuff_stream, &content);            assert(r);             Response resp;            r = resp.Deserialize(content);            assert(r);             resp.DebugPrint();        }          std::cout << "=======================================" << std::endl;         sleep(1);         cnt++;     }      socket.Close();     return 0;  }  

Comm.hpp

#pragma once #include <fcntl.h> #include <unistd.h> #include "Socket.hpp"  void SetNonBlock(int sock) {     int f1 = fcntl(sock, F_GETFL);     if (f1 < 0)     {         exit(NONBLOCKERR);     }     fcntl(sock, F_SETFL, f1 | O_NONBLOCK); } 

Epoll.hpp

#pragma once #include <sys/epoll.h> #include "nocopy.hpp" #include "log.hpp"  class Epoll : public nocopy {      static const int size = 128;     Log log;  public:     Epoll()     {         _epfd = epoll_create(size);         if (_epfd == -1)         {             log.logmessage(ERROR, "epoll create error:%s", strerror(errno));         }         else         {             log.logmessage(info, "epoll create success:%d", _epfd);         }     }      int EpollWait(struct epoll_event revents[], int num, int timeout)     {         int n = epoll_wait(_epfd, revents, num, timeout);         return n;     }      int EpollUpdate(int oper, int sock, uint32_t event)     {         int n = 0;         if (oper == EPOLL_CTL_DEL)         {             n = epoll_ctl(_epfd, oper, sock, nullptr);             if (n != 0)             {                 log.logmessage(ERROR, "epoll_ctl delete error");             }         }         else         {             struct epoll_event ev;             ev.events = event;             ev.data.fd = sock;              n = epoll_ctl(_epfd, oper, sock, &ev);             if (n != 0)             {                 log.logmessage(ERROR, "epoll_ctl add error");             }         }     }      ~Epoll()     {         if (_epfd >= 0)         {             close(_epfd);         }     }  private:     int _epfd;     int _timeout{3000}; }; 

nocopy.hpp

#pragma once  class nocopy { public:     nocopy(){}     nocopy(const nocopy &) = delete;     nocopy& operator=(const nocopy&) = delete; }; 

Socket.hpp

#pragma once #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <fcntl.h> #include "log.hpp"  enum {     SOCKERR = 1,     BINDERR,     LISERR,     NONBLOCKERR };  Log lg; const int backlog = 5; class Sock { public:     Sock()     {      }      void Socket()     {         _sockfd = socket(AF_INET, SOCK_STREAM, 0);         if (_sockfd < 0)         {             lg.logmessage(fatal, "socket error");             exit(SOCKERR);         }          int opt = 1;         setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // 防止偶发性的服务器无法进行立即重启(tcp协议的时候再说)     }      void Bind(uint16_t port)     {         struct sockaddr_in local;         memset(&local, 0, sizeof(local));         local.sin_family = AF_INET;         local.sin_addr.s_addr = INADDR_ANY;         local.sin_port = htons(port);          int bret = bind(_sockfd, (const struct sockaddr*)&local, sizeof(local));         if (bret < 0)         {             lg.logmessage(fatal, "bind error");             exit(BINDERR);         }     }      void Listen()     {         int lret = listen(_sockfd, backlog);         if (lret < 0)         {             lg.logmessage(fatal, "listen error");             exit(LISERR);         }     }          int Accept(string* clientip, uint16_t* clientport)     {         sockaddr_in peer;         socklen_t len = sizeof(peer);         int newfd = accept(_sockfd, (sockaddr*)&peer, &len);         if (newfd < 0)         {             lg.logmessage(warning, "accept error");             return -1;         }          char ipstr[64];         inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr));         *clientip = ipstr;         *clientport = ntohs(peer.sin_port);          return newfd;     }      bool Connect(const string ip, const uint16_t port)     {         sockaddr_in peer;         memset(&peer, 0, sizeof(peer));         peer.sin_family = AF_INET;         inet_pton(AF_INET, ip.c_str(), &peer.sin_addr);         peer.sin_port = htons(port);          int cret = connect(_sockfd, (const struct sockaddr*)&peer, sizeof(peer));         if (cret == -1)         {             lg.logmessage(warning, "connect error");             return false;         }          return true;     }      void Close()     {         close(_sockfd);     }      int Fd()     {         return _sockfd;     }     ~Sock()     {      } public:     int _sockfd; };  

Protocol.hpp

#pragma once #include <string> #include <jsoncpp/json/json.h>  //#define MYSELF 1 //分隔符 const std::string black_sep = " "; const std::string protocol_sep = "\n";  //解决报文外部格式  //len\n正文\n     std::string Encode(std::string& message)     {         std::string package = std::to_string(message.size());         package += protocol_sep;         package += message;         package += protocol_sep;          return package;     }      //len\na + b\n     bool Decode(std::string& message, std::string* content)     {         std::size_t pos = message.find(protocol_sep);         if (pos == std::string::npos)         {             return false;         }          std::string len_str = message.substr(0, pos);         std::size_t len = std::stoi(len_str);         std::size_t total_len = len_str.size() + len + 2;          //检查长度         if (message.size() < total_len)         {             return false;         }          *content = message.substr(pos + 1, len);         //earse 移除报文         message.erase(0, total_len);          return true;     }  class Request { public:     Request(){}     Request(int a, int b, char oper)     {         _num1 = a;         _num2 = b;         _op = oper;     }     //a + b     bool Serialize(std::string* out)     { #ifdef MYSELF         //构建报文有效载荷         std::string str;         str += std::to_string(_num1);         str += black_sep;         str += _op;         str += black_sep;         str += std::to_string(_num2);          *out = str;         return true; #else         Json::Value root;         root["x"] = _num1;         root["y"] = _num2;         root["op"] = _op;         Json::FastWriter w;         *out = w.write(root);          return true; #endif     }      //a + b     bool Deserialize(std::string& in)     { #ifdef MYSELF         //a         std::size_t left = in.find(black_sep);         if (left == std::string::npos)         {             return false;                  }         std::string part_a = in.substr(0, left);         // b         std::size_t right = in.rfind(black_sep);         if (right == std::string::npos)         {             return false;              }         std::string part_b = in.substr(right + 1);         //+         if (left + 2 != right)         {             return false;         }          _op = in[left+1];         _num1 = std::stoi(part_a);         _num2 = std::stoi(part_b);          return true; #else               Json::Value root;         Json::Reader r;         r.parse(in, root);         _num1 = root["x"].asInt();         _num2 = root["y"].asInt();         _op = root["op"].asInt();          return true; #endif     }      void DebugPrint()     {         std::cout << "新请求构建完成:" << _num1 << _op << _num2 << std::endl;     }  public:     int _num1;     int _num2;     char _op; };  class Response { public:     Response(){}     Response(int res, int cod)     {         _result = res;         _code = cod;     }      //1000 0     bool Serialize(std::string* out)     { #ifdef MYSELF         string str = std::to_string(_result);         str += black_sep;         str += std::to_string(_code);         *out = str;          return true; #else          Json::Value root;         root["res"] = _result;         root["code"] = _code;         Json::FastWriter w;         *out = w.write(root);          return true; #endif     }      //1000 0     bool Deserialize(std::string& in)     { #ifdef MYSELF          std::size_t pos = in.find(black_sep);         if (pos  == std::string::npos)         {             return false;         }          std::string left = in.substr(0, pos);         std::string right = in.substr(pos + 1);          _result = std::stoi(left);         _code = std::stoi(right);          return true; #else          Json::Value root;         Json::Reader r;         r.parse(in, root);         _result = root["res"].asInt();         _code = root["code"].asInt();          return true; #endif     }      void DebugPrint()     {         std::cout << "结果响应完成,result:" << _result << ",code:" << _code << std::endl;     }  public:     int _result;     int _code;   //0可信,否则表明对应的错误 };  #define MySelf 1   

Calcluator.hpp

#pragma once #include "Protocol.hpp"  enum {     DIVZERO = 1,     MODZERO,     OTHER_OPER };  class Calculator { public:     Calculator()     {      }      Response CalculatorHelp(const Request& req)     {         Response res(0, 0);         switch (req._op)         {         case '+':             res._result = req._num1 + req._num2;             break;         case '-':             res._result = req._num1 - req._num2;             break;         case '*':             res._result = req._num1 * req._num2;             break;         case '/':             if (req._num2 == 0)             {                 res._code = DIVZERO;             }             else             {                 res._result = req._num1 / req._num2;             }             break;         case '%':             if (req._num2 == 0)             {                 res._code = MODZERO;             }             else             {                 res._result = req._num1 % req._num2;                 break;             }                      default:             res._code = OTHER_OPER;             break;         }          return res;     }      std::string Handler(std::string& package)     {         std::string content;         bool r = Decode(package, &content);         if (!r)         {             return "";         }         Request req;         r = req.Deserialize(content);         if (!r)         {             return "";         }         req.DebugPrint();         content = "";         Response res = CalculatorHelp(req);         res.DebugPrint();         res.Serialize(&content);         content = Encode(content); // len\n正文\n          return content;     }      ~Calculator()     {      } };   

Log.hpp

#pragma once #include <stdarg.h> #include <iostream> #include <stdio.h> #include <cstring> #include <time.h> #include <cerrno> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h>  using namespace std;  #define info 0 #define debug 1 #define warning 2 #define ERROR 3 #define fatal 4  #define screen 1 #define onefile 2 #define classfile 3  #define PATH "log.txt"  class Log { public:      Log(int style = screen)     {         printstyle = style;         dir = "log/";     }      void enable(int method)     {         printstyle = method;     }      const char *leveltostring(int level)     {         switch (level)         {         case 0:             return "info";             break;         case 1:             return "debug";             break;         case 2:             return "warning";             break;         case 3:             return "error";             break;         case 4:             return "fatal";             break;         default:             return "none";             break;         }     }      void printlog(int level, const string &logtxt)     {         switch (printstyle)         {         case screen:             cout << logtxt;             break;         case onefile:             printonefile(PATH, logtxt);             break;         case classfile:             printclassfile(level, logtxt);             break;         }     }      void logmessage(int level, const char *format, ...)     {         time_t t = time(0);         tm *ctime = localtime(&t);         char leftbuff[1024];         sprintf(leftbuff, "[%s]%d-%d-%d %d:%d:%d:", leveltostring(level), ctime->tm_year + 1900,                 ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec);          char rightbuff[1024];         va_list s;         va_start(s, format);         vsprintf(rightbuff, format, s);         va_end(s);         char logtext[2048];         sprintf(logtext, "%s %s\n", leftbuff, rightbuff);         //printf(logtext);         printlog(level, logtext);     }        void printonefile(const string& logname, const string& logtxt)     {         int fd = open(logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666);           if (fd < 0)         {             return;         }          write(fd, logtxt.c_str(), logtxt.size());         close(fd);     }      void printclassfile(int level, const string &logtxt)     {         //log.txt.info         string filename = dir + PATH;         filename += ".";         filename += leveltostring(level);         printonefile(filename, logtxt);     }      ~Log(){};  private:     int printstyle;     string dir; //分类日志,放入目录中 };  // int sum(int n, ...) // { //     int sum = 0; //     va_list s; //     va_start(s, n);  //     while (n) //     { //         sum = sum + va_arg(s, int); //         n--; //     }  //     return sum; // } 

CMakeLists.txt

cmake_minimum_required(VERSION 2.8.12.2) project(TpcServer) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") set(CMAKE_CXX_STANDARD_REQUIRED ON) # 确保必须使用指定的标准  add_executable(TpcServer TpcServer.cc) target_link_libraries(TpcServer jsoncpp)  add_executable(Client Client.cc) target_link_libraries(Client jsoncpp)  

5. 改进

listen分离

将TpcServer里的listen套接字提出来单独成一个类,将Accpeter获取连接的函数放到这个类,通过AddCpnnection函数加入关注,这个类就做listen的连接管理。这样做的好处是可以将listen连接也作为一个正常连接统一处理
在这里插入图片描述

后续的设计可以加入多线程,主线程维护vector,里面是获得的连接fd,线程之间通信,争取这个fd会话,每一个线程也是一个Ractor,也可以负载均衡式的分配连接。这样就可以加入多线程,同时每个连接的报文内容处理如果比较复杂,可以交由线程池来处理,只需要拿到结果
这种一个连接一个reacotr,叫one thread one loop

连接管理

对于一些不活跃的连接需要处理。使用一个定时器类,里面保存一个最小堆,存每个连接的超时时间,可以回指connection和tpcserver做更多开发。在主循环每次事件分配后可以做一些其他事情,就是查询超时连接,如果有可以走错误处理,关闭这个链接。同时,事件分配的等待时间可以设置为堆顶的超时时间

6. Reactor的理论

它是一个半同步半异步的模型,类似于打地鼠,监测哪个链接有事件就处理哪个。同步体现在事件的就绪是需要等,异步体现在回调函数,如果不想自己做,可以交由线程处理,比如报文的处理可以由线程来,直接取得结果。这种叫反应堆

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!