C++网络编程之使用Boost库搭建简单的异步服务器

avatar
作者
猴君
阅读量:0

C++网络编程之使用Boost库搭建简单的异步服务器

技术概述

  • 做什么的:C++ Boost库是一个开源的C++软件库集合,旨在提升C++的标准库的性能和功能。在C++编程中,Boost库通常用于解决一些通用但复杂的问题。
  • 应用:网络通信、异步IO操作、跨平台方案、提高完成任务效率。
  • 学习动机:扩展标准库的功能、提高开发效率。
  • 技术难点:API复杂,学习成本高;相对于主流技术,学习资源少。

技术详述

配置

由于主要在Windows下编程练习,所以只介绍Windows下如和配置Boost库。

下载库
  • Boost库官网下载
    在这里插入图片描述

  • 解压后文件夹下有个一个bootstrap.bat文件,双击运行会生成b2.exe
    在这里插入图片描述

  • 然后在boost文件夹下启动cmd,执行 “.\b2.exe toolset=msvc”。执行编译过后,会在stage文件夹下生成lib文件夹,里面就是我们要用到的lib库。

项目中配置Boost库
  • 右键点击项目,选择“属性”打开项目的属性页。在VC++的包含目录中配置boost库的目录位置(就是下载后解压的位置),库目录配置boost库中的stage\lib即可。
    在这里插入图片描述

  • 或者可以打开属性管理器,自建属性页配置(自建属性页配置好后会有一个文件,可以复用到其他项目,自建属性页所在目录要和)
    在这里插入图片描述

注意,自建属性页目录要和当前项目的版本一致,如当前是Debug的x64程序,自建属性页就要在Debug | x64目录下。
在这里插入图片描述

最后的配置过程和配置项目的属性页一样。

使用Boost库

俗话说得好,饭要一口一口吃,所以先从Boost库中网络编程的比较主要和重要的开始使用。演示程序就是一个回显客户端消息的服务器。架构也很简单。下面我将自顶向下的介绍如何完成其编写及其细节。在此之前,必须要注意到的是,异步操作的执行时机总是不确定,所以代码中会大量地用到shared_ptr等智能指针来自动托管一个对象的生命周期。

基本架构

在这里插入图片描述

main
#include "Server.h"  int main(void) {  	try {         //创建一个boost库的IO服务,这是boost使用库进行各种IO操作的核心,io_context的旧版本是io_service 		boost::asio::io_context ioc;         //实例化化Server类 		Server server(ioc, 10086);         //启动IO服务 		ioc.run(); 	} 	catch (std::exception& e) { 		std::cerr << "Exception: " << e.what() << std::endl; 	}  	return 0; } 
Server
头文件
#pragma once #include <boost/asio.hpp> #include <iostream> #include "Session.h" #include <memory> #include <map> #include "Session.h"  class Session;  class Server { public:     //构造函数,引用一个IO服务,同时指定监听的端口 	Server(boost::asio::io_context& ioc, unsigned short port);      //按照Session的uuid删除uuid 	void ClearSession(const std::string& uuid) { 		_sessions.erase(uuid); 	}  private:     //开始一个异步的监听socket连接 	void start_accept();     //处理收到的socket连接 	void handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec); 	     //io_context不支持拷贝构造,使用引用方式 	boost::asio::io_context& _ioc;     //用于监听连接 	boost::asio::ip::tcp::acceptor _acceptor;  	//通过智能指针延长Session对象的生命周期 	std::map<std::string, std::shared_ptr<Session>> _sessions;  }; 
定义代码
#include "Server.h" //注意这里的_acceptor的构造方式,接受一个io_context,以及一个端点。这个端点通过一个默认可用的ip和指定端口进行构造。 Server::Server(boost::asio::io_context& ioc, unsigned short port): _ioc(ioc), 	_acceptor(ioc, boost::asio::ip::tcp::endpoint(boost::asio::ip::tcp::v4(), port)) {   	std::cout << "Server starts on success, using port: " << port << std::endl;      //启动监听     //这么说不是一个严谨的说法,稍后将看到为什么 	start_accept();  }  void Server::start_accept() { 	//创建一个Session,注意到这里的构造又用到了io_context 	auto new_session = std::make_shared<Session>(_ioc, this);     //这里_acceptor执行异步的accept函数     //这里做的事情不是accept,而是往io_context中注册一个accept事件,以及这个事件的处理函数 	_acceptor.async_accept( 		new_session->Socket(), //传入Session的socket,事件发生时,_acceptor会初始化成连接进来的socket 		std::bind( //修饰处理收到连接的方法 			&Server::handle_accept, 			this, 			new_session, 			std::placeholders::_1 		) 	); }  void Server::handle_accept(std::shared_ptr<Session> new_session, const boost::system::error_code& ec) { 	if (ec) { 		std::cout << "Error occured in accepting. Errmessage: " << ec.message() << std::endl; 		//delete new_session; 	} 	else { //没有错误         //启动Session 		new_session->Start(); 		_sessions.insert(std::make_pair(new_session->GetUuid(), new_session)); 	} 	     //这里无论成功还是失败,都要重新注册一个监听事件给io_context     //以便能够继续监听连接 	start_accept();  } 
源程序
Session

