Linux——多路复用之select

avatar
作者
筋斗云
阅读量:0

目录

前言

一、select的认识

二、select的接口

三、select的使用

四、select的优缺点


前言

在前面,我们学习了五种IO模型,对IO有了基本的认识,知道了select效率很高,可以等待多个文件描述符,那他是如何等待的呢?我们又该如何使用呢?

一、select的认识

系统提供select函数来实现多路复用输入/输出模型

  • select系统调用是用来让我们的程序监视多个文件描述符的状态变化的
  • 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变

select只负责等待,不负责拷贝,一次可以等待多个文件描述符。他的作用是让read和write不再阻塞

二、select的接口

select的调用接口如下

参数 1 int nfds:值最大的文件描述符+1。

参数 2 fd_set* readfds:fd_set本质是一张位图。代表select需要关心的读事件

参数 3 fd_set* writefds:代表select需要关心的读事件

参数 4 fd_set* execptfdsfds:代表select需要关心的异常事件,我们暂时不考虑

参数 5 struct timeval* timeout:时间结构体,成员有秒和微秒,代表等待的时间

                                                  {n,m}为阻塞等待n秒m微秒,时间结束后返回

                                                  {0,0}为非阻塞等待

                                                  nullptr为阻塞等待

参数2,3,4类似,都是输入输出型参数,参数5也是输入输出型参数,输出的是剩余时间

以readfds为例

输入时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),用户关心内核,是否关心这个fd的读事件。

输出时:比特位的位置,表示文件描述符的值,比特位的内容(0/1),内核告诉用户,哪些文件fd上的读事件是否就绪

返回值:

  1. ret  >  0 :select等待的多个fd中,已经就需要的fd个数
  2. ret == 0 :select超时返回
  3. ret  <  0 :select出错

同时,fd_set 是特定的类型,我们对其赋值时,是不方便赋值的,因此库里面也给提供的一个函数,方便我们处理。

FD_CLR                    从文件描述符集合 set 中清除文件描述符 fd。

FD_ISSET                 检查文件描述符 fd 是否在文件描述符集合 set 中。

FD_SET                    将文件描述符 fd 添加到文件描述符集合 set 中。

FD_ZERO                 清空文件描述符集合 set,将其所有位都设置为零。

三、select的使用

Log.hpp

#pragma once  #include <iostream> #include <cstdarg> #include <unistd.h> #include <sys/stat.h> #include <sys/types.h> #include <pthread.h> using namespace std;  enum {     Debug = 0,     Info,     Warning,     Error,     Fatal };  enum {     Screen = 10,     OneFile,     ClassFile };  string LevelToString(int level) {     switch (level)     {     case Debug:         return "Debug";     case Info:         return "Info";     case Warning:         return "Warning";     case Error:         return "Error";     case Fatal:         return "Fatal";      default:         return "Unknown";     } }  const int default_style = Screen; const string default_filename = "Log."; const string logdir = "log";  class Log { public:     Log(int style = default_style, string filename = default_filename)         : _style(style), _filename(filename)     {         if (_style != Screen)             mkdir(logdir.c_str(), 0775);     }      // 更改打印方式     void Enable(int style)     {         _style = style;         if (_style != Screen)             mkdir(logdir.c_str(), 0775);     }      // 时间戳转化为年月日时分秒     string GetTime()     {         time_t currtime = time(nullptr);         struct tm *curr = localtime(&currtime);         char time_buffer[128];         snprintf(time_buffer, sizeof(time_buffer), "%d-%d-%d %d:%d:%d",                  curr->tm_year + 1900, curr->tm_mon + 1, curr->tm_mday, curr->tm_hour, curr->tm_min, curr->tm_sec);         return time_buffer;     }      // 写入到文件中     void WriteLogToOneFile(const string &logname, const string &message)     {         FILE *fp = fopen(logname.c_str(), "a");         if (fp == nullptr)         {             perror("fopen failed");             exit(-1);         }         fprintf(fp, "%s\n", message.c_str());          fclose(fp);     }      // 打印日志     void WriteLogToClassFile(const string &levelstr, const string &message)     {         string logname = logdir;         logname += "/";         logname += _filename;         logname += levelstr;         WriteLogToOneFile(logname, message);     }      pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;     void WriteLog(const string &levelstr, const string &message)     {         pthread_mutex_lock(&lock);         switch (_style)         {         case Screen:             cout << message << endl; // 打印到屏幕中             break;         case OneFile:             WriteLogToClassFile("all", message); // 给定all,直接写到all里             break;         case ClassFile:             WriteLogToClassFile(levelstr, message); // 写入levelstr里             break;         default:             break;         }         pthread_mutex_unlock(&lock);     }      // 提供接口给运算符重载使用     void _LogMessage(int level, const char *file, int line, char *rightbuffer)     {         char leftbuffer[1024];         string levelstr = LevelToString(level);         string currtime = GetTime();         string  idstr = to_string(getpid());          snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%s][%s][%s:%d]", levelstr.c_str(), currtime.c_str(), idstr.c_str(), file, line);          string messages = leftbuffer;         messages += rightbuffer;         WriteLog(levelstr, messages);     }      // 运算符重载     void operator()(int level, const char *file, int line, const char *format, ...)     {         char rightbuffer[1024];         va_list args;                                              // va_list 是指针         va_start(args, format);                                    // 初始化va_list对象,format是最后一个确定的参数         vsnprintf(rightbuffer, sizeof(rightbuffer), format, args); // 写入到rightbuffer中         va_end(args);         _LogMessage(level, file, line, rightbuffer);     }      ~Log()     {     }  private:     int _style;     string _filename; };  Log lg;  class Conf { public:     Conf()     {         lg.Enable(Screen);     }     ~Conf()     {     } };  Conf conf;  // 辅助宏 #define lg(level, format, ...) lg(level, __FILE__, __LINE__, format, ##__VA_ARGS__) 

