目录
事件多路分离器
通过epoll实现,负责监听特定文件描述符中的特定事件
class Epoller : public nocopy { static const int size = 128; public: Epoller() { _epfd = epoll_create(size); if (_epfd == -1) { lg(Error, "epoll_create error: %s", strerror(errno)); } else { lg(Info, "epoll_create success: %d", _epfd); } } int EpollerWait(struct epoll_event revents[], int num, int timeout) { int n = epoll_wait(_epfd, revents, num, timeout); return n; } int EpllerUpdate(int oper, int sock, uint32_t event) { int n = 0; if (oper == EPOLL_CTL_DEL) { n = epoll_ctl(_epfd, oper, sock, nullptr); if (n != 0) { lg(Error, "epoll_ctl delete error! sockfd: %d", sock); } } else { // EPOLL_CTL_MOD || EPOLL_CTL_ADD struct epoll_event ev; ev.events = event; ev.data.fd = sock; n = epoll_ctl(_epfd, oper, sock, &ev); if (n != 0) { lg(Error, "epoll_ctl error!"); } } return n; } ~Epoller() { if (_epfd >= 0) close(_epfd); } private: int _epfd; int _timeout{3000}; };
事件分发器
TcpServer类中Dispatcher作为reactor的事件分发器,主要作用是从epoll中获取就绪事件并调取相应的回调函数
void Dispatcher(int timeout) { int n = _epoller_ptr->EpollerWait(revs, num, timeout); for (int i = 0; i < n; i++) { uint32_t events = revs[i].events;//哪些事件就绪 int sock = revs[i].data.fd;//哪一个文件描述符 if ((events & EPOLLIN) && IsConnectionSafe(sock)) { if (_connections[sock]->_recv_cb) _connections[sock]->_recv_cb(_connections[sock]); } if ((events & EPOLLOUT) && IsConnectionSafe(sock)) { if (_connections[sock]->_send_cb) _connections[sock]->_send_cb(_connections[sock]); } } }
事件处理器
Connection类中定义回调函数,同时在TcpServer类中进行绑定;回调函数(Recver \ Sender \ Excepter)分别处理读、写和异常事件
class Connection { public: void SetHandler(func_t recv_cb, func_t send_cb, except_func except_cb) { _recv_cb = recv_cb; _send_cb = send_cb; _except_cb = except_cb; } }; void TcpServer::Recver(std::weak_ptr<Connection> conn) { auto connection = conn.lock(); if (!connection) return; // 处理读事件 int sock = connection->SockFd(); while (true) { char buffer[g_buffer_size]; ssize_t n = recv(sock, buffer, sizeof(buffer), 0); if (n > 0) { connection->AppendInBuffer(buffer); } else if (n == 0) { connection->_except_cb(connection); return; } else { if (errno == EWOULDBLOCK) break; else { connection->_except_cb(connection); return; } } } _OnMessage(connection); } void TcpServer::Sender(std::weak_ptr<Connection> conn) { auto connection = conn.lock(); if (!connection) return; auto &outbuffer = connection->OutBuffer(); while (true) { ssize_t n = send(connection->SockFd(), outbuffer.data(), outbuffer.size(), 0); if (n > 0) { outbuffer.erase(0, n); if (outbuffer.empty()) break; } else { if (errno == EWOULDBLOCK) break; else { connection->_except_cb(connection); return; } } } if (!outbuffer.empty()) { EnableEvent(connection->SockFd(), true, true); } else { EnableEvent(connection->SockFd(), true, false); } } void TcpServer::Excepter(std::weak_ptr<Connection> conn) { auto connection = conn.lock(); if (!connection) return; int fd = connection->SockFd(); _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0); close(fd); _connections.erase(fd); }
事件处理器绑定实现方法
定义回调类型
func_t 用于处理常规事件;except_func用于处理异常事件
设置连接的回调函数
描述连接的类connection中,存储着处理三种事件的函数,同时设置拥有设置回调函数的接口
添加连接到服务器的连接管理
创建新的连接,同时绑定该连接的回调函数,并将其加入到Epoll中进行管理
绑定回调函数
服务器初始化或者处理连接的时候,使用std库中的bind来创建特定的成员函数回调。通过这些回调来处理网络事件(接收数据、发送数据、处理异常)
std::bind:C++标准库的工具,主要是用于创建函数对象,将函数与其参数绑定在一起。通过将成员函数和对象实例进行绑定,并创建一个可调用的对象。
Epoll检测事件并调用Dispatcher方法
Dispatcher根据事件类型调用相应的回调函数。Loop不断循环调用Dispatcher方法,处理所有已经就绪的事件。Dispatcher方法从Epoll中获取事件,并调用相应的回调函数处理事件。
异常处理
- 检查链接是否有效,如果指针为空则直接返回
- 获取文件描述符,并记录文件描述以及对应客户端信息
- 使用EpollerUpdate删除该文件描述符同时停止对该事件的关心
- 关闭该文件描述符,同时将其从管理连接的map中移除
void Excepter(std::weak_ptr<Connection> connection) { if(connection.expired()) return; auto conn = connection.lock(); int fd = conn->SockFd(); lg(Warning, "Excepter hander sockfd: %d, client info %s:%d excepter handler", conn->SockFd(), conn->_ip.c_str(), conn->_port); // 1. 移除对特定fd的关心 // EnableEvent(connection->SockFd(), false, false); _epoller_ptr->EpllerUpdate(EPOLL_CTL_DEL, fd, 0); // 2. 关闭异常的文件描述符 lg(Debug, "close %d done...\n", fd); close(fd); // 3. 从unordered_map中移除 lg(Debug, "remove %d from _connections...\n", fd); _connections.erase(fd); }
读取处理
- 获取该连接的指针和文件描述符
- 将sock中的数据非阻塞读取到缓冲区中
- 判断是否读取成功
- 读取成功,接收数据,追加到连接的输入缓冲区
- 关闭连接,记录到日志
- 错误处理,接收错误、信号中断错误、其他错误
void Recver(std::weak_ptr<Connection> conn) { if(conn.expired()) return; auto connection = conn.lock(); // std::cout << "haha, got you!!!!, sockfd: " << connection->SockFd() << std::endl; int sock = connection->SockFd(); while (true) { char buffer[g_buffer_size]; memset(buffer, 0, sizeof(buffer)); ssize_t n = recv(sock, buffer, sizeof(buffer) - 1, 0); // 非阻塞读取 if (n > 0) { connection->AppendInBuffer(buffer); } else if (n == 0) { lg(Info, "sockfd: %d, client info %s:%d quit...", sock, connection->_ip.c_str(), connection->_port); connection->_except_cb(connection); return; } else { if (errno == EWOULDBLOCK) break; else if (errno == EINTR) continue; else { lg(Warning, "sockfd: %d, client info %s:%d recv error...", sock, connection->_ip.c_str(), connection->_port); connection->_except_cb(connection); return; } } } // 交给上层检测数据接收的是否完整 _OnMessage(connection); // 你读到的sock所有的数据connection
TCPServer整体实现代码
实现逻辑分析
初始化
Init 函数
- 创建监听套接字
- 设置非阻塞模式
- 绑定该服务器的端口
- 开始监听
- 添加到Epoll事件管理器
AddConnnection方法
- 将套接字文件描述符添加到epoll实例中,监听EVENT_IN事件
- 将TcpServer::Accepter方法绑定回调函数(当有新连接到来的时候,Epoll会调用Accepter方法)
Accepter
- 服务器接收到客户端新连接并进行处理
- while循环
- 循环调用accept接受新的连接,其调用返回一个新的套接字用于客户端和服务端进行通信
- 处理新连接
peerport
:从网络字节序转换为主机字节序,获取客户端端口号。inet_ntop
:将客户端 IP 地址转换为字符串格式,存储在ipbuf
中。lg
:记录日志,输出新连接的客户端信息。- 设置非阻塞模式,提高IO处理效率
- 添加到EPoll
- 使用std::bind绑定TcpServer类方法中的读写和异常方法,确保在处理事件的时候能够访问到当前对象的成员函数
void Accepter(std::weak_ptr<Connection> conn) { auto connection = conn.lock(); while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len); if (sock > 0) { uint16_t peerport = ntohs(peer.sin_port); char ipbuf[128]; inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf)); lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sock); SetNonBlockOrDie(sock); AddConnection(sock, EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1), std::bind(&TcpServer::Excepter, this, std::placeholders::_1), ipbuf, peerport); // TODO } else { if (errno == EWOULDBLOCK) break; else if (errno == EINTR) continue; else break; } } }
新连接到来时,EPoll通知机制调用accept方法分析
- Init函数将监听套接字以及回调方法注册到EPoll实例中
- 服务器启动Loop 函数,进入事件循环,等待需要处理的连接
- 当监听套接字上有新的连接请求时,EPoll会检测到监听套接字的EPOLLIN事件,同时调用回调函数Accepter进行处理
Init函数运行完成后,服务器不会自己循环等待接收新的连接,而是进入了一个事件驱动的循环,这个循环则是由EPoll进行管理。
事件循环
作用:等待符合条件的事件发生,并调用其回调函数对连接进行处理
- 回调函数绑定的时机:文件描述符注册到EPoll实例的时候,回调函数就已经绑定了
- 调用回调函数:当epoll_wait返回的时候,遍历触发的事件,调用预先绑定好的回调函数去处理这些事件
事件处理
- 新连接到来时
- 调用connection类去存储新连接,同时存储连接需要调用的方法
- 连接上有消息到来的时候,调用onmessage进行处理
- 获取是哪个连接需要进行处理
- 使用handler函数处理输入报文,并生成响应报文,如果生成的响应报文为空,则返回
- 将响应报文追加到连接对象的输出缓冲区中
- 发送响应报文
补充知识点
EPOLLIN和EPOLLET使用
#include <sys/epoll.h> #include <unistd.h> #include <fcntl.h> #include <iostream> // 设置文件描述符为非阻塞模式 void SetNonBlock(int fd) { int flags = fcntl(fd, F_GETFL, 0); fcntl(fd, F_SETFL, flags | O_NONBLOCK); } int main() { int epoll_fd = epoll_create1(0); if (epoll_fd == -1) { std::cerr << "Failed to create epoll file descriptor\n"; return 1; } int listen_fd = socket(AF_INET, SOCK_STREAM, 0); if (listen_fd == -1) { std::cerr << "Failed to create socket\n"; return 1; } SetNonBlock(listen_fd); // 配置监听套接字 struct epoll_event event; event.data.fd = listen_fd; event.events = EPOLLIN | EPOLLET; // 使用 EVENT_IN if (epoll_ctl(epoll_fd, EPOLL_CTL_ADD, listen_fd, &event) == -1) { std::cerr << "Failed to add file descriptor to epoll\n"; close(listen_fd); return 1; } while (true) { struct epoll_event events[10]; int n = epoll_wait(epoll_fd, events, 10, -1); for (int i = 0; i < n; ++i) { if (events[i].events & EPOLLIN) { // 处理可读事件 std::cout << "Data is available to read\n"; } } } close(listen_fd); close(epoll_fd); return 0; }
fcnt1 文件描述符的操作函数
设置文件描述符属性
- 参数
- fd:文件描述符。
- cmd:控制命令,决定了后续参数和操作的类型。
- arg:可选参数,根据
cmd
的不同而不同。- 常用命令
- F_GETFL:获取文件描述符标志。
- F_SETFL:设置文件描述符标志。
- F_GETFD:获取文件描述符的附加标志。
- F_SETFD:设置文件描述符的附加标志。
- F_SETLK:设置文件锁定或解锁(非阻塞)。
- F_SETLKW:设置文件锁定或解锁(阻塞)。
- 高并发服务器中的作用:设置非阻塞模式、设置文件锁等
结合使用具体代码
Init函数
函数作用总结
- 创建套接字
- 设置非阻塞
- 绑定、监听
- 添加新连接
Accepter 函数
void Accepter(std::weak_ptr<Connection> conn) { auto connection = conn.lock(); // 检查共享指针是否有效 if (!connection) { // 如果无效,说明 Connection 对象已被销毁 std::cerr << "Failed to lock connection, it may have been destroyed." << std::endl; return; // 直接返回,退出函数 } while (true) { struct sockaddr_in peer; socklen_t len = sizeof(peer); int sock = ::accept(connection->SockFd(), (struct sockaddr *)&peer, &len); if (sock > 0) { uint16_t peerport = ntohs(peer.sin_port); char ipbuf[128]; inet_ntop(AF_INET, &peer.sin_addr.s_addr, ipbuf, sizeof(ipbuf)); lg(Debug, "get a new client, get info-> [%s:%d], sockfd : %d", ipbuf, peerport, sock); SetNonBlockOrDie(sock); // listensock只需要设置_recv_cb, 而其他sock,读,写,异常 AddConnection(sock, EVENT_IN, std::bind(&TcpServer::Recver, this, std::placeholders::_1), std::bind(&TcpServer::Sender, this, std::placeholders::_1), std::bind(&TcpServer::Excepter, this, std::placeholders::_1), ipbuf, peerport); // TODO } else { if (errno == EWOULDBLOCK) break; else if (errno == EINTR) continue; else break; } } }
- auto connection = conn.lock()
- 获取Connection对象的共享指针connection,如果Connection独享已经被销毁,则推导出的是空指针
- 如果connection是空指针,则连接对象已经被销毁,则不需要继续往后执行
- 无限循环,不断的接收新的客户端连接,然后将客户端连接地址信息存储到peer中,返回新的sock套接字
- 获取客户端连接的IP地址和端口号,同时记录到日志
- 套接字设置成非阻塞模式;调用增加连接函数,对该连接的接收、发送和异常处理函数进行绑定
std::bind
F&& f
:要绑定的可调用对象,可以是函数指针、成员函数指针、函数对象、或仿函数。Args&&... args
:一组参数,这些参数将绑定到f
的参数上。可以是具体的值,也可以是占位符(来自std::placeholders
命名空间),用来表示在调用时由用户提供的参数。std::placeholders::_1
:表示绑定函数调用时的第一个参数。std::placeholders::_2
:表示绑定函数调用时的第二个参数。std::placeholders::_3
:表示绑定函数调用时的第三个参数,以此类推。
绑定全局函数
#include <iostream> #include <functional> int add(int a, int b) { return a + b; } int main() { // 绑定函数add,固定参数a为10 auto add_10 = std::bind(add, 10, std::placeholders::_1); // 调用add_10(5),相当于调用add(10, 5) std::cout << "Result: " << add_10(5) << std::endl; // 输出 15 return 0; }
绑定成员函数
#include <iostream> #include <functional> class MyClass { public: void print_sum(int a, int b) const { std::cout << "Sum: " << a + b << std::endl; } }; int main() { MyClass obj; // 绑定成员函数print_sum,固定对象为obj auto bound_print_sum = std::bind(&MyClass::print_sum, &obj, std::placeholders::_1, std::placeholders::_2); // 调用bound_print_sum(3, 4),相当于调用obj.print_sum(3, 4) bound_print_sum(3, 4); // 输出 Sum: 7 return 0; }
绑定成员变量
#include <iostream> #include <functional> class MyClass { public: int value; }; int main() { MyClass obj; obj.value = 42; // 绑定成员变量value,固定对象为obj auto get_value = std::bind(&MyClass::value, &obj); // 调用get_value(),相当于访问obj.value std::cout << "Value: " << get_value() << std::endl; // 输出 42 return 0; }
绑定函数对象
#include <iostream> #include <functional> struct Multiply { int operator()(int a, int b) const { return a * b; } }; int main() { Multiply multiply; // 绑定函数对象multiply,固定参数a为5 auto multiply_by_5 = std::bind(multiply, 5, std::placeholders::_1); // 调用multiply_by_5(3),相当于调用multiply(5, 3) std::cout << "Result: " << multiply_by_5(3) << std::endl; // 输出 15 return 0; }
sender函数
使用时机:已经对接收到的数据进行了处理,需要将处理后的数据发送出去
逻辑实现:从连接对象的输出缓冲区(outbuffer)中读取数据,并尝试发送到连接的套接字;如果发送过程中出现错误,则按照对应的错误类型进行处理;如果缓冲区中仍有未发送的数据,则启动对写事件的关心;否则则关闭对写事件的关心。
作用:检查和处理发送缓冲区的数据,实现对连接对象的可靠数据发送,同时考虑发送成功、连接关闭、非阻塞和中断等多种情况,同时必要时启动或者关闭对写事件的关注,从而实现高效的非阻塞的IO操作
EnableEvent函数
- (readable ? EPOLLIN : 0) :如果读事件为真则设为真,否则设置为0,最后将关心的事件添加到event
- 通过EpllerUpdate更新该套接字对读写的关心
void EnableEvent(int sock, bool readable, bool writeable) { uint32_t events = 0; events |= ((readable ? EPOLLIN : 0) | (writeable ? EPOLLOUT : 0) | EPOLLET); _epoller_ptr->EpllerUpdate(EPOLL_CTL_MOD, sock, events); }