[计网底层小探索]:实现并部署多线程并发Tcp服务器框架(基于生产者消费者模型的线程池结构)

avatar
作者
猴君
阅读量:1

在这里插入图片描述

文章目录


在这里插入图片描述

一.网络层与传输层协议

  • 网络层与传输层内置于操作系统的内核中,网络层一般使用ip协议,传输层常用协议为Tcp协议和Udp协议,Tcp协议和Udp协议拥有各自的特点和应用场景:
    在这里插入图片描述

sockaddr结构体继承体系(Linux体系)

  • sockaddr_in结构体用于存储网络通信主机进程的ip和端口号等信息
    在这里插入图片描述

贯穿计算机系统的网络通信架构图示:

在这里插入图片描述

二.实现并部署多线程并发Tcp服务器框架

小项目的完整文件的gittee链接

  • Tcp服务器架构:
    在这里插入图片描述

线程池模块

#pragma once #include <iostream> #include <pthread.h> #include "log.hpp" #include <semaphore.h> #include <vector> #include <cstdio>  template<class T> class RingQueue{ private:     pthread_mutex_t Clock_;     pthread_mutex_t Plock_;     sem_t Psem_;     sem_t Csem_;     std::vector<T> Queue_;     int Pptr_;     int Cptr_;     int capacity_; public:     RingQueue(int capacity = 10) : Queue_(capacity),Pptr_(0),Cptr_(0),capacity_(capacity){         sem_init(&Psem_,0,capacity);         sem_init(&Csem_,0,0);         pthread_mutex_init(&Clock_,nullptr);         pthread_mutex_init(&Plock_,nullptr);     }     ~RingQueue(){         sem_destroy(&Psem_);         sem_destroy(&Csem_);         pthread_mutex_destroy(&Clock_);         pthread_mutex_destroy(&Plock_);     }     T Pop(){         sem_wait(&Csem_);         pthread_mutex_lock(&Clock_);         T tem = Queue_[Cptr_];         Cptr_++;         Cptr_ %= capacity_;         pthread_mutex_unlock(&Clock_);         sem_post(&Psem_);         return tem;     }     void Push(T t){         sem_wait(&Psem_);         pthread_mutex_lock(&Plock_);         Queue_[Pptr_] = t;         Pptr_++;         Pptr_%= capacity_;         pthread_mutex_unlock(&Plock_);         sem_post(&Csem_);     } }; 
#pragma once #include "sem_cp.cpp" #include <pthread.h> #include <iostream> #include <string> #include <mutex> #include "CalTask.cpp"  template<class Task> class Thread_Pool{     struct Thread_Data{         int Thread_num;         pthread_t tid;     }; private:     RingQueue<Task> Queue_;  //线程安全的环形队列     std::vector<Thread_Data> thread_arr; //管理线程的容器     static std::mutex lock_;            //单例锁     static Thread_Pool<Task> * ptr_;    //单例指针 private:     Thread_Pool(int capacity_Of_queue = 20) : Queue_(capacity_Of_queue){}     Thread_Pool(const Thread_Pool<Task>& Tp) = delete;     Thread_Pool<Task>& operator=(const Thread_Pool<Task> & Tp) = delete; public:     ~Thread_Pool(){}     //获取线程池单例-->注意C++的类模板静态成员函数需要在类体外进行定义     static Thread_Pool<Task> * Getinstance();     //创建多线程     void Create_thread(int thread_num = 10){         Thread_Data T_data;         for(int i = 0 ; i < thread_num ; ++i){             //注意线程池对象的this指针传递给线程             pthread_create(&T_data.tid,nullptr,Routine,this);             T_data.Thread_num = i + 1;             thread_arr.push_back(T_data);         }     }     //线程等待     void Thread_join(){         for(int i = 0 ;i < thread_arr.size() ; ++i){             pthread_join(thread_arr[i].tid,nullptr);         }     }     //向线程池中加入任务     void Push(Task T){         Queue_.Push(T);     }     void Push(Task && T){         Queue_.Push(std::forward<Task>(T));     } private:     //线程函数-->该函数没有在类外调用,所以无须在类体外定义     static void* Routine(void * args){         Thread_Pool<Task> * Pool = static_cast<Thread_Pool<Task> *>(args);         while(true){             std::cout << "Thread prepare to work\n" << std::endl;             Task Thread_Task = Pool->Queue_.Pop();             //要求Task类重载()-->用于执行具体任务             Thread_Task();         }         return nullptr;     } };   //初始化静态指针 template<class Task> Thread_Pool<Task> * Thread_Pool<Task>::ptr_ = nullptr; template<class Task> std::mutex Thread_Pool<Task>::lock_;  //注意C++的类模板静态成员函数需要在类体外进行定义 template<class Task> Thread_Pool<Task> * Thread_Pool<Task>::Getinstance(){     if(ptr_ == nullptr){         lock_.lock();         if(ptr_ == nullptr){             ptr_ = new Thread_Pool<Task>;         }         lock_.unlock();     }     return ptr_; } 