Socket.hpp 

#pragma once  #include <iostream> #include <string> #include <sys/types.h> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <cstring> #include <unistd.h> using namespace std; namespace Net_Work {     static const int default_backlog = 5;     static const int default_sockfd = -1;     using namespace std;      enum     {         SocketError = 1,         BindError,         ListenError,         ConnectError,     };      // 封装套接字接口基类     class Socket     {     public:         // 封装了socket相关方法         virtual ~Socket() {}         virtual void CreateSocket() = 0;         virtual void BindSocket(uint16_t port) = 0;         virtual void ListenSocket(int backlog) = 0;         virtual bool ConnectSocket(string &serverip, uint16_t serverport) = 0;         virtual Socket *AcceptSocket(string *peerip, uint16_t *peerport) = 0;         virtual int GetSockFd() = 0;         virtual void SetSockFd(int sockfd) = 0;         virtual void CloseSocket() = 0;         virtual bool Recv(string *buff, int size) = 0;         virtual void Send(string &send_string) = 0;          // 方法的集中在一起使用     public:         void BuildListenSocket(uint16_t port, int backlog = default_backlog)         {             CreateSocket();             BindSocket(port);             ListenSocket(backlog);         }          bool BuildConnectSocket(string &serverip, uint16_t serverport)         {             CreateSocket();             return ConnectSocket(serverip, serverport);         }          void BuildNormalSocket(int sockfd)         {             SetSockFd(sockfd);         }     };      class TcpSocket : public Socket     {     public:         TcpSocket(int sockfd = default_sockfd)             : _sockfd(sockfd)         {         }         ~TcpSocket() {}          void CreateSocket() override         {             _sockfd = socket(AF_INET, SOCK_STREAM, 0);             if (_sockfd < 0)                 exit(SocketError);         }         void BindSocket(uint16_t port) override         {             int opt = 1;             setsockopt(_sockfd, SOL_SOCKET, SO_REUSEADDR | SO_REUSEPORT, &opt, sizeof(opt));              struct sockaddr_in local;             memset(&local, 0, sizeof(local));             local.sin_family = AF_INET;             local.sin_port = htons(port);             local.sin_addr.s_addr = INADDR_ANY;              int n = bind(_sockfd, (struct sockaddr *)&local, sizeof(local));             if (n < 0)                 exit(BindError);         }         void ListenSocket(int backlog) override         {             int n = listen(_sockfd, backlog);             if (n < 0)                 exit(ListenError);         }         bool ConnectSocket(string &serverip, uint16_t serverport) override         {             struct sockaddr_in addr;             memset(&addr, 0, sizeof(addr));             addr.sin_family = AF_INET;             addr.sin_port = htons(serverport);             // addr.sin_addr.s_addr = inet_addr(serverip.c_str());             inet_pton(AF_INET, serverip.c_str(), &addr.sin_addr);             int n = connect(_sockfd, (sockaddr *)&addr, sizeof(addr));              if (n == 0)                 return true;             return false;         }         Socket *AcceptSocket(string *peerip, uint16_t *peerport) override         {             struct sockaddr_in addr;             socklen_t len = sizeof(addr);             int newsockfd = accept(_sockfd, (sockaddr *)&addr, &len);             if (newsockfd < 0)                 return nullptr;              // *peerip = inet_ntoa(addr.sin_addr);              // INET_ADDRSTRLEN 是一个定义在头文件中的宏,表示 IPv4 地址的最大长度             char ip_str[INET_ADDRSTRLEN];             inet_ntop(AF_INET, &addr.sin_addr, ip_str, INET_ADDRSTRLEN);             *peerip = ip_str;              *peerport = ntohs(addr.sin_port);              Socket *s = new TcpSocket(newsockfd);             return s;         }         int GetSockFd() override         {             return _sockfd;         }         void SetSockFd(int sockfd) override         {             _sockfd = sockfd;         }         void CloseSocket() override         {             if (_sockfd > default_sockfd)                 close(_sockfd);         }          bool Recv(string *buff, int size) override         {             char inbuffer[size];             ssize_t n = recv(_sockfd, inbuffer, size - 1, 0);             if (n > 0)             {                 inbuffer[n] = 0;                 *buff += inbuffer;                 return true;             }             else                 return false;         }          void Send(string &send_string) override         {             send(_sockfd, send_string.c_str(),send_string.size(),0);         }      private:         int _sockfd;         string _ip;         uint16_t _port;     }; }

        select只负责等待,不负责处理,最初我们有一个listen_sock需要交给select去管理,当有新链接到来是,listen_sock要去接受新链接,但是接受后,不能立刻read或者write,因为不确定当前事件是否就绪,需要将新链接也交给select管理

        如何将新链接交给select呢?我们得有一个数据结构(这里用的数组),把所有的fd都管理起来,新链接到来时,都可以往这个数组里面添加文件描述符fd。后面select遍历数组,就可以找到需要管理的fd了,但这样,我们需要经常遍历这个数组

  1. 添加时需要遍历找到空再插入
  2. select传参,需要遍历查找最大的文件描述符
  3. select等待成功后调用处理函数时,也需遍历查找就绪的文件描述符

        同时,由于select的事件参数是一个输入输出型参数,因此我们每次都得重新对该参数重新赋值。

