TCP客户端connect断线重连

avatar
作者
筋斗云
阅读量:0

文章目录


img

TCP客户端connect断线重连

1、为什么要断线重连

客户端会面临服务器崩溃的情况,我们可以试着写一个客户端重连的代码,模拟并理解一些客户端行为,比如游戏客户端等.

考虑到下面2种情况:

  • 服务器故障突然断开连接,过几秒又恢复连接,那么在这等待的几秒,客户端可以重新发起连接。
  • 客户端wifi突然断了,也就是没网,然后过几秒又恢复了,客户端又可以重新发起连接了。

2、实现代码

代码主要实现细节:

  1. 枚举类型Status:

    • 用于表示连接状态,包括NEW, CONNECTED, CONNECTING, DISCONNECTED, CLOSE
  2. 全局常量:

    • defaultsockfd,默认套接字文件描述符为-1。

    • retryinterval,重连间隔时间为1秒。

    • retryamxtimes,重连最大次数为5次。

  3. Connection类:

    • 构造函数初始化服务器IP、端口、状态、重连间隔和最大重连次数。

    • ConnectStatus()方法返回当前连接状态。

    • Connect()方法负责建立连接,并设置连接状态。

    • Process()方法负责发送和接收数据,处理通信过程中的各种状态变化。

    • ReConnect()方法处理重连逻辑,尝试在连接失败时进行多次重连。

    • DisConnect()方法负责断开连接并重置套接字文件描述符。

  4. TcpClient类:

    • 构造函数初始化连接对象。

    • Execute()方法根据连接状态执行相应操作,包括建立连接、处理数据、重连和断开连接。

  5. Usage()函数:

    • 用于输出使用说明。
  6. main()函数:

    • 检查命令行参数,初始化TcpClient对象并执行连接逻辑。
  • TcpClient.cc文件:
#include <iostream> #include <string> #include <strings.h> #include <unistd.h>  #include "Comm.hpp"  enum class Status {    NEW,    CONNECTED,    CONNECTING,    DISCONNECTED,    CLOSE };  const int defaultsockfd = -1; const int retryinterval = 1; // 重连间隔时间 const int retryamxtimes = 5; // 重连最大次数  class Connection { public:    Connection(std::string serverip, uint16_t serverport)        : _sockfd(defaultsockfd),          _serverip(serverip),          _serverport(serverport),          _status(Status::NEW),          _retry_interval(retryinterval),          _retry_max_times(retryamxtimes)    {    }     Status ConnectStatus()    {        return _status;    }     void Connect()    {        _sockfd = socket(AF_INET, SOCK_STREAM, 0);        if (_sockfd < 0)        {            _status = Status::DISCONNECTED;            exit(CREATE_ERROR);        }        struct sockaddr_in server;        bzero(&server, sizeof(server));        server.sin_family = AF_INET;        server.sin_port = htons(_serverport);         // int inet_pton(int af, const char *src, void *dst);        inet_pton(AF_INET, _serverip.c_str(), &server.sin_addr.s_addr); // 也可以&server.sin_addr,因为sin_addr只有s_addr        int n = ::connect(_sockfd, CONV(&server), sizeof(server));        if (n < 0)        {            DisConnect(); //            _status = Status::DISCONNECTED;            return;        }        std::cout << "connect success" << std::endl;        _status = Status::CONNECTED; // 已连接    }     void Process()    {        while (true)        {            // 发送数据            std::string message;            std::cout << "Please Enter# ";            std::getline(std::cin, message);            int n = send(_sockfd, message.c_str(), message.size(), 0);            if (n > 0)            {                // 接收数据                char buff[1024];                int m = recv(_sockfd, buff, sizeof(buff) - 0, 0);                if (m > 0)                {                    buff[m] = 0;                    std::cout << "Server Echo$ " << buff << std::endl;                }                else                {                    _status = Status::DISCONNECTED; // 接收不成功就重连                    std::cerr << "recv error" << std::endl;                    break;                }            }            else            {                _status = Status::CLOSE; // 发送不成功就退出                std::cerr << "send error" << std::endl;                break;            }        }    }     void ReConnect()    {        _status = Status::CONNECTING;        int cnt = 1;        while (true)        {            Connect();            if (_status == Status::CONNECTED)            {                break;            }            std::cout << "正在重连,重连次数 : " << cnt++ << std::endl;            if (cnt > _retry_max_times)            {                _status = Status::CLOSE; // 重连失败                std::cout << "重连失败,请检查网络.." << std::endl;                break;            }            sleep(_retry_interval);        }    }     void DisConnect()    {        if (_sockfd > defaultsockfd)        {            close(_sockfd);            _sockfd = defaultsockfd;        }    }  private:    int _sockfd;    std::string _serverip;    uint16_t _serverport;    Status _status;    int _retry_interval;    int _retry_max_times; };  class TcpClient { public:    TcpClient(std::string serverip, uint16_t serverport) : _connect(serverip, serverport)    {    }    void Execute()    {        while (true)        {            switch (_connect.ConnectStatus())            {            case Status::NEW:                _connect.Connect();                break;            case Status::CONNECTED:                _connect.Process();                break;            case Status::DISCONNECTED:                _connect.ReConnect();                break;            case Status::CLOSE:                _connect.DisConnect();                return; // 断开连接了,重连不管用了            default:                break;            }        }    }  private:    Connection _connect; };  void Usage() {    std::cout << "Please use format : ./tcp_client serverip serverport" << std::endl; }  int main(int argc, char *argv[]) {    if (argc != 3)    {        Usage();        exit(USAGE_ERROR);    }     std::string serverip = argv[1];    uint16_t serverport = std::stoi(argv[2]);     TcpClient tcpclient(serverip, serverport);    tcpclient.Execute();    return 0; } 

