Reactor模式下的TCP服务器(http项目改进)

avatar
作者
猴君
阅读量:1

目录

事件多路分离器

事件分发器

事件处理器

事件处理器绑定实现方法

定义回调类型

设置连接的回调函数

添加连接到服务器的连接管理 

绑定回调函数

 Epoll检测事件并调用Dispatcher方法

异常处理 

读取处理 

TCPServer整体实现代码

实现逻辑分析

初始化 

Init 函数

AddConnnection方法

Accepter

新连接到来时,EPoll通知机制调用accept方法分析

 事件循环

事件处理

补充知识点

EPOLLIN和EPOLLET使用 

fcnt1 文件描述符的操作函数

结合使用具体代码 

Init函数 

Accepter 函数

std::bind

 ​编辑

绑定全局函数 

绑定成员函数

绑定成员变量

绑定函数对象

sender函数

​编辑

EnableEvent函数


事件多路分离器

通过epoll实现,负责监听特定文件描述符中的特定事件

class Epoller : public nocopy {     static const int size = 128;  public:     Epoller()     {         _epfd = epoll_create(size);         if (_epfd == -1)         {             lg(Error, "epoll_create error: %s", strerror(errno));         }         else         {             lg(Info, "epoll_create success: %d", _epfd);         }     }     int EpollerWait(struct epoll_event revents[], int num, int timeout)     {         int n = epoll_wait(_epfd, revents, num, timeout);         return n;     }     int EpllerUpdate(int oper, int sock, uint32_t event)     {         int n = 0;         if (oper == EPOLL_CTL_DEL)         {             n = epoll_ctl(_epfd, oper, sock, nullptr);             if (n != 0)             {                 lg(Error, "epoll_ctl delete error! sockfd: %d", sock);             }         }         else         {             // EPOLL_CTL_MOD || EPOLL_CTL_ADD             struct epoll_event ev;             ev.events = event;             ev.data.fd = sock;               n = epoll_ctl(_epfd, oper, sock, &ev);             if (n != 0)             {                 lg(Error, "epoll_ctl error!");             }         }         return n;     }     ~Epoller()     {         if (_epfd >= 0)             close(_epfd);     }  private:     int _epfd;     int _timeout{3000}; };

事件分发器

TcpServer类中Dispatcher作为reactor的事件分发器,主要作用是从epoll中获取就绪事件并调取相应的回调函数 

void Dispatcher(int timeout) {     int n = _epoller_ptr->EpollerWait(revs, num, timeout);     for (int i = 0; i < n; i++) {         uint32_t events = revs[i].events;//哪些事件就绪         int sock = revs[i].data.fd;//哪一个文件描述符          if ((events & EPOLLIN) && IsConnectionSafe(sock)) {             if (_connections[sock]->_recv_cb)                 _connections[sock]->_recv_cb(_connections[sock]);         }         if ((events & EPOLLOUT) && IsConnectionSafe(sock)) {             if (_connections[sock]->_send_cb)                 _connections[sock]->_send_cb(_connections[sock]);         }     } } 

事件处理器

 Connection类中定义回调函数,同时在TcpServer类中进行绑定;回调函数(Recver \ Sender \ Excepter)分别处理读、写和异常事件

class Connection { public:     void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb) {         _recv_cb = recv_cb;         _send_cb = send_cb;         _except_cb = except_cb;     } };  void TcpServer::Recver(std::weak_ptr<Connection> conn) {     auto connection = conn.lock();     if (!connection) return;     // 处理读事件     int sock = connection->SockFd();     while (true) {         char buffer[g_buffer_size];         ssize_t n = recv(sock, buffer, sizeof(buffer), 0);         if (n > 0) {             connection->AppendInBuffer(buffer);         } else if (n == 0) {             connection->_except_cb(connection);             return;         } else {             if (errno == EWOULDBLOCK) break;             else {                 connection->_except_cb(connection);                 return;             }         }     }     _OnMessage(connection); }  void TcpServer::Sender(std::weak_ptr<Connection> conn) {     auto connection = conn.lock();     if (!connection) return;     auto &outbuffer = connection->OutBuffer();     while (true) {         ssize_t n = send(connection->SockFd(), outbuffer.data(), outbuffer.size(), 0);         if (n > 0) {             outbuffer.erase(0, n);             if (outbuffer.empty()) break;         } else {             if (errno == EWOULDBLOCK) break;             else {                 connection->_except_cb(connection);                 return;             }         }     }     if (!outbuffer.empty()) {         EnableEvent(connection->SockFd(), true, true);     } else {         EnableEvent(connection->SockFd(), true, false);     } }  void TcpServer::Excepter(std::weak_ptr<Connection> conn) {     auto connection = conn.lock();     if (!connection) return;     int fd = connection->SockFd();     _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);     close(fd);     _connections.erase(fd); } 

事件处理器绑定实现方法

定义回调类型

func_t 用于处理常规事件;except_func用于处理异常事件

 

设置连接的回调函数

 描述连接的类connection中,存储着处理三种事件的函数,同时设置拥有设置回调函数的接口

 

添加连接到服务器的连接管理 

创建新的连接,同时绑定该连接的回调函数,并将其加入到Epoll中进行管理

  

绑定回调函数

 服务器初始化或者处理连接的时候,使用std库中的bind来创建特定的成员函数回调。通过这些回调来处理网络事件(接收数据、发送数据、处理异常)

std::bind:C++标准库的工具,主要是用于创建函数对象,将函数与其参数绑定在一起。通过将成员函数和对象实例进行绑定,并创建一个可调用的对象。

 

 Epoll检测事件并调用Dispatcher方法

Dispatcher根据事件类型调用相应的回调函数。Loop不断循环调用Dispatcher方法,处理所有已经就绪的事件。Dispatcher方法从Epoll中获取事件,并调用相应的回调函数处理事件。

 

 

异常处理 

  • 检查链接是否有效,如果指针为空则直接返回
  • 获取文件描述符,并记录文件描述以及对应客户端信息
  • 使用EpollerUpdate删除该文件描述符同时停止对该事件的关心
  • 关闭该文件描述符,同时将其从管理连接的map中移除
 void Excepter(std::weak_ptr<Connection> connection)     {         if(connection.expired()) return;         auto conn = connection.lock();          int fd = conn->SockFd();         lg(Warning, "Excepter hander sockfd: %d, client info %s:%d excepter handler",            conn->SockFd(), conn->_ip.c_str(), conn->_port);         // 1. 移除对特定fd的关心         // EnableEvent(connection->SockFd(), false, false);         _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0);         // 2. 关闭异常的文件描述符         lg(Debug, "close %d done...\n", fd);         close(fd);         // 3. 从unordered_map中移除         lg(Debug, "remove %d from _connections...\n", fd);         _connections.erase(fd);     }

读取处理 

  • 获取该连接的指针和文件描述符
  • 将sock中的数据非阻塞读取到缓冲区中
  • 判断是否读取成功
    • 读取成功,接收数据,追加到连接的输入缓冲区
    • 关闭连接,记录到日志
    • 错误处理,接收错误、信号中断错误、其他错误

    void Recver(std::weak_ptr<Connection> conn)     {         if(conn.expired()) return;         auto connection = conn.lock();         // std::cout << "haha, got you!!!!, sockfd: " << connection->SockFd() << std::endl;         int sock = connection->SockFd();         while (true)         {             char buffer[g_buffer_size];             memset(buffer, 0, sizeof(buffer));             ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取             if (n > 0)             {                 connection->AppendInBuffer(buffer);             }             else if (n == 0)             {                 lg(Info, "sockfd: %d, client info %s:%d quit...", sock, connection->_ip.c_str(), connection->_port);                 connection->_except_cb(connection);                 return;             }             else             {                 if (errno == EWOULDBLOCK)                     break;                 else if (errno == EINTR)                     continue;                 else                 {                     lg(Warning, "sockfd: %d, client info %s:%d recv error...", sock, connection->_ip.c_str(), connection->_port);                     connection->_except_cb(connection);                     return;                 }             }         }         // 交给上层检测数据接收的是否完整         _OnMessage(connection); // 你读到的sock所有的数据connection     

TCPServer整体实现代码

实现逻辑分析

初始化 

  • Init 函数

    • 创建监听套接字
    • 设置非阻塞模式
    • 绑定该服务器的端口
    • 开始监听
    • 添加到Epoll事件管理器
  • AddConnnection方法

    • 将套接字文件描述符添加到epoll实例中,监听EVENT_IN事件
    • 将TcpServer::Accepter方法绑定回调函数(当有新连接到来的时候,Epoll会调用Accepter方法)

 

 

 

  • Accepter

    •  服务器接收到客户端新连接并进行处理
    • while循环
      • 循环调用accept接受新的连接,其调用返回一个新的套接字用于客户端和服务端进行通信
      • 处理新连接
        • peerport:从网络字节序转换为主机字节序,获取客户端端口号。
        • inet_ntop:将客户端 IP 地址转换为字符串格式,存储在 ipbuf 中。
        • lg:记录日志,输出新连接的客户端信息。
        • 设置非阻塞模式,提高IO处理效率
        • 添加到EPoll
        • 使用std::bind绑定TcpServer类方法中的读写和异常方法,确保在处理事件的时候能够访问到当前对象的成员函数

 

void Accepter(std::weak_ptr<Connection> conn) {     auto connection = conn.lock();     while (true)     {         struct sockaddr_in peer;         socklen_t len = sizeof(peer);         int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len);         if (sock > 0)         {             uint16_t peerport = ntohs(peer.sin_port);             char ipbuf[128];             inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));             lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sock);              SetNonBlockOrDie(sock);             AddConnection(sock, EVENT_IN,                           std::bind(&TcpServer::Recver, this, std::placeholders::_1),                           std::bind(&TcpServer::Sender, this, std::placeholders::_1),                           std::bind(&TcpServer::Excepter, this, std::placeholders::_1),                           ipbuf, peerport); // TODO         }         else         {             if (errno == EWOULDBLOCK)                 break;             else if (errno == EINTR)                 continue;             else                 break;         }     } } 

新连接到来时,EPoll通知机制调用accept方法分析

  •  Init函数将监听套接字以及回调方法注册到EPoll实例中
  • 服务器启动Loop 函数,进入事件循环,等待需要处理的连接
  • 当监听套接字上有新的连接请求时,EPoll会检测到监听套接字的EPOLLIN事件,同时调用回调函数Accepter进行处理

Init函数运行完成后,服务器不会自己循环等待接收新的连接,而是进入了一个事件驱动的循环,这个循环则是由EPoll进行管理。

 事件循环

作用:等待符合条件的事件发生,并调用其回调函数对连接进行处理

  • 回调函数绑定的时机:文件描述符注册到EPoll实例的时候,回调函数就已经绑定了
  • 调用回调函数:当epoll_wait返回的时候,遍历触发的事件,调用预先绑定好的回调函数去处理这些事件

 

 

事件处理

  •  新连接到来时
    • 调用connection类去存储新连接,同时存储连接需要调用的方法
    • 连接上有消息到来的时候,调用onmessage进行处理
      • 获取是哪个连接需要进行处理
      • 使用handler函数处理输入报文,并生成响应报文,如果生成的响应报文为空,则返回
      • 将响应报文追加到连接对象的输出缓冲区中
      • 发送响应报文

 

 

 

补充知识点

EPOLLIN和EPOLLET使用 

#include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #include <iostream>  // 设置文件描述符为非阻塞模式 void SetNonBlock(int fd) {     int flags = fcntl(fd, F_GETFL, 0);     fcntl(fd, F_SETFL, flags | O_NONBLOCK); }  int main() {     int epoll_fd = epoll_create1(0);     if (epoll_fd == -1) {         std::cerr << "Failed to create epoll file descriptor\n";         return 1;     }      int listen_fd = socket(AF_INET, SOCK_STREAM, 0);     if (listen_fd == -1) {         std::cerr << "Failed to create socket\n";         return 1;     }      SetNonBlock(listen_fd);      // 配置监听套接字     struct epoll_event event;     event.data.fd = listen_fd;     event.events = EPOLLIN | EPOLLET; // 使用 EVENT_IN      if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) {         std::cerr << "Failed to add file descriptor to epoll\n";         close(listen_fd);         return 1;     }      while (true) {         struct epoll_event events[10];         int n = epoll_wait(epoll_fd, events, 10, -1);         for (int i = 0; i < n; ++i) {             if (events[i].events & EPOLLIN) {                 // 处理可读事件                 std::cout << "Data is available to read\n";             }         }     }      close(listen_fd);     close(epoll_fd);     return 0; } 
fcnt1 文件描述符的操作函数

设置文件描述符属性

  • 参数
    • fd:文件描述符。
    • cmd:控制命令,决定了后续参数和操作的类型。
    • arg:可选参数,根据 cmd 的不同而不同。
  • 常用命令
    • F_GETFL:获取文件描述符标志。
    • F_SETFL:设置文件描述符标志。
    • F_GETFD:获取文件描述符的附加标志。
    • F_SETFD:设置文件描述符的附加标志。
    • F_SETLK:设置文件锁定或解锁(非阻塞)。
    • F_SETLKW:设置文件锁定或解锁(阻塞)。
  • 高并发服务器中的作用:设置非阻塞模式、设置文件锁等

 

结合使用具体代码 

Init函数 

函数作用总结

  • 创建套接字
  • 设置非阻塞
  • 绑定、监听
  • 添加新连接

Accepter 函数

    void Accepter(std::weak_ptr<Connection> conn)     {         auto connection = conn.lock();      // 检查共享指针是否有效     if (!connection) {         // 如果无效,说明 Connection 对象已被销毁         std::cerr << "Failed to lock connection, it may have been destroyed." << std::endl;         return;  // 直接返回,退出函数     }          while (true)         {             struct sockaddr_in peer;             socklen_t len = sizeof(peer);             int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len);             if (sock > 0)             {                 uint16_t peerport = ntohs(peer.sin_port);                 char ipbuf[128];                 inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf));                 lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sock);                  SetNonBlockOrDie(sock);                 // listensock只需要设置_recv_cb, 而其他sock,读,写,异常                 AddConnection(sock, EVENT_IN,                               std::bind(&TcpServer::Recver, this, std::placeholders::_1),                               std::bind(&TcpServer::Sender, this, std::placeholders::_1),                               std::bind(&TcpServer::Excepter, this, std::placeholders::_1),                               ipbuf, peerport); // TODO             }             else             {                 if (errno == EWOULDBLOCK)                     break;                 else if (errno == EINTR)                     continue;                 else                     break;             }         }     }

  • auto connection = conn.lock()
    • 获取Connection对象的共享指针connection,如果Connection独享已经被销毁,则推导出的是空指针
  • 如果connection是空指针,则连接对象已经被销毁,则不需要继续往后执行
  • 无限循环,不断的接收新的客户端连接,然后将客户端连接地址信息存储到peer中,返回新的sock套接字
  • 获取客户端连接的IP地址和端口号,同时记录到日志
  • 套接字设置成非阻塞模式;调用增加连接函数,对该连接的接收、发送和异常处理函数进行绑定

std::bind

 

  • F&& f:要绑定的可调用对象,可以是函数指针、成员函数指针、函数对象、或仿函数。
  • Args&&... args:一组参数,这些参数将绑定到 f 的参数上。可以是具体的值,也可以是占位符(来自 std::placeholders 命名空间),用来表示在调用时由用户提供的参数。
  • std::placeholders::_1:表示绑定函数调用时的第一个参数。
  • std::placeholders::_2:表示绑定函数调用时的第二个参数。
  • std::placeholders::_3:表示绑定函数调用时的第三个参数,以此类推。

绑定全局函数 

#include <iostream> #include <functional>  int add(int a, int b) {     return a + b; }  int main() {     // 绑定函数add,固定参数a为10     auto add_10 = std::bind(add, 10, std::placeholders::_1);      // 调用add_10(5),相当于调用add(10, 5)     std::cout << "Result: " << add_10(5) << std::endl; // 输出 15      return 0; } 

绑定成员函数

#include <iostream> #include <functional>  class MyClass { public:     void print_sum(int a, int b) const {         std::cout << "Sum: " << a + b << std::endl;     } };  int main() {     MyClass obj;     // 绑定成员函数print_sum,固定对象为obj     auto bound_print_sum = std::bind(&MyClass::print_sum, &obj, std::placeholders::_1, std::placeholders::_2);      // 调用bound_print_sum(3, 4),相当于调用obj.print_sum(3, 4)     bound_print_sum(3, 4); // 输出 Sum: 7      return 0; } 

绑定成员变量

#include <iostream> #include <functional>  class MyClass { public:     int value; };  int main() {     MyClass obj;     obj.value = 42;      // 绑定成员变量value,固定对象为obj     auto get_value = std::bind(&MyClass::value, &obj);      // 调用get_value(),相当于访问obj.value     std::cout << "Value: " << get_value() << std::endl; // 输出 42      return 0; } 

绑定函数对象

#include <iostream> #include <functional>  struct Multiply {     int operator()(int a, int b) const {         return a * b;     } };  int main() {     Multiply multiply;     // 绑定函数对象multiply,固定参数a为5     auto multiply_by_5 = std::bind(multiply, 5, std::placeholders::_1);      // 调用multiply_by_5(3),相当于调用multiply(5, 3)     std::cout << "Result: " << multiply_by_5(3) << std::endl; // 输出 15      return 0; } 

sender函数

使用时机:已经对接收到的数据进行了处理,需要将处理后的数据发送出去

逻辑实现:从连接对象的输出缓冲区(outbuffer)中读取数据,并尝试发送到连接的套接字;如果发送过程中出现错误,则按照对应的错误类型进行处理;如果缓冲区中仍有未发送的数据,则启动对写事件的关心;否则则关闭对写事件的关心。

作用:检查和处理发送缓冲区的数据,实现对连接对象的可靠数据发送,同时考虑发送成功、连接关闭、非阻塞和中断等多种情况,同时必要时启动或者关闭对写事件的关注,从而实现高效的非阻塞的IO操作

EnableEvent函数

  • (readable ? EPOLLIN : 0) :如果读事件为真则设为真,否则设置为0,最后将关心的事件添加到event
  • 通过EpllerUpdate更新该套接字对读写的关心

    void EnableEvent(int sock, bool readable, bool writeable)     {         uint32_t events = 0;         events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET);         _epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sock, events);     }

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!