序列化反序列化工具模块

  • 序列反序列化是保证通信过程中数据完整性的关键步骤,保证数据语义完整,结构完整

在这里插入图片描述

#pragma once #include <iostream> #include <string>  // 自定义序列化反序列化协议 const std::string blank_space_sep = " "; const std::string protocol_sep = "\n"; //封装报文 std::string Encode(std::string &content){     //报文正文字节数     std::string package = std::to_string(content.size());     package += protocol_sep;     package += content;    //用分隔符封装正文     package += protocol_sep;     return package; }  //解析报文package-->"正文长度"\n"正文"\n bool Decode(std::string &package, std::string& content){     size_t pos = package.find(protocol_sep);     if(pos == std::string::npos) return false;     //解析报文正文长度     size_t Len = std::atoi(package.substr(0,pos).c_str());     //确定报文是否完整     size_t total_Len = pos + Len + 2;     if(package.size() != total_Len) return false;     //获取正文内容     content = package.substr(pos+1,Len);     package.erase(0,total_Len);     return true; }   //用户层协议请求结构体 class Request{ public:     int x;     int y;     char op;  public:     Request(int data1 , int data2 , char op)         : x(data1),y(data2),op(op){}     Request(){} public:     //请求结构体 序列化 成报文正文字符串 "x op y"     bool Serialize(std::string& out){         std::string content = std::to_string(x);         content += blank_space_sep;         content += op;         content += blank_space_sep;         content += std::to_string(y);         out = content;         return true;          // 等价的jason代码         // Json::Value root;         // root["x"] = x;         // root["y"] = y;         // root["op"] = op;         // // Json::FastWriter w;         // Json::StyledWriter w;         // out = w.write(root);         // return true;     }       //报文正文字符串 反序列化 成请求结构体     // "x op y"     bool Deserialize(const std::string &in) {         size_t left = in.find(blank_space_sep);         if(left == std::string::npos)return false;         x = std::stoi(in.substr(0,left).c_str());          std::size_t right = in.rfind(blank_space_sep);         if (right == std::string::npos)return false;         y = std::atoi(in.substr(right + 1).c_str());          if(left + 2 != right) return false;         op = in[left+1];         return true;          // 等价的jason代码         // Json::Value root;         // Json::Reader r;         // r.parse(in, root);         // x = root["x"].asInt();         // y = root["y"].asInt();         // op = root["op"].asInt();         // return true;     }     void DebugPrint()     {         std::cout << "新请求构建完成:  " << x << op << y << "=?" << std::endl;     } };    //用户层协议请求回应结构体 class Response{ public:     int result;     int code;  public:     Response(int res , int c)         : result(res),code(c){}     Response(){} public:     //请求回应结构体 序列化 成报文正文字符串 "result code"     bool Serialize(std::string& out){         std::string s = std::to_string(result);         s += blank_space_sep;         s += std::to_string(code);         out = s;         return true;          // 等价的jason代码         // Json::Value root;         // root["result"] = result;         // root["code"] = code;         // // Json::FastWriter w;         // Json::StyledWriter w;         // out = w.write(root);         // return true;     }      //"result code"     //报文正文字符串 反序列化 成请求回应结构体     bool Deserialize(const std::string &in)      {         std::size_t pos = in.find(blank_space_sep);         if (pos == std::string::npos)return false;         if(pos == 0 || pos == in.size() - 1) return false;         result = std::stoi(in.substr(0, pos).c_str());         code = std::stoi(in.substr(pos+1).c_str());         return true;          // 等价的jason代码         // Json::Value root;         // Json::Reader r;         // r.parse(in, root);         // result = root["result"].asInt();         // code = root["code"].asInt();         // return true;     }     void DebugPrint()     {         std::cout << "结果响应完成, result: " << result << ", code: "<< code << std::endl;     } }; 