如下是SelectServer.hpp的核心代码 

SelectServer.hpp

#pragma once #include <iostream> #include <string> #include <sys/select.h> #include "Log.hpp" #include "Socket.hpp"  using namespace Net_Work; const static int gdefaultport = 8888; const static int gbacklog = 8; const static int num = sizeof(fd_set) * 8;  class SelectServer { public:     SelectServer(int port) : _port(port), _listensock(new TcpSocket())     {     }     void HandlerEvent(fd_set rfds)     {         for (int i = 0; i < num; i++)         {             if (_rfds_array[i] == nullptr)                 continue;              int fd = _rfds_array[i]->GetSockFd();             // 判断事件是否就绪             if (FD_ISSET(fd, &rfds))             {                 // 读事件分两类,一类是新链接到来,一类是新数据到来                 if (fd == _listensock->GetSockFd())                 {                     // 新链接到来                     lg(Info, "get a new link");                     // 获取连接                     std::string clientip;                     uint16_t clientport;                     Socket *sock = _listensock->AcceptSocket(&clientip, &clientport);                     if (!sock)                     {                         lg(Error, "accept error");                         return;                     }                     lg(Info, "get a client,client info is# %s:%d,fd: %d", clientip.c_str(), clientport, sock->GetSockFd());                     // 此时获取连接成功了,但是不能直接read write,sockfd仍需要交给select托管 -- 添加到数组_rfds_array中                     int pos = 0;                     for (; pos < num; pos++)                     {                         if (_rfds_array[pos] == nullptr)                         {                             _rfds_array[pos] = sock;                             lg(Info, "get a new link, fd is : %d", sock->GetSockFd());                             break;                         }                     }                     if (pos == num)                     {                         sock->CloseSocket();                         delete sock;                         lg(Warning, "server is full, be carefull...");                     }                 }                 else                 {                     // 普通的读事件就绪                     std::string buffer;                     bool res = _rfds_array[i]->Recv(&buffer, 1024);                     if (res)                     {                         lg(Info,"client say# %s",buffer.c_str());                         buffer+=": 你好呀,同志\n";                         _rfds_array[i]->Send(buffer);                         buffer.clear();                     }                     else                     {                         lg(Warning,"client quit ,maybe close or error,close fd: %d",fd);                         _rfds_array[i]->CloseSocket();                         delete _rfds_array[i];                         _rfds_array[i] = nullptr;                     }                 }             }         }     }     void InitServer()     {         _listensock->BuildListenSocket(_port, gbacklog);         for (int i = 0; i < num; i++)         {             _rfds_array[i] = nullptr;         }         _rfds_array[0] = _listensock.get();     }      void Loop()     {         _isrunning = true;         // 循环重置select需要的rfds         while (_isrunning)         {             // 不能直接获取新链接,因为accpet可能阻塞             // 所有的fd,都要交给select,listensock上面新链接,相当于读事件             // 因此需要将listensock交给select              // 遍历数组, 1.找最大的fd  2. 合法的fd添加到rfds集合中             fd_set rfds;             FD_ZERO(&rfds);             int max_fd = _listensock->GetSockFd();             for (int i = 0; i < num; i++)             {                 if (_rfds_array[i] == nullptr)                 {                     continue;                 }                 else                 {                     // 添加fd到集合中                     int fd = _rfds_array[i]->GetSockFd();                     FD_SET(fd, &rfds);                     if (max_fd < fd) // 更新最大值                     {                         max_fd = fd;                     }                 }             }              // 定义时间             struct timeval timeout = {0, 0};              PrintDebug();              // rfds是输入输出型参数,rfds是在select调用返回时,不断被修改,所以每次需要重置rfds             int n = select(max_fd + 1, &rfds, nullptr, nullptr, /*&timeout*/ nullptr);             switch (n)             {             case 0:                 lg(Info, "select timeout...,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);                 break;             case -1:                 lg(Error, "select error!!!");             default:                 // 正常就绪的fd                 lg(Info, "select success,begin event handler,last time: %u.%u", timeout.tv_sec, timeout.tv_usec);                 HandlerEvent(rfds);                  break;             }         }         _isrunning = false;     }      void Stop()     {         _isrunning = false;     }      void PrintDebug()     {         std::cout << "current select rfds list is :";         for (int i = 0; i < num; i++)         {             if (_rfds_array[i] == nullptr)                 continue;             else                 std::cout << _rfds_array[i]->GetSockFd() << " ";         }         std::cout << std::endl;     }  private:     std::unique_ptr<Socket> _listensock;     int _port;     bool _isrunning;      // select 服务器要被正确设计,需要程序员定义数据结构,来吧所有的fd管理起来     Socket *_rfds_array[num]; };

 Main.cc