和UDP服务器响应程序一样,客户端发什么就回什么,只不过多了建立连接的步骤。

  • Comm.hpp文件
#pragma once #include "InetAddr.hpp"  enum errorcode {   CREATE_ERROR = 1,   BIND_ERROR,   LISTEN_ERROR,   SEND_ERROR,   RECV_ERROR,   CONNECT_ERROR,   FORK_ERROR,   USAGE_ERROR };  #define CONV(ADDR) ((struct sockaddr *)ADDR)  std::string CombineIpAndPort(InetAddr addr) {   return "[" + addr.Ip() + ":" + std::to_string(addr.Port()) + "] "; } 
  • InetAddr.hpp文件
#pragma once  #include <iostream> #include <sys/types.h> /* See NOTES */ #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h> #include <string>  class InetAddr {   void GetAddress(std::string *ip, uint16_t *port)   {       // char *inet_ntoa(struct in_addr in);       *ip = inet_ntoa(_addr.sin_addr);       *port = ntohs(_addr.sin_port);   }  public:   InetAddr(const struct sockaddr_in &addr) : _addr(addr)   {       GetAddress(&_ip, &_port);   }    std::string Ip()   {       return _ip;   }    uint16_t Port()   {       return _port;   }   bool operator==(InetAddr &addr)   {       return _ip == addr.Ip() && _port == addr.Port();   }    const struct sockaddr_in& GetAddr()   {       return _addr;   }    ~InetAddr() {}  private:   struct sockaddr_in _addr;   std::string _ip;   uint16_t _port; }; 
  • LockGuard.hpp文件
# pragma once  #include <pthread.h>   class LockGuard { public:   LockGuard(pthread_mutex_t *mutex) : _mutex(mutex)   {       pthread_mutex_lock(_mutex); // 构造加锁   }   ~LockGuard()   {       pthread_mutex_unlock(_mutex); // 析构解锁   }  private:   pthread_mutex_t *_mutex; }; 
  • Log.hpp文件