通信信道建立模块

#pragma once #include <iostream> #include <string> #include <sys/types.h>    #include <sys/socket.h> #include "log.hpp" #include <memory.h> #include <arpa/inet.h> #include <netinet/in.h>   namespace MySocket{      //Tcp通讯构建器     class TcpServer{         enum{             UsageError = 1,             SocketError,             BindError,             ListenError,         };     private:         int socketfd_ = -1;         std :: string ip_;         uint16_t port_;         int backlog_ = 10;     public:         TcpServer(const std::string& ip = "172.19.29.44", uint16_t port = 8081) : ip_(ip) , port_(port){}         ~TcpServer(){if(socketfd_ > 0) close(socketfd_);}     public:         //确定通信协议,建立文件描述符         void BuildSocket(){             socketfd_ = socket(AF_INET,SOCK_STREAM,0);             if(socketfd_ < 0){                 lg(Fatal,"socket error,%s\n",strerror(errno));                 exit(SocketError);             }         }         //文件描述符与服务器ip : 端口号绑定         void SocketBind(){             struct sockaddr_in addr;             memset(&addr,0,sizeof(addr));             addr.sin_port = htons(port_);             addr.sin_family = AF_INET;             addr.sin_addr.s_addr = inet_addr(ip_.c_str());             if(bind(socketfd_,(const sockaddr*)&addr,sizeof(addr)) < 0){                 lg(Fatal,"socket bind error,%s\n",strerror(errno));                 exit(BindError);             }             lg(Info,"socket bind success\n");         }          //启动服务监听,等待客户端的连接         void Socklisten(){             if(socketfd_ <= 0){                 lg(Fatal,"socket error,%s\n",strerror(errno));                 exit(SocketError);             }             if(listen(socketfd_,backlog_) < 0){                 lg(Fatal, "listen error, %s: %d", strerror(errno), errno);                 exit(ListenError);             }         }         //服务器接收客户端的连接-->并创建用于通信的文件描述符-->一个客户端连接对应一个文件描述符         int SockAccept(std::string& cilent_ip, uint16_t& cilent_port){             struct sockaddr_in client_addr;  // 输出型参数,用于获取用户的ip : 端口号             memset(&client_addr,0,sizeof(client_addr));             socklen_t Len = sizeof(client_addr);             int newfd = accept(socketfd_,(struct sockaddr*)&client_addr,&Len);             if(newfd < 0){                 lg(Warning, "accept error, %s: %d", strerror(errno), errno);                 return -1;             }             //提取客户端信息-->输出参数             char ipstr[64];             cilent_ip = inet_ntop(AF_INET,&client_addr.sin_addr,ipstr,sizeof(ipstr));             cilent_ip = ipstr;             cilent_port = ntohs(client_addr.sin_port);             return newfd;         }       public:         int Get_Server_fd(){             return socketfd_;         }         void Close_fd(){             if(socketfd_ > 0){                 close(socketfd_);                 socketfd_ = -1;             }         }     }; };  

服务器主体模块

在这里插入图片描述