#include <iostream> #include <memory> #include "SelectServer.hpp"  void Usage(char* argv) {          std::cout<<"Usage: \n\t"<<argv<<" port\n"<<std::endl; } // ./select_server 8080 int main(int argc,char* argv[]) {     if(argc!=2)     {         Usage(argv[0]);         return -1;     }     uint16_t localport = std::stoi(argv[1]);     std::unique_ptr<SelectServer> svr = std::make_unique<SelectServer>(localport);     svr->InitServer();     svr->Loop();      return 0; }

四、select的优缺点

优点:select只负责等待,可以等待多个fd,IO的时候,效率会比较高一些。

缺点:

  1. 由于select是输入输出型参数,因此我们每次都要对select的参数重新设置。
  2. 编写代码时,select因为要使用第三方数组,充满了遍历,这可能会影响select的效率。
  3. 用户到内核,内核到用户,每次select调用和返回,都要对位图重新设置,用户和内核之间,要一直进行数据拷贝。
  4. select让OS在底层遍历需要关心所有的fd,这也会造成效率低下,这也是为何第一个参数需要传入max_fd + 1,就是因为select的底层需要遍历。
  5. fd_set 是系统提供的类型,fd_set大小是固定的,就意味着位图的个数是固定的,也就是select最多能够检测到fd的总数是有上限的。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!