#pragma once  #include <string> #include <iostream> #include <fstream> #include <unistd.h> #include <stdarg.h> #include <sys/types.h> #include "LockGuard.hpp"  using namespace std;  bool isSave = false; // 默认向显示器打印 pthread_mutex_t mutex = PTHREAD_MUTEX_INITIALIZER; #define FILEPATH "./log.txt"  enum level {   DEBUG = 0,   INFO,   WARNING,   ERROR,   FATAL };  void SaveToFile(const string &message) {   ofstream out(FILEPATH, ios_base::app);   if (!out.is_open())       return;   out << message;   out.close(); }  std::string LevelToString(int level) {   switch (level)   {   case DEBUG:       return "Debug";   case INFO:       return "Info";   case WARNING:       return "Warning";   case ERROR:       return "Error";   case FATAL:       return "Fatal";   default:       return "Unknow";   } }  std::string GetTimeString() {   time_t curr_time = time(nullptr);   struct tm *format_time = localtime(&curr_time);   if (format_time == nullptr)       return "None";   char buff[1024];   snprintf(buff, sizeof(buff), "%d-%d-%d %d:%d:%d",            format_time->tm_year + 1900,            format_time->tm_mon + 1,            format_time->tm_mday,            format_time->tm_hour,            format_time->tm_min,            format_time->tm_sec);   return buff; }  void LogMessage(const std::string filename, int line, bool issave, int level, const char *format, ...) {   std::string levelstr = LevelToString(level);   std::string timestr = GetTimeString();   pid_t pid = getpid();    char buff[1024];   va_list arg;   // int vsnprintf(char *str, size_t size, const char *format, va_list ap); // 使用可变参数   va_start(arg, format);   vsnprintf(buff, sizeof(buff), format, arg);   va_end(arg);    LockGuard lock(&mutex);   std::string message = "[" + timestr + "]" + "[" + levelstr + "]" + "[pid:" + std::to_string(pid) + "]" + "[" + filename + "]" + "[" + std::to_string(line) + "] " + buff + '\n';   if (issave == false)       std::cout << message;   else       SaveToFile(message); }  // 固定文件名和行数 #define LOG(level, format, ...)                                               \   do                                                                        \   {                                                                         \       LogMessage(__FILE__, __LINE__, isSave, level, format, ##__VA_ARGS__); \   } while (0)  #define EnableScreen()  \   do                  \   {                   \       isSave = false; \   } while (0)  #define EnableFile()   \   do                 \   {                  \       isSave = true; \   } while (0)  void Test(int num, ...) {   va_list arg;   va_start(arg, num);   while (num--)   {       int data = va_arg(arg, int);       std::cout << data << " ";   }   std::cout << std::endl;   va_end(arg); } 
  • Main.cc文件
#include <iostream> #include <memory> #include "TcpServer.hpp"  void Usage() {   // printf("./udp_server serverip serverport\n");   printf("Usage : ./udp_server serverport\n"); // ip 已经设置为0 }  int main(int argc, char *argv[]) {   // if (argc != 3)   if (argc != 2)   {       Usage();       exit(USAGE_ERROR);   }    uint16_t serverport = std::stoi(argv[1]);   std::unique_ptr<TcpServer> tsvr = std::make_unique<TcpServer>(serverport);   tsvr->InitServer();   tsvr->Start();    return 0; } 
  • Makefile文件
.PHONY:all all:tcp_client tcp_server  tcp_client:TcpClient.cc 	g++ -o $@ $^ -std=c++14 -lpthread tcp_server:Main.cc 	g++ -o $@ $^ -std=c++14 -lpthread  .PHONY:clean clean: 	rm -f tcp_server tcp_client 
  • TcpServer.hpp文件