#pragma once #include "ThreadPool.cpp" #include "TcpServer.cpp" #include "CalTask.cpp" #include "log.hpp" #include <signal.h>  //构建计算器服务器 class CalServer{     const int size = 2048; private:     Thread_Pool<CalTask> * Pool_ptr_;     MySocket::TcpServer Socket_;     int Socket_fd_ = -1; public:     CalServer(const std::string& de_ip = "172.19.29.44",uint16_t de_port = 8081)         : Socket_(de_ip,de_port)     {         Pool_ptr_ = Thread_Pool<CalTask>::Getinstance();         if(Pool_ptr_ == nullptr){             lg(Fatal,"Pool_ptr_ is nullptr\n");             return;         }         Pool_ptr_->Create_thread();     }     ~CalServer(){} public:     //建立Tcp连接条件     bool Init(){         Socket_.BuildSocket();         Socket_fd_ = Socket_.Get_Server_fd();         if(Socket_fd_ < 0){             lg(Fatal,"BuildSocket failed\n");             return true;         }         Socket_.SocketBind();         Socket_.Socklisten();         lg(Info, "init server .... done");         return true;     }     //启动服务器     void Start(){         signal(SIGCHLD, SIG_IGN);         signal(SIGPIPE, SIG_IGN);         char ReadBuffer[size];         while(true){             //接受用户请求             std::string client_ip;             uint16_t client_port;             int client_fd = Socket_.SockAccept(client_ip,client_port);             if(client_fd < 0){                 lg(Warning,"SockAccept error\n");                 continue;             }             lg(Info, "accept a new link, sockfd: %d, clientip: %s, clientport: %d", client_fd, client_ip.c_str(), client_port);              int n = read(client_fd,ReadBuffer,sizeof(ReadBuffer));             ReadBuffer[n] = 0;               std::string TaskStr(ReadBuffer);             printf("receives mess from client : %s",ReadBuffer);             if(n < 0){                 lg(Warning,"read error\n");                 break;             }             CalTask task(client_fd,client_ip,client_port,TaskStr);             Pool_ptr_->Push(task);         }     } }; 

任务回调模块(根据具体应用场景可重构)

#pragma once #include <string> #include "ThreadPool.cpp" #include "Protocol.cpp"   enum{     Div_Zero = 1,     Mod_Zero,     Other_Oper };  class CalTask{ private:     int socketfd_;                //网络通信文件描述符     std :: string ip_;            //客户端ip     uint16_t port_;               //客户端端口号     std::string package_;         //客户请求字符串 public:     CalTask(int socketfd,const std::string& ip , uint16_t & port,std::string & str)         : socketfd_(socketfd),ip_(ip),port_(port),package_(str){}     CalTask(){}//类一定要有默认构造函数     ~CalTask(){} public:      //执行计算任务并将结果发送给用户     void operator() (){         std::cout << "Task Running ... \n" << std::endl;          std::string content;         //将用户发送的报文进行解包获取正文         bool r = Decode(package_, content);         if (!r)return;          //将报文正文进行反序列化         Request req;         r = req.Deserialize(content);         if (!r)return ;         req.DebugPrint();          content = "";          //构建计算结果                                  Response resp = CalculatorHelper(req);         resp.DebugPrint();          //计算结果序列化成字符串         resp.Serialize(content);         //字符串正文封装成报文发送给用户         std::string ResStr = Encode(content);         write(socketfd_,ResStr.c_str(),ResStr.size());          if(socketfd_ > 0)close(socketfd_);     }  private:     Response CalculatorHelper(const Request &req){         //构建请求回应结构体         Response resp(0, 0);         switch (req.op){         case '+':             resp.result = req.x + req.y;             break;         case '-':             resp.result = req.x - req.y;             break;         case '*':             resp.result = req.x * req.y;             break;         case '/':{             if (req.y == 0)                 resp.code = Div_Zero;             else                 resp.result = req.x / req.y;         }         break;         case '%':{             if (req.y == 0)                 resp.code = Mod_Zero;             else                 resp.result = req.x % req.y;         }         break;         default:             resp.code = Other_Oper;             break;         }         return resp;     } }; 

Tips:DebugC++代码过程中遇到的问题记录

  • 使用C++类模板时,若在类模板中定义了静态成员函数,且该静态成员函数在类外被调用,则该静态成员函数必须定义在类外,不然链接器无法找到函数体.
  • 注意类模板静态成员的声明格式需要加关键字temlpate<>
  • 声明类模板静态成员无需特化模版类型参数
  • 跨主机并发通信测试:
    在这里插入图片描述
    在这里插入图片描述

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!