文章目录
前言
我们已经学会了Tcp、Udp网络传输协议,并且之前我们也实现了简易的聊天室和翻译器。
我们知道,传输层是OS系统里就给我们写好的, 应用层才是我们自己需要去编写的,现在我们对应用层来进行一些初步的了解。
一、自定义协议
在之前我们使用Tcp和Udp服务进行网络通信,我们一直都是以字符串的形式互相发送消息。
那么我们就只能发送字符串吗? 当然不是,Udp是基于数据流进行网络通信,Udp是基于字节流进行网络通信,虽然我们对字节流和数据流并没有一个特别清晰的认识,但是我们可以知道的是,我们其实是可以传各种各样的类型进行通行的。
这里就提出一种方案
传结构体对象
从我们之前写的代码来看,只要我们在发送数据和接收数据时制定一个协议,每次只读写一个结构体对象的大小,那么我们就可以将该结构体进行通信。 而实际上,底层的很多库也确实是这么做的,但是这并不代表这种方式不存在明显弊端。
最大的弊端就是,你能保证在不同环境之下,同样的结构体类型在不同主机、不同环境下的大小是一样的吗。 结构体的大小涉及到许多,比如说结构体字节对齐,32位和64位系统下内置类型大小可能不同…
所以我们并不推崇这种方案。
而实际上我们其实更推崇传字符串的方式。
序列化和反序列化
什么是序列化?
在现实生活中,我们介绍自己,有些人习惯先说自己的名字,有些人习惯先说自己来自于哪一个城市。 如果你需要去统计大量的人的信息,最好就是先列一个表格,然后让他们严格按照表格上的个人信息顺序去介绍自己,这就是序列化。
再比如说我们今天要写一个网络版本的计算器,我可以写成1+1,也可以写成一加一,再也可以写写成1 + 1,中间带几个空格。 那么这样的话,我们服务器接受到的数据就是形形色色的,不利于我们去解析。
于是我们就制定一个协议,你必须要写成"1 + 1"的形式,否则就是违反协议!
反序列化
反序列化很简单,就比如说我们收到了一段数据,它是一个字符串形式的"1 + 1",我们就需要将该字符串进行解析,反序列化成int x = 1, char op = ‘+’ , int y = 1.
这也是一种反序列化。
二、计算器服务端(线程池版本)
1.main.cc
#include "serverCal.hpp" void Usage(const char *mes) { std::cout << "Usage: " << mes << " port[8080-9000]" << std::endl; } const std::string default_ip = "0.0.0.0"; enum{ Usage_Err = 1 }; int main(int argc, char *argv[]) { if (argc != 2) { Usage("./serverCal"); exit(Usage_Err); } ServerCal sc; sc.Init(AF_INET, default_ip, atoi(argv[1])); sc.Run(); return 0; }
2.Socket.hpp
对套接字进行封装
#pragma once #include <iostream> #include <sys/socket.h> #include <netinet/in.h> #include <arpa/inet.h> #include <string.h> #include "log.hpp" enum { Socket_Err = 1, Bind_Err, Listen_Err }; extern Log lg; const int backlog = 10; class Socket { public: Socket() : _sockfd(-1) { } int Getfd() { return _sockfd; } void Init() { int socket_fd = socket(AF_INET, SOCK_STREAM, 0); if (socket_fd < 0) { lg(Fatal, "Socket Create Failed..."); exit(Socket_Err); } lg(Info, "Socket Create Succeeded..."); _sockfd = socket_fd; } void Bind(const int sinfamily, const std::string &ip, const uint16_t port) { memset(&_sockaddr, 0, sizeof _sockaddr); switch (sinfamily) { case AF_INET: _sockaddr.sin_family = AF_INET; break; case AF_INET6: _sockaddr.sin_family = AF_INET6; break; } _sockaddr.sin_port = htons(port); inet_aton(ip.c_str(), &(_sockaddr.sin_addr)); int n = bind(_sockfd, (const struct sockaddr *)&_sockaddr, sizeof _sockaddr); if (n < 0) { lg(Fatal, "Bind Failed..."); exit(Bind_Err); } lg(Info, "Bind Succeeded..., port: %d", port); } void Listen() { int n = listen(_sockfd, backlog); if (n < 0) { lg(Fatal, "Listen Failed..."); exit(Listen_Err); } lg(Info, "Listen Succeeded..."); } int Accept(struct sockaddr_in *clientsock, socklen_t *len) { int fd = accept(_sockfd, (sockaddr *)clientsock, len); if (fd < 0) { lg(Warning, "Accept Failed..."); return -1; } lg(Info, "Accept Succeeded..., Get A new Link, fd: %d", fd); return fd; } int Connect(const std::string &ip, const std::string &port) { struct sockaddr_in serversock; serversock.sin_port = htons(atoi(port.c_str())); serversock.sin_family = AF_INET; inet_aton(ip.c_str(), &serversock.sin_addr); int n = connect(_sockfd, (const struct sockaddr *)&serversock, sizeof serversock); if (n < 0) { lg(Warning, "Accept Failed..."); return n; } lg(Info, "Connect Succeeded..."); return n; } ~Socket() { close(_sockfd); } private: int _sockfd; struct sockaddr_in _sockaddr; };
3.protocol.hpp
序列化和反序列化的协议制定
#pragma once #include <iostream> #include <string> #include "log.hpp" extern Log lg; const char blank_space_sep = ' '; const char protocol_sep = '\n'; enum Code { Div_Zero_Err = 1, Mod_Zeor_Err, Operatorr_Err }; class Request { public: Request() {} // 提供一个无参构造 Request(int x, int y, char op) : _x(x), _y(y), _operator(op) {} bool serialize(std::string *out_str) { // 协议规定 字符串格式应序列化为"len\n""_x + _y\n" std::string main_body = std::to_string(_x); main_body += blank_space_sep; main_body += _operator; main_body += blank_space_sep; main_body += std::to_string(_y); *out_str = std::to_string(main_body.size()); *out_str += protocol_sep; *out_str += main_body; *out_str += protocol_sep; return true; } bool deserialize(std::string &in_str) { // 协议规定 in_str的格式应为"len\n""_x + _y\n..." size_t pos = in_str.find(protocol_sep); if (pos == std::string::npos) { // 说明没找到'\n' lg(Warning, "Message Format Error..., No Found The First Second \\n"); return false; } std::string sl = in_str.substr(0, pos); int len = std::stoi(sl); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG int total_len = sl.size() + 1 + len + 1; if (in_str.size() < total_len) { lg(Warning, "Message Format Error..., Lenth Error"); return false; } if (in_str[total_len - 1] != '\n') { lg(Warning, "Message Format Error..., No Found The Second \\n"); return false; } std::string main_body = in_str.substr(pos + 1, len); // main_body"_x + _y" int left = main_body.find(blank_space_sep); if (left == std::string::npos) { // 说明没找到' ' lg(Warning, "Message Format Error..., No Found The First ' '"); return false; } int right = main_body.rfind(blank_space_sep); if (left == right) { // 说明只有一个' ' lg(Warning, "Message Format Error...,No Found The Second ' '"); return false; } _x = std::stoi(main_body.substr(0, left)); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG _y = std::stoi(main_body.substr(right + 1)); // 如果这里的sl不是一串数字,stoi就会抛异常! BUG _operator = main_body[left + 1]; in_str.erase(0, total_len); return true; } void print() { std::cout << _x << " " << _operator << " " << _y << std::endl; } ~Request() {} public: int _x; int _y; char _operator; }; class Respond { public: Respond() {} // 提供一个无参构造 Respond(int result, int code) : _result(result), _code(code) {} bool serialize(std::string *out_str) { // 协议规定 字符串格式应序列化为"_result _code" *out_str = std::to_string(_result); *out_str += blank_space_sep; *out_str += std::to_string(_code); return true; } bool deserialize(const std::string &in_str) { // 协议规定 in_str的格式应为"_result _code" size_t pos = in_str.find(blank_space_sep); if (pos == std::string::npos) { // 没找到字符' ' lg(Warning, "Result Message Error..."); return false; } _result = std::stoi(in_str.substr(0, pos)); _code = std::stoi(in_str.substr(pos + 1)); return true; } void print() { std::cout << _result << " " << _code << std::endl; } ~Respond() {} public: int _result; int _code = -1; // 表示结果可信度 0表示可信 };
4.Calculator.hpp
计算器功能接口函数
#pragma once #include"protocol.hpp" class Calculator{ public: Calculator() {} Respond calculate(const Request& rq) { Respond rs; switch (rq._operator) { case '+': rs._result = rq._x + rq._y; break; case '-': rs._result = rq._x - rq._y; break; case '*': rs._result = rq._x * rq._y; break; case '/': if(rq._y == 0) { lg(Warning,"Found Div Zero Error..."); rs._code = Div_Zero_Err; return rs; } rs._result = rq._x / rq._y; break; case '%': if(rq._y == 0) { lg(Warning,"Found Mod Zero Error..."); rs._code = Mod_Zeor_Err; return rs; } rs._result = rq._x - rq._y; break; default: lg(Warning,"Found Operator Error..."); rs._code = Operatorr_Err; return rs; } rs._code = 0; return rs; } };
5.serverCal.hpp
代码如下(示例):
#pragma once #include "Socket.hpp" #include "protocol.hpp" #include "threadPool.hpp" #include "Task.hpp" class ServerCal { public: ServerCal() { } void Init(const int sinfamily, const std::string &ip, const uint16_t port) { _listensock.Init(); _listensock.Bind(sinfamily, ip, port); _listensock.Listen(); } void Run() { ThreadPool<Task> *tp = ThreadPool<Task>::GetInstance(); tp->Start(); struct sockaddr_in client; while (true) { memset(&client, 0, sizeof client); socklen_t len; int socketfd = _listensock.Accept(&client, &len); if (socketfd < 0) continue; tp->Push(socketfd); } } private: Socket _listensock; };
6.threadPool.hpp
很熟悉的线程池封装
#pragma once #include <iostream> #include <vector> #include <string> #include <queue> #include <pthread.h> #include <unistd.h> struct ThreadInfo { pthread_t tid; std::string name; }; static const int defalutnum = 10; template <class T> class ThreadPool { public: void Lock() { pthread_mutex_lock(&mutex_); } void Unlock() { pthread_mutex_unlock(&mutex_); } void Wakeup() { pthread_cond_signal(&cond_); } void ThreadSleep() { pthread_cond_wait(&cond_, &mutex_); } bool IsQueueEmpty() { return tasks_.empty(); } std::string GetThreadName(const pthread_t tid) { for (const auto &ti : threads_) { if (ti.tid == tid) return ti.name; } return "None"; } public: static void *HandlerTask(void *args) { ThreadPool<T> *tp = static_cast<ThreadPool<T> *>(args); std::string name = tp->GetThreadName(pthread_self()); while (true) { tp->Lock(); while (tp->IsQueueEmpty()) { tp->ThreadSleep(); } T t = tp->Pop(); tp->Unlock(); t(); } } void Start() { int num = threads_.size(); for (int i = 0; i < num; i++) { threads_[i].name = "thread-" + std::to_string(i + 1); pthread_create(&(threads_[i].tid), nullptr, HandlerTask, this); } } T Pop() { T t = tasks_.front(); tasks_.pop(); return t; } void Push(const T &t) { Lock(); tasks_.push(t); Wakeup(); Unlock(); } static ThreadPool<T> *GetInstance() { if (nullptr == tp_) // ??? { pthread_mutex_lock(&lock_); if (nullptr == tp_) { // std::cout << "log: singleton create done first!" << std::endl; tp_ = new ThreadPool<T>(); } pthread_mutex_unlock(&lock_); } return tp_; } private: ThreadPool(int num = defalutnum) : threads_(num) { pthread_mutex_init(&mutex_, nullptr); pthread_cond_init(&cond_, nullptr); } ~ThreadPool() { pthread_mutex_destroy(&mutex_); pthread_cond_destroy(&cond_); } ThreadPool(const ThreadPool<T> &) = delete; const ThreadPool<T> &operator=(const ThreadPool<T> &) = delete; // a=b=c private: std::vector<ThreadInfo> threads_; std::queue<T> tasks_; pthread_mutex_t mutex_; pthread_cond_t cond_; static ThreadPool<T> *tp_; static pthread_mutex_t lock_; }; template <class T> ThreadPool<T> *ThreadPool<T>::tp_ = nullptr; template <class T> pthread_mutex_t ThreadPool<T>::lock_ = PTHREAD_MUTEX_INITIALIZER;
7.Task.hpp
派发给线程池的任务
#pragma once #include "Socket.hpp" #include "protocol.hpp" #include "Calculator.hpp" class Task { public: Task(int socket_fd) : _socket_fd(socket_fd) { } void run() { char in_buffer[1024]; Calculator cal; std::string message = ""; while (true) { memset(in_buffer, 0, sizeof in_buffer); //std:: cout << "开始等待读取..." <<std::endl; int n = read(_socket_fd, (void *)in_buffer, sizeof in_buffer - 1); //std::cout << n << " " << strerror(errno) <<std::endl; //std::cout << "读取到的有效字符为" << n << std::endl; if (n == 0) { lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...", _socket_fd); break; } else if (n < 0) { lg(Warning, "Read Error, socketfd[%d]...", _socket_fd); break; } in_buffer[n] = 0; message += in_buffer; //std::cout << "报文大小: "<< message.size() <<" ,报文内容: "<< message << std::endl; Request rq; if(!rq.deserialize(message)) continue; Respond rs = cal.calculate(rq); std::string res; rs.serialize(&res); printf("%d %c %d = %d\n",rq._x,rq._operator,rq._y,rs._result); write(_socket_fd, res.c_str(), res.size()); } } void operator()() { run(); close(_socket_fd); } ~Task() {} private: int _socket_fd; };
8. log.hpp
输出日志消息
#pragma once #include <iostream> #include <time.h> #include <stdarg.h> #include <sys/types.h> #include <sys/stat.h> #include <fcntl.h> #include <unistd.h> #include <stdlib.h> #define SIZE 1024 #define Info 0 #define Debug 1 #define Warning 2 #define Error 3 #define Fatal 4 #define Screen 1 #define Onefile 2 #define Classfile 3 #define LogFile "log.txt" class Log { public: Log() { printMethod = Screen; path = "./log/"; } void Enable(int method) { printMethod = method; } std::string levelToString(int level) { switch (level) { case Info: return "Info"; case Debug: return "Debug"; case Warning: return "Warning"; case Error: return "Error"; case Fatal: return "Fatal"; default: return "None"; } } void printLog(int level, const std::string &logtxt) { switch (printMethod) { case Screen: std::cout << logtxt << std::endl; break; case Onefile: printOneFile(LogFile, logtxt); break; case Classfile: printClassFile(level, logtxt); break; default: break; } } void printOneFile(const std::string &logname, const std::string &logtxt) { std::string _logname = path + logname; int fd = open(_logname.c_str(), O_WRONLY | O_CREAT | O_APPEND, 0666); // "log.txt" if (fd < 0) return; write(fd, logtxt.c_str(), logtxt.size()); close(fd); } void printClassFile(int level, const std::string &logtxt) { std::string filename = LogFile; filename += "."; filename += levelToString(level); // "log.txt.Debug/Warning/Fatal" printOneFile(filename, logtxt); } ~Log() { } void operator()(int level, const char *format, ...) { time_t t = time(nullptr); struct tm *ctime = localtime(&t); char leftbuffer[SIZE]; snprintf(leftbuffer, sizeof(leftbuffer), "[%s][%d-%d-%d %d:%d:%d]", levelToString(level).c_str(), ctime->tm_year + 1900, ctime->tm_mon + 1, ctime->tm_mday, ctime->tm_hour, ctime->tm_min, ctime->tm_sec); va_list s; va_start(s, format); char rightbuffer[SIZE]; vsnprintf(rightbuffer, sizeof(rightbuffer), format, s); va_end(s); // 格式:默认部分+自定义部分 char logtxt[SIZE * 2]; snprintf(logtxt, sizeof(logtxt), "%s %s", leftbuffer, rightbuffer); // printf("%s", logtxt); // 暂时打印 printLog(level, logtxt); } private: int printMethod; std::string path; }; Log lg;
客户端
#include "Socket.hpp" #include "protocol.hpp" void Usage(const char *mes) { std::cout << "Usage: " << mes << " ip[xxx.xxx.xxx.xxx] port[8080-9000]" << std::endl; } int main(int argc, char *argv[]) { if (argc != 3) { Usage("./clientCal"); } Socket local; local.Init(); int n = local.Connect(argv[1], argv[2]); if (n < 0) { return 1; } int localfd = local.Getfd(); std::cout << " 简易计算器, 目前仅支持\" + - * / %\"运算符 " << std::endl; std::cout << " 数字和运算符请用空格或回车隔开" << std::endl; Request rq; Respond rs; std::string message; char buffer[1024]; while (true) { memset(buffer, 0, sizeof buffer); std::cout << "请输入您的算式@ "; std::cin >> rq._x >> rq._operator >> rq._y; rq.serialize(&message); write(localfd, message.c_str(), message.size()); // 开始等待结果 n = read(localfd, buffer, sizeof buffer - 1); if (n == 0) { lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...",localfd); break; } else if (n < 0) { lg(Warning, "Read Error, socketfd[%d]...", localfd); break; } buffer[n] = 0; std::string res = buffer; rs.deserialize(res); if(rs._code != 0) { switch(rs._code) { case 1: std::cout << "出现除0错误" << std::endl; break; case 2: std::cout << "出现模0错误" << std::endl; break; case 3: std::cout << "使用了除 + - * / % 以外的运算符" << std::endl; break; default: std::cout << "发生未知错误" <<std::endl; break; } continue; } printf("%d %c %d = %d\n",rq._x,rq._operator,rq._y,rs._result); } return 0; }
Windows客户端
#include<iostream> #include<string> #include<WinSock2.h> #include<Windows.h> #include<functional> #include<stdlib.h> #include"protocol.hpp" #pragma comment(lib, "ws2_32.lib") #pragma warning(disable:4996) #pragma execution_character_set("utf-8") const int server_port = 8889; const std::string server_ip = "43.143.58.29"; int main() { //初始化网络环境 WSADATA wsd; WSAStartup(MAKEWORD(2, 2), &wsd); system("chcp 65001"); //申请套接字 SOCKET socket_fd = socket(AF_INET, SOCK_STREAM, 0); if (socket_fd == SOCKET_ERROR) { perror("Socket Error"); exit(1); } //创建并初始化Server端sockaddr_in结构体 struct sockaddr_in server; memset(&server, 0, sizeof server); server.sin_family = AF_INET; server.sin_addr.s_addr = inet_addr(server_ip.c_str()); server.sin_port = htons(server_port); //开始连接服务器 int n = connect(socket_fd, (const struct sockaddr*)&server, sizeof server); if (n < 0) { //连接失败 std::cout << "Connect Failed" << std::endl; return 1; } std::cout << " 简易计算器, 目前仅支持\" + - * / %\"运算符 " << std::endl; std::cout << " 数字和运算符请用空格或回车隔开" << std::endl; Request rq; Respond rs; std::string message; char buffer[1024]; while (true) { memset(buffer, 0, sizeof buffer); std::cout << "请输入您的算式@ "; std::cin >> rq._x >> rq._operator >> rq._y; rq.serialize(&message); send(socket_fd, message.c_str(), (int)message.size(),0); // 开始等待结果 n = recv(socket_fd, buffer, sizeof buffer - 1,0); if (n == 0) { lg(Warning, "Connection closed by foreign host, socketfd[%d] closed...", socket_fd); break; } else if (n < 0) { lg(Warning, "Read Error, socketfd[%d]...", socket_fd); break; } buffer[n] = 0; std::string res = buffer; rs.deserialize(res); if (rs._code != 0) { switch (rs._code) { case 1: std::cout << "出现除0错误" << std::endl; break; case 2: std::cout << "出现模0错误" << std::endl; break; case 3: std::cout << "使用了除 + - * / % 以外的运算符" << std::endl; break; default: std::cout << "发生未知错误" << std::endl; break; } continue; } printf("%d %c %d = %d\n", rq._x, rq._operator, rq._y, rs._result); } //清理环境 closesocket(socket_fd); WSACleanup(); return 0; }