#pragma once  #include <sys/types.h> /* See NOTES */ #include <sys/wait.h> #include <sys/socket.h> #include <arpa/inet.h> #include <netinet/in.h>  #include <error.h> #include <string.h> #include <pthread.h> #include <functional>  #include "Log.hpp" #include "InetAddr.hpp" #include "Comm.hpp" #include "Threadpool.hpp"  const int defaultsockfd = -1; int gbacklog = 16; // 暂时先用  using task_t = std::function<void()>;  // 声明 class TcpServer;  class ThreadData { public:   ThreadData(int sockfd, InetAddr addr, TcpServer *self)       : _sockfd(sockfd), _addr(addr), _self(self) {}   ~ThreadData() = default;  public:   int _sockfd;   InetAddr _addr;   TcpServer *_self; };  class TcpServer { public:   TcpServer(uint16_t port) : _port(port), _listensock(defaultsockfd), _isrunning(false)   {   }    void InitServer()   {       // 创建       _listensock = socket(AF_INET, SOCK_STREAM, 0); // 这个就是文件描述符       if (_listensock < 0)       {           LOG(FATAL, "create sockfd error, error code : %d, error string : %s", errno, strerror(errno));           exit(CREATE_ERROR);       }       LOG(INFO, "create sockfd success");        struct sockaddr_in local;       bzero(&local, sizeof(local));       local.sin_family = AF_INET;       local.sin_port = htons(_port);       local.sin_addr.s_addr = INADDR_ANY;       // 绑定       int n = ::bind(_listensock, CONV(&local), sizeof(local));       if (n < 0)       {           LOG(FATAL, "bind sockfd error, error code : %d, error string : %s", errno, strerror(errno));           exit(BIND_ERROR);       }       LOG(INFO, "bind sockfd success");   }    void Service(int sockfd, InetAddr client)   {       while (true)       {           // TCP是字节流(可以使用write和read接口),UDP是数据报           char buff[1024];           // 接收消息           int n = ::read(sockfd, buff, sizeof(buff)); // bug,接收数据可能收到的不完整,比如1+100,可能先收到1+1,再收到00 -- 按序到达           std::string clientAddr = CombineIpAndPort(client);            if (n > 0)           {               buff[n] = 0;               std::string message = clientAddr + buff;               LOG(INFO, "get message : \n %s", message.c_str());                // 发送消息               int m = ::write(sockfd, buff, strlen(buff));                if (m < 0)               {                   LOG(FATAL, "send message error ,error code : %d , error string : %s", errno, strerror(errno));                   exit(SEND_ERROR);               }           }           else if (n == 0)           {               // 发送端不发送数据了               LOG(INFO, "%s quit", clientAddr.c_str());               break;           }           else           {               LOG(FATAL, "recv message error ,error code : %d , error string : %s", errno, strerror(errno));               exit(RECV_ERROR);           }       }       ::close(sockfd); // 服务结束,关闭文件描述符,避免文件描述符泄漏   }    static void *HandlerService(void *args)   {       pthread_detach(pthread_self()); // 分离线程       ThreadData *td = static_cast<ThreadData *>(args);       td->_self->Service(td->_sockfd, td->_addr);       delete td;       return nullptr;   }    void Start()   {       _isrunning = true;       while (_isrunning)       {           // 监听           int ret = ::listen(_listensock, gbacklog);           if (ret < 0)           {               LOG(FATAL, "listen error, error code : %d , error string : %s", errno, strerror(errno));               exit(LISTEN_ERROR);           }           LOG(INFO, "listen success!");            struct sockaddr_in peer;           socklen_t len = sizeof(peer);           // 获取新连接           int sockfd = accept(_listensock, CONV(&peer), &len); // 建立连接成功,创建新文件描述符进行通信           if (sockfd < 0)           {               LOG(WARNING, "accept error, error code : %d , error string : %s", errno, strerror(errno));               continue;           }           LOG(INFO, "accept success! new sockfd : %d", sockfd);            InetAddr addr(peer); // 给后面提供传入的ip、port           // 服务 -- 发送和接收数据           // V0 -- 单进程           // Service(sockfd, addr); // 这里是while死循环,没有运行完就一直运行,下一个请求来的时候得这个while退出才能执行            // v1 -- 多进程           // int id = fork();           // if (id == 0)           // {           //     // 子进程           //     ::close(_listensock); // 子进程对监听文件描述符不关心           //     if (fork() > 0)           //         exit(0);           // 子进程创建进程后退出,孙子进程被系统领养,不用等待           //     Service(sockfd, addr); // 孙子进程执行任务           //     exit(0);           // }           // else if (id > 0)           // {           //     ::close(sockfd); // 这里每次关闭的文件描述符都是4,使得每次accept创建的文件描述符都是4,这个4是留个各个子进程(子进程再给孙子进程)的(互不影响)           //     // 父进程           //     pid_t rid = waitpid(id, nullptr, 0); // 虽然是阻塞等待,但是子进程是刚创建就退出来了,让孙子进程(孤儿进程)执行任务,接下来继续监听和建立连接           //     if (rid == id)           //     {           //         LOG(INFO, "wait child process success");           //     }           // }           // else           // {           //     // 异常           //     LOG(FATAL, "fork error ,error code : %d , error string : %s", errno, strerror(errno));           //     exit(FORK_ERROR);           // }            // v2 -- 多线程           // pthread_t tid;           // ThreadData *td = new ThreadData(sockfd, addr, this); // 传指针           // pthread_create(&tid, nullptr, HandlerService, td); // 这里创建线程后,线程去做执行任务,主线程继续向下执行 , 并且线程不能关闭sockf,线程和进程共享文件描述符表            // v3 -- 线程池           task_t t = std::bind(&TcpServer::Service, this, sockfd, addr);           Threadpool<task_t>::GetInstance()->Enqueue(t);            // v4 -- 进程池 -- 不推荐,需要传递文件描述符!       }       _isrunning = false;   }    ~TcpServer()   {       if (_listensock > defaultsockfd)           ::close(_listensock); // 不用了关闭监听   }  private:   uint16_t _port;   int _listensock;   bool _isrunning; }; 
  • Thread.hpp文件
