基于LT模式修改,并加入前面的应用层计算器,实现稍完整的服务器功能
1.修改tcp_socket.hpp,新增非阻塞读和非阻塞写接口
2.对于accept返回的new_sock加上EPOLLET这样的选项
注意:此代码暂时未考虑listen_sock ET的情况,如果将listen_sock设为ET,则需要非阻塞轮询的方式accept,否则会导致同一时刻大量的客户端同时连接的情况,只能accept一次的问题
目录
- 整体结构
- 流程
- 运行示例
- 全
- 改进
- Reactor的理论
1. 整体结构
TpcServer服务器类,维护_listen套接字,用来获取连接和监听读写事件,map用套接字做键值,rev数组作为epoll_wait的输出参数
每一个连接都是一个session结构,包含读写缓冲区,ip和端口方便调试,读写错误的回调处理函数,指针回指服务器
nocopy类用来给某些类提供无法拷贝的功能
socket类,提供套接字的创建,监听等功能
协议类和计算类为前面章节的内容,用来对收到的数据处理返回结果
Epoll类提供epoll多路转接功能
comm类单独拎出来的设置非阻塞功能,因为很多地方都要用到
2. 流程
服务端
TpcServer.hpp
继承enable_shared_from_this类可以解决智能指针不能用this构造的问题,使用智能指针对象需要用shared_from_this()功能来获取
定义两个函数模板,构造时传入报文处理回调
Init函数初始化套接字,设置非阻塞,AddConnection函数添加listen套接字到关注事件中,绑定事件分配函数Accepter
AddConnection函数参数传入要设置的套接字,事件,三个函数,ip和端口用来调试
Connection
构造时传入sock初始化成员变量
作为连接管理类,需要管理每个连接的发送和接收缓冲区,所以提供存入缓冲区数据的和返回缓冲区内容的功能,再提供初始化自己成员函数的功能
TpcServer.hpp
继续说明AddConnection函数,这个函数的作用为每个连接初始化session,添加关注事件和管理,后面每个新链接都要用这个函数
构造一个Connection的临时对象
设置成员TpcServer和回调函数,ip和port
添加对象到map结构里,添加listen的事件,listen关注读
Accpeter连接管理器函数,参数是事件就绪的会话
不一定只有一个连接到来,所以需要循环读取。用accept获取就绪连接,设置非阻塞后,调用AddConnection函数加入会话管理,作为连接会话三个回调函数分别是读写错误
当错误码是EWOULDBLOCK的时候,说明已经获取完,退出循环,EINTR表示系统调用被信号中断,所以继续读取,其他情况退出
Recver数据读取函数,用来提供读取数据添加到Connection缓冲区的功能
首先判断了连接的生命周期,如果消亡就退出。通过lock获取一个shared指针对象。
因为是ET模式,所以一次性需要读完所有数据,用recv函数,返回值n大于0表示读取到数据,添加到接收缓冲区中,等于0对方客户度退出,调用错误处理函数,小于0和上面一样判断是否读完,不是就走错误处理
最后将读取到的数据交给处理函数,所有报文情况都由它处理
Sender函数,获取连接的发送缓冲区发送,一次性将数据都发送,返回值大于0发送成功,将发送了的内容删除,判断如果发送缓冲区为空就退出。0表示没发送任何内容也退出,其他情况判断是否走错误处理
epoll/select/poll,因为写事件经常都是就绪的,发送缓冲区基本会有空间,如果设置了写关心,每次都会就绪,经常返回浪费cpu资源。所以对于读,需要设置常关心,写,按需求设置
当发送完后,检查缓冲区不为空,没发送完就对写事件开启关心,发送完将事件关闭
EnableEvent函数,设置套接字的读和写,根据传入的参数,判断有没有读和写,通过三木运算符,有就加入event,最后修改套接字的事件
Excepter函数,错误处理函数,遇到错误就是关闭这个链接。如果连接在读和写时发生错误,用这个函数。取消这个套接字的所有关心,关闭文件,map中移除
IsSafeConnection函数,检查链接是否合法,遍历map,是否存在
主逻辑
Loop函数,服务器的运行循环,传入超时时间,不断调用事件分配函数和打印连接函数
PrintConnection函数,打印出map中所有的fd,用来调试
Dispatcher函数,timeout等待时间是上一个函数传入。不断wait监听revs数组添加了的套接字,n会返回就绪的个数,取到套接字和事件,将异常转为读写统一处理。如果是读事件就绪,并且连接合法,就调用读取函数,写事件调用写函数
TpcServer.cc
全局的计算类对象,DefaultOnMessage函数是默认的报文处理函数,对报文的完整性判断,计算返回结果并发送
调用计算类的函数,判断返回的字符串是否为空,为空说明报文不完整或有错误。如果处理完成,将结果加入到发送缓冲区,用tcpserver对象发送
main函数创建svr对象,传入报文处理函数,启动服务器
客户端
客户端链接服务器,生成5个随机报文发送接收结果打印
是前面章节的网络计算器
网络计算器
3. 运行示例
4. 全
TcpServer.hpp
#pragma ocne #include <iostream> #include <memory> #include <functional> #include <unordered_map> #include "Comm.hpp" #include "log.hpp" #include "Epoll.hpp" #include "Socket.hpp" class Connection; class TpcServer; using func_t = std::function<void(std::weak_ptr<Connection>)>; // 用户缓冲区处理函数模板 using except_func_t = std::function<void(std::weak_ptr<Connection>)>; static const uint16_t port = 8000; static const int g_buff_size = 128; // 设置et uint32_t EVENT_IN = (EPOLLIN | EPOLLET); uint32_t EVENt_OUT = (EPOLLOUT | EPOLLET); class Connection { public: Connection(int sock) { _sock = sock; } ~Connection() { } void AppendInbuff(const std::string& message) { _inbuff += message; } void AppendOutbuff(const std::string& message) { _outbuff += message; } int Fd() { return _sock; } std::string& Inbuffer() // for debug { return _inbuff; } std::string& Outbuffer() // for debug { return _outbuff; } void SetHandler(func_t recv_cb, func_t send_cb, except_func_t except_cb) { _recv_cb = recv_cb; _send_cb = send_cb; _except_cb = except_cb; } void SetWeakPtr(std::weak_ptr<TpcServer> tcp_setver_ptr) { _tcp_server_ptr = tcp_setver_ptr; } private: int _sock; std::string _inbuff; // string不能二进制, 需要vector std::string _outbuff; public: func_t _recv_cb; func_t _send_cb; except_func_t _except_cb; std::weak_ptr<TpcServer> _tcp_server_ptr; // 回指向服务器 std::string _ip; uint16_t _port; }; class TpcServer :public std::enable_shared_from_this<TpcServer>, public nocopy { static const int num = 64; public: TpcServer(func_t OnMessage) : _listensocket_ptr(new Sock()) , _epoll_ptr(new Epoll()) , _OnMessage(OnMessage) , _quit(true) { } void AddConnection(int sock, uint32_t event, func_t recv_cb, func_t send_cb, \ except_func_t except_cb, const std::string& ip = "0.0.0.0", uint16_t port = 0) { // 1. 给sock创建connection对象, 将lstensock添加到connection中 // 同时,listeinsock和connection放入_connections std::shared_ptr<Connection> new_con(new Connection(sock)); new_con->SetWeakPtr(shared_from_this()); // 返回当前对象的shared_ptr new_con->SetHandler(recv_cb, send_cb, except_cb); new_con->_ip = ip; new_con->_port = port; // 2. 添加到map _connections.insert(std::make_pair(sock, new_con)); // 3. 添加对应事件 _epoll_ptr->EpollUpdate(EPOLL_CTL_ADD, sock, event); } void Init() { _listensocket_ptr->Socket(); int opt = 1; setsockopt(_listensocket_ptr->Fd(), SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // et模式需要非阻塞 SetNonBlock(_listensocket_ptr->Fd()); lg.logmessage(info, "listensock create success:%d", _listensocket_ptr->Fd()); _listensocket_ptr->Bind(port); _listensocket_ptr->Listen(); // 关联connection AddConnection(_listensocket_ptr->Fd(), EVENT_IN, std::bind(&TpcServer::Accepter, this, std::placeholders::_1), nullptr, nullptr); } void Accepter(std::weak_ptr<Connection> con) { // 获取强引用对象, 检查是否销毁 auto connection = con.lock(); // 获取新链接 while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); // ::调用原生函数 int sock = ::accept(connection->Fd(), (struct sockaddr *)&peer, &len); if (sock > 0) { char ipbuf[128]; inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf)); uint16_t port = ntohs(peer.sin_port); lg.logmessage(info, "get a new clinet[%s:%d]:%d", ipbuf, port, sock); // 设置非阻塞 SetNonBlock(sock); // 添加连接事件 AddConnection(sock, EVENT_IN,\ std::bind(&TpcServer::Recver, this, std::placeholders::_1),\ std::bind(&TpcServer::Sender, this, std::placeholders::_1),\ std::bind(&TpcServer::Excepter, this, std::placeholders::_1),\ ipbuf, port); } else { if (errno == EWOULDBLOCK) { break; } else if (errno == EINTR) // 信号中断 { continue; } else { break; } } } } void Recver(std::weak_ptr<Connection> con) { if (con.expired()) return; auto connec = con.lock(); int sock = connec->Fd(); while (true) { char buff[g_buff_size]; memset(buff, 0, sizeof(buff)); ssize_t n = recv(sock, buff, sizeof(buff) - 1, 0); // 非阻塞读取 if (n > 0) { connec->AppendInbuff(buff); } else if (n == 0) // 错误处理 { lg.logmessage(info, "sockfd:%d, client[%s:%d] quit", sock, connec->_ip.c_str(), connec->_port); connec->_except_cb(connec); return; } else { if (errno == EWOULDBLOCK) // 读完 { break; } else if (errno == EINTR) { continue; } else { lg.logmessage(warning, "sockfd:%d, client[%s:%d] recv error", sock, connec->_ip.c_str(), connec->_port); connec->_except_cb(connec); return; } } } // 数据有了, 不一定安全 1.检测 2.如果有完整报文,处理 _OnMessage(connec); } void Sender(std::weak_ptr<Connection> con) { if (con.expired()) return; auto connection = con.lock(); auto &outbuff = connection->Outbuffer(); while (true) { ssize_t n = send(connection->Fd(), outbuff.c_str(), outbuff.size(), 0); if (n > 0) { outbuff.erase(0, n); if (outbuff.empty()) { break; } } else if (n == 0) { return; // 没有发 } else { if (errno == EWOULDBLOCK) { break; } else if (errno == EINTR) { continue; } else { lg.logmessage(info, "sockfd:%d, client[%s:%d] recv error", connection->Fd(), connection->_ip.c_str(), connection->_port); connection->_except_cb(connection); return; } } } // 没发完, 开启对写事件关心 if (!outbuff.empty()) { EnableEvent(connection->Fd(), true, true); } else { EnableEvent(connection->Fd(), true, false); } } void EnableEvent(int sock, bool readable, bool writeable) { uint32_t evnet = 0; evnet |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET); _epoll_ptr->EpollUpdate(EPOLL_CTL_MOD, sock, evnet); } void Excepter(std::weak_ptr<Connection> con) { if (con.expired()) return; auto connection = con.lock(); int fd = connection->Fd(); lg.logmessage(warning, "Excepter handler:%d, client[%s:%d] excepter error", connection->Fd(), connection->_ip.c_str(), connection->_port); // 1. 移除关心 _epoll_ptr->EpollUpdate(EPOLL_CTL_DEL, fd, 0); // 2. 关闭文件 lg.logmessage(debug, "close %d...", fd); close(fd); // 3. unordered_map中移除 lg.logmessage(debug, "remove connection %d", fd); _connections.erase(fd); } bool IsSafeConnection(int sock) { auto it = _connections.find(sock); if (it == _connections.end()) { return false; } else { return true; } } void Dispatcher(int timeout) { int n = _epoll_ptr->EpollWait(_revs, num, timeout); for (int i = 0; i < n; i++) { int sock = _revs[i].data.fd; uint32_t event = _revs[i].events; // 统一将异常转换为读写问题 if (event & EPOLLERR) { event |= (EPOLLIN | EPOLLOUT); } if (event & EPOLLHUP) { event |= (EPOLLIN | EPOLLOUT); } if ((event & EPOLLIN) && IsSafeConnection(sock)) { if (_connections[sock]->_recv_cb) { _connections[sock]->_recv_cb(_connections[sock]); } } if ((event & EPOLLOUT) && IsSafeConnection(sock)) { if (_connections[sock]->_send_cb) { _connections[sock]->_send_cb(_connections[sock]); } } } } void Loop() { _quit = false; while (!_quit) { Dispatcher(3000); PrintConnection(); } _quit = true; } void PrintConnection() { std::cout << "_connection list: "; for (auto &con: _connections) { std::cout << con.second->Fd() << ","; } std::cout << std::endl; } private: std::shared_ptr<Sock> _listensocket_ptr; // 监听socket, 可以移到外部 std::shared_ptr<Epoll> _epoll_ptr; // 内核 std::unordered_map<int, std::shared_ptr<Connection>> _connections; struct epoll_event _revs[num]; func_t _OnMessage; bool _quit; };
TcpServer.cc
#include <memory> #include "TpcServer.hpp" #include "Calculator.hpp" Calculator calculator; void DefaultOnMessage(std::weak_ptr<Connection> con) { if(con.expired()) return; auto connection_ptr = con.lock(); std::cout << connection_ptr->Inbuffer() << std::endl; std::string response_str = calculator.Handler(connection_ptr->Inbuffer()); // 业务逻辑简单,如果复杂,需要拿到结果单独线程处理 if (response_str.empty()) { return; } lg.logmessage(debug, "%s", response_str.c_str()); connection_ptr->AppendOutbuff(response_str); //connection_ptr->_send_cb(connection_ptr); auto tcpserver = connection_ptr->_tcp_server_ptr.lock(); tcpserver->Sender(connection_ptr); } int main() { std::shared_ptr<TpcServer> svr(new TpcServer(DefaultOnMessage)); svr->Init(); svr->Loop(); return 0; }
Clinet.cc
#include <time.h> #include <unistd.h> #include <assert.h> #include "Socket.hpp" #include "Protocol.hpp" int main() { srand(time(NULL)); std::cout << "准备连接" << std::endl; uint16_t serverport = 8000; string serverip = "106.54.46.147"; struct sockaddr_in server; bzero(&server, sizeof(server)); server.sin_family = AF_INET; server.sin_addr.s_addr = inet_addr(serverip.c_str()); server.sin_port = htons(serverport); const string opers = "+-*/%=^"; Sock socket; socket.Socket(); bool r = socket.Connect(serverip, serverport); if (!r) return 1; std::cout << "连接成功, 开始发送数据" << std::endl; int cnt = 1; while (cnt <= 5) { std::cout << "=============第" << cnt << "次测试...." << "============" << std::endl; string package; int x = rand() % 100; int y = rand() % 100 + 1; char op = opers[rand() % opers.size()]; Request req(x, y, op); req.DebugPrint(); req.Serialize(&package); package = Encode(package); std::cout << package << std::endl; write(socket._sockfd, package.c_str(), package.size()); char buff[1024]; int n = read(socket._sockfd, buff, sizeof(buff)); string inbuff_stream; if (n > 0) { buff[n] = 0; inbuff_stream += buff; std::cout << inbuff_stream << std::endl; string content; bool r = Decode(inbuff_stream, &content); assert(r); Response resp; r = resp.Deserialize(content); assert(r); resp.DebugPrint(); } std::cout << "=======================================" << std::endl; sleep(1); cnt++; } socket.Close(); return 0; }
Comm.hpp
#pragma once #include <fcntl.h> #include <unistd.h> #include "Socket.hpp" void SetNonBlock(int sock) { int f1 = fcntl(sock, F_GETFL); if (f1 < 0) { exit(NONBLOCKERR); } fcntl(sock, F_SETFL, f1 | O_NONBLOCK); }
Epoll.hpp
#pragma once #include <sys/epoll.h> #include "nocopy.hpp" #include "log.hpp" class Epoll : public nocopy { static const int size = 128; Log log; public: Epoll() { _epfd = epoll_create(size); if (_epfd == -1) { log.logmessage(ERROR, "epoll create error:%s", strerror(errno)); } else { log.logmessage(info, "epoll create success:%d", _epfd); } } int EpollWait(struct epoll_event revents[], int num, int timeout) { int n = epoll_wait(_epfd, revents, num, timeout); return n; } int EpollUpdate(int oper, int sock, uint32_t event) { int n = 0; if (oper == EPOLL_CTL_DEL) { n = epoll_ctl(_epfd, oper, sock, nullptr); if (n != 0) { log.logmessage(ERROR, "epoll_ctl delete error"); } } else { struct epoll_event ev; ev.events = event; ev.data.fd = sock; n = epoll_ctl(_epfd, oper, sock, &ev); if (n != 0) { log.logmessage(ERROR, "epoll_ctl add error"); } } } ~Epoll() { if (_epfd >= 0) { close(_epfd); } } private: int _epfd; int _timeout{3000}; };
nocopy.hpp
#pragma once class nocopy { public: nocopy(){} nocopy(const nocopy &) = delete; nocopy& operator=(const nocopy&) = delete; };
Socket.hpp
#pragma once #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <unistd.h> #include <stdlib.h> #include <fcntl.h> #include "log.hpp" enum { SOCKERR = 1, BINDERR, LISERR, NONBLOCKERR }; Log lg; const int backlog = 5; class Sock { public: Sock() { } void Socket() { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { lg.logmessage(fatal, "socket error"); exit(SOCKERR); } int opt = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); // 防止偶发性的服务器无法进行立即重启(tcp协议的时候再说) } void Bind(uint16_t port) { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_addr.s_addr = INADDR_ANY; local.sin_port = htons(port); int bret = bind(_sockfd, (const struct sockaddr*)&local, sizeof(local)); if (bret < 0) { lg.logmessage(fatal, "bind error"); exit(BINDERR); } } void Listen() { int lret = listen(_sockfd, backlog); if (lret < 0) { lg.logmessage(fatal, "listen error"); exit(LISERR); } } int Accept(string* clientip, uint16_t* clientport) { sockaddr_in peer; socklen_t len = sizeof(peer); int newfd = accept(_sockfd, (sockaddr*)&peer, &len); if (newfd < 0) { lg.logmessage(warning, "accept error"); return -1; } char ipstr[64]; inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr)); *clientip = ipstr; *clientport = ntohs(peer.sin_port); return newfd; } bool Connect(const string ip, const uint16_t port) { sockaddr_in peer; memset(&peer, 0, sizeof(peer)); peer.sin_family = AF_INET; inet_pton(AF_INET, ip.c_str(), &peer.sin_addr); peer.sin_port = htons(port); int cret = connect(_sockfd, (const struct sockaddr*)&peer, sizeof(peer)); if (cret == -1) { lg.logmessage(warning, "connect error"); return false; } return true; } void Close() { close(_sockfd); } int Fd() { return _sockfd; } ~Sock() { } public: int _sockfd; };
Protocol.hpp
#pragma once #include <string> #include <jsoncpp/json/json.h> //#define MYSELF 1 //分隔符 const std::string black_sep = " "; const std::string protocol_sep = "\n"; //解决报文外部格式 //len\n正文\n std::string Encode(std::string& message) { std::string package = std::to_string(message.size()); package += protocol_sep; package += message; package += protocol_sep; return package; } //len\na + b\n bool Decode(std::string& message, std::string* content) { std::size_t pos = message.find(protocol_sep); if (pos == std::string::npos) { return false; } std::string len_str = message.substr(0, pos); std::size_t len = std::stoi(len_str); std::size_t total_len = len_str.size() + len + 2; //检查长度 if (message.size() < total_len) { return false; } *content = message.substr(pos + 1, len); //earse 移除报文 message.erase(0, total_len); return true; } class Request { public: Request(){} Request(int a, int b, char oper) { _num1 = a; _num2 = b; _op = oper; } //a + b bool Serialize(std::string* out) { #ifdef MYSELF //构建报文有效载荷 std::string str; str += std::to_string(_num1); str += black_sep; str += _op; str += black_sep; str += std::to_string(_num2); *out = str; return true; #else Json::Value root; root["x"] = _num1; root["y"] = _num2; root["op"] = _op; Json::FastWriter w; *out = w.write(root); return true; #endif } //a + b bool Deserialize(std::string& in) { #ifdef MYSELF //a std::size_t left = in.find(black_sep); if (left == std::string::npos) { return false; } std::string part_a = in.substr(0, left); // b std::size_t right = in.rfind(black_sep); if (right == std::string::npos) { return false; } std::string part_b = in.substr(right + 1); //+ if (left + 2 != right) { return false; } _op = in[left+1]; _num1 = std::stoi(part_a); _num2 = std::stoi(part_b); return true; #else Json::Value root; Json::Reader r; r.parse(in, root); _num1 = root["x"].asInt(); _num2 = root["y"].asInt(); _op = root["op"].asInt(); return true; #endif } void DebugPrint() { std::cout << "新请求构建完成:" << _num1 << _op << _num2 << std::endl; } public: int _num1; int _num2; char _op; }; class Response { public: Response(){} Response(int res, int cod) { _result = res; _code = cod; } //1000 0 bool Serialize(std::string* out) { #ifdef MYSELF string str = std::to_string(_result); str += black_sep; str += std::to_string(_code); *out = str; return true; #else Json::Value root; root["res"] = _result; root["code"] = _code; Json::FastWriter w; *out = w.write(root); return true; #endif } //1000 0 bool Deserialize(std::string& in) { #ifdef MYSELF std::size_t pos = in.find(black_sep); if (pos == std::string::npos) { return false; } std::string left = in.substr(0, pos); std::string right = in.substr(pos + 1); _result = std::stoi(left); _code = std::stoi(right); return true; #else Json::Value root; Json::Reader r; r.parse(in, root); _result = root["res"].asInt(); _code = root["code"].asInt(); return true; #endif } void DebugPrint() { std::cout << "结果响应完成,result:" << _result << ",code:" << _code << std::endl; } public: int _result; int _code; //0可信,否则表明对应的错误 }; #define MySelf 1
Calcluator.hpp
#pragma once #include "Protocol.hpp" enum { DIVZERO = 1, MODZERO, OTHER_OPER }; class Calculator { public: Calculator() { } Response CalculatorHelp(const Request& req) { Response res(0, 0); switch (req._op) { case '+': res._result = req._num1 + req._num2; break; case '-': res._result = req._num1 - req._num2; break; case '*': res._result = req._num1 * req._num2; break; case '/': if (req._num2 == 0) { res._code = DIVZERO; } else { res._result = req._num1 / req._num2; } break; case '%': if (req._num2 == 0) { res._code = MODZERO; } else { res._result = req._num1 % req._num2; break; } default: res._code = OTHER_OPER; break; } return res; } std::string Handler(std::string& package) { std::string content; bool r = Decode(package, &content); if (!r) { return ""; } Request req; r = req.Deserialize(content); if (!r) { return ""; } req.DebugPrint(); content = ""; Response res = CalculatorHelp(req); res.DebugPrint(); res.Serialize(&content); content = Encode(content); // len\n正文\n return content; } ~Calculator() { } };
Log.hpp
#pragma once #include <stdarg.h> #include <iostream> #include <stdio.h> #include <cstring> #include <time.h> #include <cerrno> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> using namespace std; #define info 0 #define debug 1 #define warning 2 #define ERROR 3 #define fatal 4 #define screen 1 #define onefile 2 #define classfile 3 #define PATH "log.txt" class Log { public: Log(int style = screen) { printstyle = style; dir = "log/"; } void enable(int method) { printstyle = method; } const char *leveltostring(int level) { switch (level) { case 0: return "info"; break; case 1: return "debug"; break; case 2: return "warning"; break; case 3: return "error"; break; case 4: return "fatal"; break; default: return "none"; break; } } void printlog(int level, const string &logtxt) { switch (printstyle) { case screen: cout << logtxt; break; case onefile: printonefile(PATH, logtxt); break; case classfile: printclassfile(level, logtxt); break; } } void logmessage(int level, const char *format, ...) { time_t t = time(0); tm *ctime = localtime(&t); char leftbuff[1024]; sprintf(leftbuff, "[%s]%d-%d-%d %d:%d:%d:", leveltostring(level), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec); char rightbuff[1024]; va_list s; va_start(s, format); vsprintf(rightbuff, format, s); va_end(s); char logtext[2048]; sprintf(logtext, "%s %s\n", leftbuff, rightbuff); //printf(logtext); printlog(level, logtext); } void printonefile(const string& logname, const string& logtxt) { int fd = open(logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); if (fd < 0) { return; } write(fd, logtxt.c_str(), logtxt.size()); close(fd); } void printclassfile(int level, const string &logtxt) { //log.txt.info string filename = dir + PATH; filename += "."; filename += leveltostring(level); printonefile(filename, logtxt); } ~Log(){}; private: int printstyle; string dir; //分类日志,放入目录中 }; // int sum(int n, ...) // { // int sum = 0; // va_list s; // va_start(s, n); // while (n) // { // sum = sum + va_arg(s, int); // n--; // } // return sum; // }
CMakeLists.txt
cmake_minimum_required(VERSION 2.8.12.2) project(TpcServer) set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -std=c++11") set(CMAKE_CXX_STANDARD_REQUIRED ON) # 确保必须使用指定的标准 add_executable(TpcServer TpcServer.cc) target_link_libraries(TpcServer jsoncpp) add_executable(Client Client.cc) target_link_libraries(Client jsoncpp)
5. 改进
listen分离
将TpcServer里的listen套接字提出来单独成一个类,将Accpeter获取连接的函数放到这个类,通过AddCpnnection函数加入关注,这个类就做listen的连接管理。这样做的好处是可以将listen连接也作为一个正常连接统一处理
后续的设计可以加入多线程,主线程维护vector,里面是获得的连接fd,线程之间通信,争取这个fd会话,每一个线程也是一个Ractor,也可以负载均衡式的分配连接。这样就可以加入多线程,同时每个连接的报文内容处理如果比较复杂,可以交由线程池来处理,只需要拿到结果
这种一个连接一个reacotr,叫one thread one loop
连接管理
对于一些不活跃的连接需要处理。使用一个定时器类,里面保存一个最小堆,存每个连接的超时时间,可以回指connection和tpcserver做更多开发。在主循环每次事件分配后可以做一些其他事情,就是查询超时连接,如果有可以走错误处理,关闭这个链接。同时,事件分配的等待时间可以设置为堆顶的超时时间
6. Reactor的理论
它是一个半同步半异步的模型,类似于打地鼠,监测哪个链接有事件就处理哪个。同步体现在事件的就绪是需要等,异步体现在回调函数,如果不想自己做,可以交由线程处理,比如报文的处理可以由线程来,直接取得结果。这种叫反应堆