目录
前言
在前面,我们学习了五种IO模型,对IO有了基本的认识,知道了select效率很高,可以等待多个文件描述符,那他是如何等待的呢?我们又该如何使用呢?
一、select的认识
系统提供select函数来实现多路复用输入/输出模型
- select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
- 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
select只负责等待,不负责拷贝,一次可以等待多个文件描述符。他的作用是让read和write不再阻塞。
二、select的接口
select的调用接口如下
参数 1 int nfds:值最大的文件描述符+1。
参数 2 fd_set* readfds:fd_set本质是一张位图。代表select需要关心的读事件
参数 3 fd_set* writefds:代表select需要关心的读事件
参数 4 fd_set* execptfdsfds:代表select需要关心的异常事件,我们暂时不考虑
参数 5 struct timeval* timeout:时间结构体,成员有秒和微秒,代表等待的时间
{n,m}为阻塞等待n秒m微秒,时间结束后返回
{0,0}为非阻塞等待
nullptr为阻塞等待
参数2,3,4类似,都是输入输出型参数,参数5也是输入输出型参数,输出的是剩余时间
以readfds为例
输入时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),用户关心内核,是否关心这个fd的读事件。
输出时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),内核告诉用户,哪些文件fd上的读事件是否就绪
返回值:
- ret > 0 :select等待的多个fd中,已经就需要的fd个数
- ret == 0 :select超时返回
- ret < 0 :select出错
同时,fd_set 是特定的类型,我们对其赋值时,是不方便赋值的,因此库里面也给提供的一个函数,方便我们处理。
FD_CLR 从文件描述符集合
set
中清除文件描述符fd。
FD_ISSET 检查文件描述符
fd
是否在文件描述符集合set
中。FD_SET 将文件描述符
fd
添加到文件描述符集合set
中。FD_ZERO 清空文件描述符集合
set
,将其所有位都设置为零。
三、select的使用
Log.hpp
#pragma once #include <iostream> #include <cstdarg> #include <unistd.h> #include <sys/stat.h> #include <sys/types.h> #include <pthread.h> using namespace std; enum { Debug = 0, Info, Warning, Error, Fatal }; enum { Screen = 10, OneFile, ClassFile }; string LevelToString(int level) { switch (level) { case Debug: return "Debug"; case Info: return "Info"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "Unknown"; } } const int default_style = Screen; const string default_filename = "Log."; const string logdir = "log"; class Log { public: Log(int style = default_style, string filename = default_filename) : _style(style), _filename(filename) { if (_style != Screen) mkdir(logdir.c_str(), 0775); } // 更改打印方式 void Enable(int style) { _style = style; if (_style != Screen) mkdir(logdir.c_str(), 0775); } // 时间戳转化为年月日时分秒 string GetTime() { time_t currtime = time(nullptr); struct tm *curr = localtime(&currtime); char time_buffer[128]; snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d", curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday, curr->tm_hour, curr->tm_min, curr->tm_sec); return time_buffer; } // 写入到文件中 void WriteLogToOneFile(const string &logname, const string &message) { FILE *fp = fopen(logname.c_str(), "a"); if (fp == nullptr) { perror("fopen failed"); exit(-1); } fprintf(fp, "%s\n", message.c_str()); fclose(fp); } // 打印日志 void WriteLogToClassFile(const string &levelstr, const string &message) { string logname = logdir; logname += "/"; logname += _filename; logname += levelstr; WriteLogToOneFile(logname, message); } pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; void WriteLog(const string &levelstr, const string &message) { pthread_mutex_lock(&lock); switch (_style) { case Screen: cout << message << endl; // 打印到屏幕中 break; case OneFile: WriteLogToClassFile("all", message); // 给定all,直接写到all里 break; case ClassFile: WriteLogToClassFile(levelstr, message); // 写入levelstr里 break; default: break; } pthread_mutex_unlock(&lock); } // 提供接口给运算符重载使用 void _LogMessage(int level, const char *file, int line, char *rightbuffer) { char leftbuffer[1024]; string levelstr = LevelToString(level); string currtime = GetTime(); string idstr = to_string(getpid()); snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s][%s:%d]", levelstr.c_str(), currtime.c_str(), idstr.c_str(), file, line); string messages = leftbuffer; messages += rightbuffer; WriteLog(levelstr, messages); } // 运算符重载 void operator()(int level, const char *file, int line, const char *format, ...) { char rightbuffer[1024]; va_list args; // va_list 是指针 va_start(args, format); // 初始化va_list对象,format是最后一个确定的参数 vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 写入到rightbuffer中 va_end(args); _LogMessage(level, file, line, rightbuffer); } ~Log() { } private: int _style; string _filename; }; Log lg; class Conf { public: Conf() { lg.Enable(Screen); } ~Conf() { } }; Conf conf; // 辅助宏 #define lg(level, format, ...) lg(level, __FILE__, __LINE__, format, ##__VA_ARGS__)
Socket.hpp
#pragma once #include <iostream> #include <string> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <cstring> #include <unistd.h> using namespace std; namespace Net_Work { static const int default_backlog = 5; static const int default_sockfd = -1; using namespace std; enum { SocketError = 1, BindError, ListenError, ConnectError, }; // 封装套接字接口基类 class Socket { public: // 封装了socket相关方法 virtual ~Socket() {} virtual void CreateSocket() = 0; virtual void BindSocket(uint16_t port) = 0; virtual void ListenSocket(int backlog) = 0; virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0; virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0; virtual int GetSockFd() = 0; virtual void SetSockFd(int sockfd) = 0; virtual void CloseSocket() = 0; virtual bool Recv(string *buff, int size) = 0; virtual void Send(string &send_string) = 0; // 方法的集中在一起使用 public: void BuildListenSocket(uint16_t port, int backlog = default_backlog) { CreateSocket(); BindSocket(port); ListenSocket(backlog); } bool BuildConnectSocket(string &serverip, uint16_t serverport) { CreateSocket(); return ConnectSocket(serverip, serverport); } void BuildNormalSocket(int sockfd) { SetSockFd(sockfd); } }; class TcpSocket : public Socket { public: TcpSocket(int sockfd = default_sockfd) : _sockfd(sockfd) { } ~TcpSocket() {} void CreateSocket() override { _sockfd = socket(AF_INET, SOCK_STREAM, 0); if (_sockfd < 0) exit(SocketError); } void BindSocket(uint16_t port) override { int opt = 1; setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt)); struct sockaddr_in local; memset(&local, 0, sizeof(local)); local.sin_family = AF_INET; local.sin_port = htons(port); local.sin_addr.s_addr = INADDR_ANY; int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local)); if (n < 0) exit(BindError); } void ListenSocket(int backlog) override { int n = listen(_sockfd, backlog); if (n < 0) exit(ListenError); } bool ConnectSocket(string &serverip, uint16_t serverport) override { struct sockaddr_in addr; memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(serverport); // addr.sin_addr.s_addr = inet_addr(serverip.c_str()); inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr); int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr)); if (n == 0) return true; return false; } Socket *AcceptSocket(string *peerip, uint16_t *peerport) override { struct sockaddr_in addr; socklen_t len = sizeof(addr); int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len); if (newsockfd < 0) return nullptr; // *peerip = inet_ntoa(addr.sin_addr); // INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度 char ip_str[INET_ADDRSTRLEN]; inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN); *peerip = ip_str; *peerport = ntohs(addr.sin_port); Socket *s = new TcpSocket(newsockfd); return s; } int GetSockFd() override { return _sockfd; } void SetSockFd(int sockfd) override { _sockfd = sockfd; } void CloseSocket() override { if (_sockfd > default_sockfd) close(_sockfd); } bool Recv(string *buff, int size) override { char inbuffer[size]; ssize_t n = recv(_sockfd, inbuffer, size - 1, 0); if (n > 0) { inbuffer[n] = 0; *buff += inbuffer; return true; } else return false; } void Send(string &send_string) override { send(_sockfd, send_string.c_str(),send_string.size(),0); } private: int _sockfd; string _ip; uint16_t _port; }; }
select只负责等待,不负责处理,最初我们有一个listen_sock需要交给select去管理,当有新链接到来是,listen_sock要去接受新链接,但是接受后,不能立刻read或者write,因为不确定当前事件是否就绪,需要将新链接也交给select管理。
如何将新链接交给select呢?我们得有一个数据结构(这里用的数组),把所有的fd都管理起来,新链接到来时,都可以往这个数组里面添加文件描述符fd。后面select遍历数组,就可以找到需要管理的fd了,但这样,我们需要经常遍历这个数组
- 添加时需要遍历找到空再插入
- select传参,需要遍历查找最大的文件描述符
- select等待成功后调用处理函数时,也需遍历查找就绪的文件描述符
同时,由于select的事件参数是一个输入输出型参数,因此我们每次都得重新对该参数重新赋值。
如下是SelectServer.hpp的核心代码
SelectServer.hpp
#pragma once #include <iostream> #include <string> #include <sys/select.h> #include "Log.hpp" #include "Socket.hpp" using namespace Net_Work; const static int gdefaultport = 8888; const static int gbacklog = 8; const static int num = sizeof(fd_set) * 8; class SelectServer { public: SelectServer(int port) : _port(port), _listensock(new TcpSocket()) { } void HandlerEvent(fd_set rfds) { for (int i = 0; i < num; i++) { if (_rfds_array[i] == nullptr) continue; int fd = _rfds_array[i]->GetSockFd(); // 判断事件是否就绪 if (FD_ISSET(fd, &rfds)) { // 读事件分两类,一类是新链接到来,一类是新数据到来 if (fd == _listensock->GetSockFd()) { // 新链接到来 lg(Info, "get a new link"); // 获取连接 std::string clientip; uint16_t clientport; Socket *sock = _listensock->AcceptSocket(&clientip, &clientport); if (!sock) { lg(Error, "accept error"); return; } lg(Info, "get a client,client info is# %s:%d,fd: %d", clientip.c_str(), clientport, sock->GetSockFd()); // 此时获取连接成功了,但是不能直接read write,sockfd仍需要交给select托管 -- 添加到数组_rfds_array中 int pos = 0; for (; pos < num; pos++) { if (_rfds_array[pos] == nullptr) { _rfds_array[pos] = sock; lg(Info, "get a new link, fd is : %d", sock->GetSockFd()); break; } } if (pos == num) { sock->CloseSocket(); delete sock; lg(Warning, "server is full, be carefull..."); } } else { // 普通的读事件就绪 std::string buffer; bool res = _rfds_array[i]->Recv(&buffer, 1024); if (res) { lg(Info,"client say# %s",buffer.c_str()); buffer+=": 你好呀,同志\n"; _rfds_array[i]->Send(buffer); buffer.clear(); } else { lg(Warning,"client quit ,maybe close or error,close fd: %d",fd); _rfds_array[i]->CloseSocket(); delete _rfds_array[i]; _rfds_array[i] = nullptr; } } } } } void InitServer() { _listensock->BuildListenSocket(_port, gbacklog); for (int i = 0; i < num; i++) { _rfds_array[i] = nullptr; } _rfds_array[0] = _listensock.get(); } void Loop() { _isrunning = true; // 循环重置select需要的rfds while (_isrunning) { // 不能直接获取新链接,因为accpet可能阻塞 // 所有的fd,都要交给select,listensock上面新链接,相当于读事件 // 因此需要将listensock交给select // 遍历数组, 1.找最大的fd 2. 合法的fd添加到rfds集合中 fd_set rfds; FD_ZERO(&rfds); int max_fd = _listensock->GetSockFd(); for (int i = 0; i < num; i++) { if (_rfds_array[i] == nullptr) { continue; } else { // 添加fd到集合中 int fd = _rfds_array[i]->GetSockFd(); FD_SET(fd, &rfds); if (max_fd < fd) // 更新最大值 { max_fd = fd; } } } // 定义时间 struct timeval timeout = {0, 0}; PrintDebug(); // rfds是输入输出型参数,rfds是在select调用返回时,不断被修改,所以每次需要重置rfds int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr); switch (n) { case 0: lg(Info, "select timeout...,last time: %u.%u", timeout.tv_sec, timeout.tv_usec); break; case -1: lg(Error, "select error!!!"); default: // 正常就绪的fd lg(Info, "select success,begin event handler,last time: %u.%u", timeout.tv_sec, timeout.tv_usec); HandlerEvent(rfds); break; } } _isrunning = false; } void Stop() { _isrunning = false; } void PrintDebug() { std::cout << "current select rfds list is :"; for (int i = 0; i < num; i++) { if (_rfds_array[i] == nullptr) continue; else std::cout << _rfds_array[i]->GetSockFd() << " "; } std::cout << std::endl; } private: std::unique_ptr<Socket> _listensock; int _port; bool _isrunning; // select 服务器要被正确设计,需要程序员定义数据结构,来吧所有的fd管理起来 Socket *_rfds_array[num]; };
Main.cc
#include <iostream> #include <memory> #include "SelectServer.hpp" void Usage(char* argv) { std::cout<<"Usage: \n\t"<<argv<<" port\n"<<std::endl; } // ./select_server 8080 int main(int argc,char* argv[]) { if(argc!=2) { Usage(argv[0]); return -1; } uint16_t localport = std::stoi(argv[1]); std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(localport); svr->InitServer(); svr->Loop(); return 0; }
四、select的优缺点
优点:select只负责等待,可以等待多个fd,IO的时候,效率会比较高一些。
缺点:
- 由于select是输入输出型参数,因此我们每次都要对select的参数重新设置。
- 编写代码时,select因为要使用第三方数组,充满了遍历,这可能会影响select的效率。
- 用户到内核,内核到用户,每次select调用和返回,都要对位图重新设置,用户和内核之间,要一直进行数据拷贝。
- select让OS在底层遍历需要关心所有的fd,这也会造成效率低下,这也是为何第一个参数需要传入max_fd + 1,就是因为select的底层需要遍历。
- fd_set 是系统提供的类型,fd_set大小是固定的,就意味着位图的个数是固定的,也就是select最多能够检测到fd的总数是有上限的。