#ifndef __THREAD_HPP__ #define __THREAD_HPP__  #include <iostream> #include <string> #include <unistd.h> #include <functional> #include <pthread.h>  using namespace std;  // 封装Linux线程 namespace ThreadModule {   using func_t = function<void(string &)>;   class Thread   {   public:       // /* ThreadData* */Thread(func_t<T> func, T data, const string& name = "default name") : _func(func), _data(data), _threadname(name), _stop(true) {}       Thread(func_t func, const string &name = "default name") : _func(func), _threadname(name), _stop(true) {}        void Execute()       {           _func(_threadname);            // _func(_data);       }        //  隐含this       static void *threadroutine(void *arg)       {           Thread *self = static_cast<Thread *>(arg);           self->Execute(); // static 访问不了成员变量           return nullptr;       }       bool Start()       {           int n = pthread_create(&_tid, nullptr, threadroutine, this);           if (!n)           {               _stop = false;               return true;           }           else           {               return false;           }       }        void Detach()       {           if (!_stop)           {               pthread_detach(_tid);           }       }        void Join()       {           if (!_stop)           {               pthread_join(_tid, nullptr);           }       }        string name()       {           return _threadname;       }        void Stop()       {           _stop = true;       }       // ~Thread() {}    private:       pthread_t _tid;       string _threadname;       func_t _func;       bool _stop;   };  } // namespace ThreadModule  #endif 
  • Threadpool.hpp文件