在处理读写数据时,我们不知道客户端会发多少数据来,同时还要处理数据粘包的问题。所以服务器和服务端之间的通信采用tlv(type length value)的协议进行通信。由于是回显服务器,type可用省略,在处理读到的数据时,要先解析头部,再读取头部看客户端发来多少数据。

头文件
#pragma once #include <iostream> #include <boost/asio.hpp> #include <boost/uuid/uuid_generators.hpp> #include <boost/uuid/uuid_io.hpp> #include "Server.h" #include <mutex> #include <queue> #include "MsgNode.h" #include <iomanip>  class Server;  class MsgNode;  //注意到这里继承了std::enable_shared_from_this<Session> //这样可以使得类内部能够使用shared_from_this()统一获取this指针的shared_ptr,shared_ptr的唯一性,防止不同的shared_ptr托管同一个this指针的资源 class Session: public std::enable_shared_from_this<Session> { public:     //构造函数 	Session(boost::asio::io_context& ioc, Server* server); 	boost::asio::ip::tcp::socket& Socket();      //启动读写 	void Start();      //发送数据 	void Send(char* buf, std::size_t len);      //返回uuid 	const std::string& GetUuid() { 		return _uuid; 	}  	~Session() { 		std::cout << "delete session: uuid=" << _uuid << std::endl; 	}   private:     //处理读事件 	void HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr);     //处理写事件 	void HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr); 	boost::asio::ip::tcp::socket _socket; 	enum {max_length = 1024};     //接受数据的缓冲区 	char _data[max_length] = {0};      //发送队列 	std::queue<std::shared_ptr<MsgNode>> _send_que;     //队列的锁 	std::mutex _send_lock;      //用于标识连接 	std::string _uuid;     //引用server,以便能够在读写出错时,清除自身的连接 	Server* _server;      //接受消息的节点 	std::shared_ptr<MsgNode> _recv_msg_node;     //标识消息头部是否解析完成 	bool _b_head_parse;     //头部消息节点 	std::shared_ptr<MsgNode> _recv_head_node;  };   
定义代码
#include "Session.h" Session::Session(boost::asio::io_context& ioc, Server* server) : _socket(ioc), 	_server(server), _b_head_parse(false) {     //为Session生成uuid     	boost::uuids::uuid a_uuid = boost::uuids::random_generator()(); 	_uuid = boost::uuids::to_string(a_uuid);     //初始化消息头,通常头的长度是一个short类型的长度 	_recv_head_node = std::make_shared<MsgNode>(HEAD_LENGTH); } //返回使用的socket boost::asio::ip::tcp::socket& Session::Socket() { 	return _socket; }  void Session::Start() {  	//对象创建时已初始化_data     //往io_context注册读事件,区别于另一个async_read函数,这里是能读就返回 	_socket.async_read_some( 		boost::asio::buffer( //构造一个buffer 			_data, 			max_length 		), 		std::bind( //修饰处理 			&Session::HandleRead, 			this, 			std::placeholders::_1, 			std::placeholders::_2, 			//std::make_shared<Session>(this)导致引用计数不同步 			shared_from_this() 		) 	); }  void Session::Send(char* buf, std::size_t len) {      bool pending = false;  	std::lock_guard<std::mutex> guard(_send_lock);     //如果在为加入消息节点之前,队列不为空,说明这个队列的消息正在发送(写完一个消息后,写事件的处理函数还会继续注册写事件,直到队列为空,所以,如果此时有消息,就说明后续还会继续回调写事件,不再需要在这里注册写事件) 	if (_send_que.size() > 0) { 		pending = true; 	}     //往队列投递消息 	_send_que.emplace(std::make_shared<MsgNode>(buf, len)); 	if (pending) { 		return; 	}       //注册写事件 	boost::asio::async_write( 		_socket, 		boost::asio::buffer( 			buf, 			len 		), 		std::bind( 			&Session::HandleWrite, 			this, 			std::placeholders::_1, 			shared_from_this() 		) 	);  }  //由于这里使用async_read_some,一读到数据就返回,所以处理的过程比较繁琐。还可以可以使用async_read,读取指定字节在返回,其内部实现依然是异步的,和async_read_some一样高效。 void Session::HandleRead(const boost::system::error_code& ec, std::size_t bytes_transferred, std::shared_ptr<Session> _self_ptr) { 	if (ec) { 		std::cout << "Error occured in reading. ErrMessage: " << ec.message() << std::endl; 	 		_server->ClearSession(_uuid); 	} 	else {  		int copy_len = 0; 		while (bytes_transferred > 0) { 			if (!_b_head_parse) { 				//收到的数据不足头部大小 				if (bytes_transferred + _recv_head_node->_cur_len < HEAD_LENGTH) { 					memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, bytes_transferred); 					_recv_head_node->_cur_len += bytes_transferred; 					::memset(_data, 0, max_length); 					_socket.async_read_some(boost::asio::buffer(_data, max_length 					), 						std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr)); 					return; 				} 				//收到的数据比头部多 				//头部剩余未复制的长度 				int head_remain = HEAD_LENGTH - _recv_head_node->_cur_len; 				memcpy(_recv_head_node->_msg + _recv_head_node->_cur_len, _data + copy_len, head_remain); 				//更新已处理的data长度和剩余未处理的长度 				copy_len += head_remain; 				bytes_transferred -= head_remain; 				//获取头部数据 				int data_len = 0; 				memcpy(&data_len, _recv_head_node->_msg, HEAD_LENGTH); 				std::cout << "data_len is " << data_len << std::endl; 				//头部长度非法 				if (data_len > max_length) { 					std::cout << "invalid data length is " << data_len << std::endl; 					_server->ClearSession(_uuid); 					return; 				} 				_recv_msg_node = std::make_shared<MsgNode>(data_len); 				//消息的长度小于头部规定的长度,说明数据未收全,则先将部分消息放到接收节点里 				if (bytes_transferred < data_len) { 					memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); 					_recv_msg_node->_cur_len += bytes_transferred; 					::memset(_data, 0, max_length); 					_socket.async_read_some(boost::asio::buffer(_data, max_length), 						std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr)); 					//头部处理完成 					_b_head_parse = true; 					return; 				} 				memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, data_len); 				_recv_msg_node->_cur_len += data_len; 				copy_len += data_len; 				bytes_transferred -= data_len; 				_recv_msg_node->_msg[_recv_msg_node->_tot_len] = '\0'; 				std::cout << "receive data is " << _recv_msg_node->_msg << std::endl; 				//此处可以调用Send发送测试 				Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len); 				//继续轮询剩余未处理数据 				_b_head_parse = false; 				_recv_head_node->Clear(); 				if (bytes_transferred <= 0) { 					::memset(_data, 0, max_length); 					_socket.async_read_some(boost::asio::buffer(_data, max_length), 						std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr)); 					return; 				} 				continue; 			} 			//已经处理完头部,处理上次未接受完的消息数据 			//接收的数据仍不足剩余未处理的 			int remain_msg = _recv_msg_node->_tot_len - _recv_msg_node->_cur_len; 			if (bytes_transferred < remain_msg) { 				memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, bytes_transferred); 				_recv_msg_node->_cur_len += bytes_transferred; 				::memset(_data, 0, max_length); 				_socket.async_read_some(boost::asio::buffer(_data, max_length), 					std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr)); 				return; 			} 			memcpy(_recv_msg_node->_msg + _recv_msg_node->_cur_len, _data + copy_len, remain_msg); 			_recv_msg_node->_cur_len += remain_msg; 			bytes_transferred -= remain_msg; 			copy_len += remain_msg; 			_recv_msg_node->_msg[_recv_msg_node->_tot_len] = '\0'; 			std::cout << "receive data is " << _recv_msg_node->_msg << std::endl; 			//此处可以调用Send发送测试 			Send(_recv_msg_node->_msg, _recv_msg_node->_tot_len); 			//继续轮询剩余未处理数据 			_b_head_parse = false; 			_recv_head_node->Clear(); 			if (bytes_transferred <= 0) { 				::memset(_data, 0, max_length); 				_socket.async_read_some(boost::asio::buffer(_data, max_length), 					std::bind(&Session::HandleRead, this, std::placeholders::_1, std::placeholders::_2, _self_ptr)); 				return; 			} 			continue; 		} 	} } //处理写事件 void Session::HandleWrite(const boost::system::error_code& ec, std::shared_ptr<Session> _self_ptr) { 	if (ec) { 		std::cout << "Error occured in writting. ErrMessage: " << ec.message() << std::endl; 		//delete this; 		_server->ClearSession(_uuid); 	} 	else { 		std::lock_guard<std::mutex> guard(_send_lock); 		_send_que.pop(); //使用async_write,保证发完指定的字节数 		if (!_send_que.empty()) { 			auto& msg = _send_que.front(); 			boost::asio::async_write( 				_socket, 				boost::asio::buffer( 					msg->_msg, 					msg->_tot_len 				), //使用async_write,可以不做偏移 				std::bind( 					&Session::HandleWrite, 					this, 					std::placeholders::_1, 					_self_ptr 				) 			); 		} 	} } 
MsgNode
#pragma once  #include <memory> #include "Session.h"  const int HEAD_LENGTH = sizeof(int);  class MsgNode { public:  	friend class Session;  	MsgNode(char* msg, int tot_len) :_cur_len(0), _tot_len(tot_len) { 		_msg = new char[_tot_len + 1]; 		memcpy(_msg, &tot_len, HEAD_LENGTH); 		memcpy(_msg + HEAD_LENGTH, msg, _tot_len);  		_msg[_tot_len] = '\0'; 	}  	MsgNode(int tot_len) : _cur_len(0), _tot_len(tot_len) { 		_msg = new char[_tot_len + 1]; 		memset(_msg, 0, _tot_len + 1); 	}  	~MsgNode() { 		delete[] _msg; 		_msg = nullptr; 	}  	void Clear() { 		memset(_msg, 0, _tot_len); 		_cur_len = 0; 	}  private:  	char* _msg; 	int _cur_len; 	int _tot_len;  };   

技术使用中遇到的问题和解决过程。

问题

实际上,在项目开始时尽管知道了异步函数执行的时机不能确定,但是如何延长Session的生命周期使其恰到好处的和注册的读写事件同步,也就是读写事件发生时,Session必须有效,它的读写事件不会再注册后,必须回收它的资源。

解决过程

  • 使用shared_ptr管理Session,同时Session要注册到Server的_sessions。在Session出错或者对端关闭socket的写端之后,再从Server中使用ClearSession()清除自己。
  • 仅仅这样还是不够的,为什么?因为这个连接在仅仅在这个事件清除了自己,但是后续可能还有这个连接的其他事件,这样会使后续事件遇到无效的对象。所以对于每个事件回调,还要拷贝这个Session的shared_ptr,以保证这个Session在这个事件的回调中是有效的。

进行总结。

根据自己的学习路线,这属于项目实战的内容。尽管只介绍了一个简单的回显服务器,但是这个例子已经能够很好的体现异步服务器的层次和框架。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!