文章目录
1.初识poll
poll是Linux系统中的一个系统调用,它用于监控多个文件描述符(file descriptors,如套接字、管道、文件等)的状态变化。通过poll,程序可以同时等待多个文件描述符上发生的特定事件(如可读、可写、错误等),而无需为每个文件描述符创建单独的线程或进程。这使得poll成为处理多个并发连接和I/O操作的有效手段。
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout);
fds:是一个指向struct pollfd结构体数组的指针,每个元素指定了一个要检查的文件描述符及其感兴趣的事件。
nfds:指定了fds数组中元素的数量。
timeout:指定了poll调用阻塞的时间,以毫秒为单位。如果设置为-1,poll将无限期地等待;如果设置为0,poll将立即返回;如果设置为正数,poll将在指定的毫秒数后超时。
poll与select的主要联系与区别
- 联系
多路复用:poll和select都是I/O多路复用机制,允许程序同时监控多个文件描述符,提高了程序处理并发I/O的效率。
事件通知:两者都提供了一种机制,通过事件通知程序哪些文件描述符已经准备好进行读取、写入或有异常情况。
非阻塞I/O:它们都可以与非阻塞I/O结合使用,使得程序可以在没有数据可读或写入时继续执行其他任务。 - 区别
- 文件描述符数量限制:
select:通常受到系统文件描述符数量限制的限制,一般在1024个左右。
poll:没有文件描述符数量的硬限制,理论上可以监控更多的文件描述符(但实际上受限于系统资源和内存限制)。 - 数据结构:
select:使用三个独立的位图(bitmap)来跟踪文件描述符的状态,这些位图在内核空间维护。
poll:使用一个struct pollfd结构体数组来存储要监控的文件描述符及其事件,这个数组在用户空间维护。 - 事件信息丰富度:
select:需要程序遍历位图来查找事件,对于每个文件描述符的状态变化,select提供的信息相对较少。
poll:struct pollfd结构体中的revents字段在事件发生时会被内核设置,提供了更丰富的信息关于每个文件描述符的状态变化。 - 性能:
select:在文件描述符数量较多时,性能会下降,因为需要遍历整个位图来查找就绪的文件描述符。
poll:虽然也需要在用户空间和内核空间之间复制文件描述符集合,但由于其数据结构的设计,在处理大量文件描述符时可能具有更好的性能。然而,如果监控的文件描述符数量非常大,仍然可能遇到性能瓶颈。 - 超时精度:
select:超时参数是一个整数值,表示调用应该等待的秒数,精度较低。
poll:超时参数是一个毫秒值,提供了更高的时间精度。 - 移植性:
两者都具有良好的可移植性,可以在不同的Unix-like系统中使用。但需要注意的是,在某些特定系统或环境下,poll的支持可能不如select广泛。
综上所述,poll和select在Linux系统中都扮演着重要的角色,但在具体使用时需要根据应用场景、文件描述符数量、性能要求等因素进行选择。
poll的原理
poll函数是Linux系统中的一个重要系统调用,用于监控多个文件描述符(file descriptors)的状态变化。下面从参数和底层原理两个方面对poll函数进行简要叙述。
#include <poll.h> int poll(struct pollfd fds[], nfds_t nfds, int timeout);
fds:这是一个指向struct pollfd结构体数组的指针。每个pollfd结构体代表了一个要监控的文件描述符及其感兴趣的事件。pollfd结构体的定义通常如下:
c struct pollfd { int fd; // 文件描述符 short events; // 等待的事件 short revents; // 实际发生了的事件 };
fd:要监控的文件描述符。
events:请求监控的事件,可以是读、写、异常等多种事件的组合。
revents:函数返回时,由内核设置,表示实际发生的事件。
nfds:指定了fds数组中元素的数量,即要监控的文件描述符的总数。
timeout:指定了poll调用阻塞的时间,以毫秒为单位。
如果timeout为正数,poll将等待指定的毫秒数。
如果timeout为0,poll将立即返回,不阻塞。
如果timeout为-1,poll将无限期地等待,直到有事件发生。
poll函数的底层原理
poll函数的底层实现原理主要基于等待队列。当调用poll函数时,它会遍历传入的pollfd结构体数组,为每个感兴趣的文件描述符注册一个等待事件。这些等待事件会被挂接到内核中相应的等待队列上。
等待队列:内核中的每个文件描述符都可能关联有一个或多个等待队列,用于存放等待该文件描述符上特定事件发生的进程或线程。
轮询与阻塞:poll函数会轮询检查每个文件描述符的等待队列,查看是否有事件发生。
如果有文件描述符上发生了感兴趣的事件,poll函数会立即返回,并将这些事件记录在对应pollfd结构体的revents字段中。
如果没有任何文件描述符上发生事件,并且timeout参数指定了非零值,poll函数会进入阻塞状态,直到超时时间到达或至少有一个文件描述符上发生了事件。
返回与错误处理:
当有文件描述符上发生事件或超时时间到达时,poll函数会返回。返回值表示发生了事件的文件描述符数量(如果大于0),或者在超时时返回0,或者在发生错误时返回-1并设置errno以指示错误原因。
效率与限制:虽然poll函数没有像select那样有文件描述符数量的硬限制,但在处理大量文件描述符时,仍然需要将整个pollfd数组在用户空间和内核空间之间复制,这可能会导致性能下降。此外,poll函数在内部仍然需要遍历所有要监控的文件描述符,因此在大规模并发场景下可能不是最高效的解决方案。
综上所述,poll函数通过轮询和等待队列机制实现了对多个文件描述符状态的监控,是Linux系统中处理I/O多路复用的重要手段之一。然而,在处理大量文件描述符时,可能需要考虑其性能限制并探索更高效的解决方案(如epoll)。
源码
#include <poll.h> int poll(struct pollfd *fds, nfds_t nfds, int timeout); // pollfd结构 struct pollfd { int fd; /* file descriptor */ short events; /* requested events */ short revents; /* returned events */ };
多路转接包括:用户告诉内核你需要关心什么 && 内核告诉用户你让我关心的fd有哪些就绪了。select 用位图,poll 用结构体数组。poll 在用户传给内核的时候,告诉内核需要关心 struct pollfd 结构体中的 fd 中的 events 事件;返回时,内核设置struct pollfd 结构体中的 revents 事件,表示该fd的该事件就绪。poll 最大的特点:将输入和输出事件进行分离!
内核怎么知道是关心读事件还是写事件还是其他事件呢?当内核返回用户也一样。 events 和 revents 都是 short 类型,都是 16 个比特位,在 Linux 中,使用比特位传参!把事件设置成位图的形式。
poll 的本质是将读写事件分离,传入用户定的数组元素的大小,通过 events 和 revents 以位图的方式来传递就绪和关心标记位的解决方案!
poll的优点
poll 也是多路转接方案的一种,它主要解决的就是 select 中的等待 fd 有上限的问题,以及每次都要对关心的 fd 进行事件重置的问题。
不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现.
pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便。
poll并没有最大数量限制 (但是数量过大后性能也是会下降).
poll的缺点
poll中监听的文件描述符数目增多时和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.
每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.
同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效率也会线性下降.
poll vs select
poll 本质上是通过一个结构体数组来等待 fd 的,它解决了 select 等待 fd 有上限的问题,如何解决?_event_fds 这个数组的大小是自己定的,可以定的非常大,大到内存扛不住,此时就是操作系统/内存 软件或硬件的问题了,不是 poll 接口本身的问题。select 等待 fd 有上限的问题,本质上是接口本身的问题,poll 本质上是解决了 select 等待 fd 有上限的问题。
poll 与 select 都需要遍历检测有哪些文件描述符就绪,poll 在内核中需要遍历检测有哪些文件描述符就绪;在用户层需要遍历检测有哪些事件已经就绪。
poll 和 select 都避免不开遍历的问题,当fd过多,效率提升不明显。
2.poll开发多客户端echo服务器
封装套接字接口
#pragma once #include <iostream> #include <string> #include <unistd.h> #include <cstring> #include <sys/types.h> #include <sys/stat.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include "Log.hpp" enum { SocketErr = 2, BindErr, ListenErr, }; const int g_backlog = 10; class Sock { private: int _sockfd; public: Sock() { } ~Sock() { } public: void Socket() { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) { lg(Fatal, "socker error, %s: %d", strerror(errno), errno); exit(SocketErr); } } void Bind(uint16_t port) { struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; if (bind(_sockfd, (struct sockaddr *)&local, sizeof(local)) < 0) { lg(Fatal, "bind error, %s: %d", strerror(errno), errno); exit(BindErr); } } void Listen() { if (listen(_sockfd, g_backlog) < 0) { lg(Fatal, "listen error, %s: %d", strerror(errno), errno); exit(ListenErr); } } int Accept(std::string *clientip, uint16_t *clientport) { struct sockaddr_in peer; socklen_t len = sizeof(peer); int newfd = accept(_sockfd, (struct sockaddr *)&peer, &len); if (newfd < 0) { lg(Warning, "accept error, %s: %d", strerror(errno), errno); return -1; } char ipstr[64]; inet_ntop(AF_INET, &peer.sin_addr, ipstr, sizeof(ipstr)); *clientip = ipstr; *clientport = ntohs(peer.sin_port); return newfd; } bool Connect(const std::string &ip, const uint16_t &port) { struct sockaddr_in peer; memset(&peer, 0, sizeof(peer)); peer.sin_family = AF_INET; peer.sin_port = htons(port); inet_pton(AF_INET, ip.c_str(), &(peer.sin_addr)); int n = connect(_sockfd, (struct sockaddr *)&peer, sizeof(peer)); if (n == -1) { std::cerr << "connect to " << ip << ":" << port << " error" << std::endl; return false; } return true; } void CloseFd() { close(_sockfd); } int getSocketFd() { return _sockfd; } };
Makefile
poll_server:Main.cc g++ -o $@ $^ -std=c++11 .PHONY:clean clean: rm -f poll_server
主函数
#include "PollServer.hpp" #include <memory> int main() { std::unique_ptr<PollServer> svr(new PollServer()); svr->Init(); svr->Start(); return 0; }
日志服务
#pragma once #include <iostream> #include <time.h> #include <stdarg.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #define SIZE 1024 #define Info 0 #define Debug 1 #define Warning 2 #define Error 3 #define Fatal 4 #define Screen 1 #define Onefile 2 #define Classfile 3 #define LogFile "log.txt" class Log { private: int printMethod; std::string path; public: Log() { printMethod = Screen; path = "./"; } void Enable(int method) { printMethod = method; } std::string levelToString(int level) { switch (level) { case Info: return "Info"; case Debug: return "Debug"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "None"; } } /* void logmessage(int level, const char *format, ...) { time_t t = time(nullptr); struct tm *ctime = localtime(&t); char leftbuffer[SIZE]; snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec); va_list s; va_start(s, format); char rightbuffer[SIZE]; vsnprintf(rightbuffer, sizeof(rightbuffer), format, s); va_end(s); // 格式:默认部分+自定义部分 char logtxt[SIZE * 2]; snprintf(logtxt, sizeof(logtxt), "%s %s\n", leftbuffer, rightbuffer); // printf("%s", logtxt); printLog(level, logtxt); } */ // lg(Warning, "accept error, %s: %d", strerror(errno), errno); void operator()(int level, const char *msg_format, ...) { time_t timestamp = time(nullptr); struct tm *ctime = localtime(×tamp); //level 年月日 char leftbuffer[SIZE]; snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec); //自定义msg va_list arg_list;//存储可变参数列表信息 va_start(arg_list, msg_format);//初始化 使其指向函数参数列表中format参数之后的第一个可变参数 char rightbuffer[SIZE]; vsnprintf(rightbuffer, sizeof(rightbuffer), msg_format, arg_list); va_end(arg_list);//清理va_list变量 // 格式:默认部分+自定义部分 char log_content[SIZE * 2]; snprintf(log_content, sizeof(log_content), "%s %s", leftbuffer, rightbuffer); // printf("%s", logtxt); // 暂时打印 printLog(level, log_content); } void printLog(int level, const std::string &log_content) { switch (printMethod) { case Screen: std::cout << log_content << std::endl; break; case Onefile: printOneFile(LogFile, log_content); break; case Classfile: printClassFile(level, log_content); break; default: break; } } void printOneFile(const std::string &log_filename, const std::string &log_content) { //path = "./"; #define LogFile "log.txt" std::string _logFilename = path + log_filename; int fd = open(_logFilename.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt" if (fd < 0) return; write(fd, log_content.c_str(), log_content.size()); close(fd); } void printClassFile(int level, const std::string &log_content) { //#define LogFile "log.txt" std::string filename = LogFile; filename += "."; filename += levelToString(level); // "log.txt.Debug" printOneFile(filename, log_content); } ~Log() { } }; Log lg; /* int sum(int n, ...) { va_list s; // char* va_start(s, n); int sum = 0; while(n) { sum += va_arg(s, int); // printf("hello %d, hello %s, hello %c, hello %d,", 1, "hello", 'c', 123); n--; } va_end(s); //s = NULL return sum; } */
聊天服务器
#pragma once #include <iostream> #include <poll.h> #include <sys/time.h> #include "Socket.hpp" using namespace std; static const uint16_t defaultport = 8888; static const int fd_num_max = 64; int defaultFd = -1; int non_event = 0; class PollServer { private: Sock _listensock; uint16_t _port; struct pollfd _event_fds[fd_num_max]; // 结构体数组 // struct pollfd *_event_fds; 动态数组 可扩容 // int fd_array[fd_num_max]; // int wfd_array[fd_num_max]; public: PollServer(uint16_t port = defaultport) : _port(port) { for (int i = 0; i < fd_num_max; i++) { _event_fds[i].fd = defaultFd; _event_fds[i].events = non_event; _event_fds[i].revents = non_event; } } bool Init() { _listensock.Socket(); _listensock.Bind(_port); _listensock.Listen(); return true; } void Accepter() { // 连接事件就绪 std::string clientip; uint16_t clientport = 0; int sock = _listensock.Accept(&clientip, &clientport); // 会不会阻塞在这里?不会 if (sock < 0) return; lg(Info, "accept success, %s: %d, sock fd: %d", clientip.c_str(), clientport, sock); // sock -> _event_fds[] int pos = 1; for (; pos < fd_num_max; pos++) // 第二个循环 { if (_event_fds[pos].fd != defaultFd) continue; else break; } if (pos == fd_num_max) { lg(Warning, "server is full, close %d now!", sock); close(sock); // 也可以不关闭fd 扩容存fd } else { _event_fds[pos].fd = sock; _event_fds[pos].events = POLLIN; _event_fds[pos].revents = non_event; PrintFd(); } } void Recver(int fd, int pos) { char buffer[1024]; ssize_t n = read(fd, buffer, sizeof(buffer) - 1); // 需要考虑是否是完整数据包问题(此处忽略) if (n > 0) { buffer[n] = 0; cout << "get a messge: " << buffer << endl; } else if (n == 0) { lg(Info, "client quit, me too, close fd is : %d", fd); close(fd); _event_fds[pos].fd = defaultFd; } else { lg(Warning, "recv error: fd is : %d", fd); close(fd); _event_fds[pos].fd = defaultFd; } } void Dispatcher() { for (int i = 0; i < fd_num_max; i++) // 这是第三个循环 { int fd = _event_fds[i].fd; if (fd == defaultFd) continue; if (_event_fds[i].revents & POLLIN) { if (fd == _listensock.getSocketFd()) Accepter(); // 连接管理器 else Recver(fd, i); // non listenfd } } } void Start() { _event_fds[0].fd = _listensock.getSocketFd(); _event_fds[0].events = POLLIN; int timeout = 3000; // 3s for (;;) { int n = poll(_event_fds, fd_num_max, timeout); switch (n) { case 0: cout << "time out... " << endl; break; case -1: cerr << "poll error" << endl; break; default: // 有事件就绪了 cout << "get a new link!!!!!" << endl; Dispatcher(); break; } } } void PrintFd() { cout << "online fd list: "; for (int i = 0; i < fd_num_max; i++) { if (_event_fds[i].fd == defaultFd) continue; cout << _event_fds[i].fd << " "; } cout << endl; } ~PollServer() { _listensock.CloseFd(); } };