#pragma once  #include <vector> #include <queue> #include <queue> #include "Thread.hpp" #include <pthread.h> #include "LockGuard.hpp"  using namespace ThreadModule;  const int NUM = 3;  template <typename T> class Threadpool {   void LockQueue(pthread_mutex_t &mutex)   {       pthread_mutex_lock(&mutex);   }    void UnLockQueue(pthread_mutex_t &mutex)   {       pthread_mutex_unlock(&mutex);   }    void SleepThread(pthread_cond_t &cond, pthread_mutex_t &mutex)   {       pthread_cond_wait(&cond, &mutex);   }   void WakeUpThread(pthread_cond_t &cond)   {       pthread_cond_signal(&cond);   }   void WakeUpAll(pthread_cond_t &cond)   {       pthread_cond_broadcast(&_cond);   }   Threadpool(const int threadnum = NUM) : _threadnum(threadnum), _waitnum(0), _isrunning(false)   {       pthread_mutex_init(&_mutex, nullptr);       pthread_cond_init(&_cond, nullptr);       LOG(INFO, "Threadpool Constructor successful ! ");   }    void TaskHandler(string &name)   {       // sleep(1);       // cout  << name << " : hh " << endl;       // sleep(1);       LOG(DEBUG, "%s is running", name.c_str());       while (true)       {           LockQueue(_mutex);           while (_task_queue.empty() && _isrunning)           {               // 等待               ++_waitnum;               SleepThread(_cond, _mutex);               --_waitnum;           }           // 此时一定大于一个线程没有休眠           if (_task_queue.empty() && !_isrunning)           {               // 此时任务队列已经没有内容,且此时线程池已经停止               UnLockQueue(_mutex);               cout << name << " quit ... " << endl;               break;           }           LOG(DEBUG, "%s get task sucessful !", name.c_str());           //  其他情况就得处理任务           T t = _task_queue.front();           _task_queue.pop();           UnLockQueue(_mutex);            // 处理任务           t();           // cout << name << " : " << t.stringResult() << endl;           // LOG(DEBUG, "%s handler task sucessful ! Result is %s", name.c_str(), t.stringResult().c_str());           sleep(1);       }   }   void InitThreadPool()   {       for (int i = 0; i < _threadnum; ++i)       {           string name = "Thread - " + to_string(i + 1);           _threads.emplace_back(bind(&Threadpool::TaskHandler, this, placeholders::_1), name);       }       _isrunning = true;       LOG(INFO, "Init Threadpool successful !");   }  public:   static Threadpool<T> *GetInstance(int threadnum = NUM)   {        if (_instance == nullptr)       {           LockGuard lockguard(&_lock);           if (_instance == nullptr)           {               // pthread_mutex_lock(&_lock);               // 第一次创建线程池               _instance = new Threadpool<T>(threadnum);               _instance->InitThreadPool();               _instance->Start();               LOG(DEBUG, "第一次创建线程池");               // pthread_mutex_unlock(&_lock);               return _instance;           }       }        LOG(DEBUG, "获取线程池");       return _instance;   }    bool Enqueue(const T &in)   {       bool ret = false;       LockQueue(_mutex);       if (_isrunning)       {           _task_queue.push(in);           if (_waitnum > 0)               WakeUpThread(_cond);           LOG(DEBUG, "enqueue sucessful...");           ret = true;       }        UnLockQueue(_mutex);       return ret;   }    void Stop()   {       LockQueue(_mutex);       _isrunning = false;       if (_waitnum > 0)           WakeUpAll(_cond);       UnLockQueue(_mutex);   }    void Start()   {       for (auto &thread : _threads)       {           thread.Start();           LOG(INFO, "%s is start sucessful...", thread.name().c_str());       }   }    void Wait()   {       for (auto &thread : _threads)       {           thread.Join();           LOG(INFO, "%s is quit...", thread.name().c_str());       }   }   ~Threadpool()   {       pthread_mutex_destroy(&_mutex);       pthread_cond_destroy(&_cond);       LOG(INFO, "delete mutex sucessful !");   }  private:   vector<Thread> _threads;   queue<T> _task_queue;   int _threadnum;   int _waitnum;    pthread_mutex_t _mutex; // 互斥访问任务队列   pthread_cond_t _cond;    bool _isrunning;    // 懒汉模式   static Threadpool<T> *_instance;   static pthread_mutex_t _lock; };  template <typename T> Threadpool<T> *Threadpool<T>::_instance = nullptr; template <typename T> pthread_mutex_t Threadpool<T>::_lock = PTHREAD_MUTEX_INITIALIZER; 

运行结果:


OKOK,TCP客户端connect短线重连就到这里,如果你对Linux和C++也感兴趣的话,可以看看我的主页哦。下面是我的github主页,里面记录了我的学习代码和leetcode的一些题的题解,有兴趣的可以看看。

Xpccccc的github主页

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!