TcpServer模块的公开接口:
对于TcpServer对象设置回调函数,其实最终会被设置到Connection对象上,而每个客户端对于一个Connection对象。
启动非活跃连接销毁,Tcpserver对象默认是关闭非活跃连接销毁的,添加定时任务下面会讲。
TcpServer对象创建后,就需要调用Start()函数进行线程池的创建,以及主线程EventLooop对象的运行,这个就是死循环。
添加定时任务:
这个添加定时任务是添加到主线程中的EventLoop对象的定时器对象当中去的。
而子线程是不会添加这种定时器任务,他只会运行非活跃连接的销毁工作。
连接的删除:
当连接断开时,需要将断开的连接从容器中删除。
这是设置给新连接的关闭回调函数,注意这个需要最后删除,否则Connection对象中保存的资源就不存在了,如果再去访问这些资源会出错,会什么还要去访问这些资源,因为如果你先删除Connection对象,就会找不到那些还没删除的资源,造成资源泄漏。
创建新连接并且为新连接设置回调函数:
新连接的回调函数是外部进行设置的。
上面就是有新连接到来调用的回调函数。
为新连接分配EventLoop对象和_next_id,将回调函数设置给Connection对象,在connection对象连接的不同状态中,进行调用。这不是设置给Channel对象的,channel对象设置的回调函数,是Connection对象的私有成员函数。
该回调函数最终会被设置给Acceptor对象。
当有新连接到来时,就会用新连接的文件描述符,初始化Connection,并将初始化好的connection对象放到存储容器中。
TcpServer模块中的成员变量:
_next_id:
他是自动增长的,可以标记连接的唯一性。
_port:
这是服务器端口号。
_timeout和_enable_inactive_release:
启动非活跃连接时设置的超时销毁时间 和 销毁标志。
EventLoop:
这个是主线程对应的事件管理对象。有可能客户端多个客户端对应一个套接字对象,主线程上的套接字对应一个EventLoop对象。这也体现Reactor模型。
Acceptor:
这个就是TcpServer模块如何让EventLoop对象去监听,主线程上的套接字有没有新的套接字到来。
LoopThreadPool:
TcpServer模块对应的线程池,该线程池中的每一个线程都绑定这一个EventLoop对象。
std::unordered_map<uint64_t, PtrConnection>:
这是用_next_id将Connection对象标识起来,存放到容器中,方便查找。
TcpServer模块整体代码:
class TcpServer { private: uint64_t _next_id; //这是一个自动增长的连接ID, int _port; int _timeout; //这是非活跃连接的统计时间---多长时间无通信就是非活跃连接 bool _enable_inactive_release;//是否启动了非活跃连接超时销毁的判断标志 EventLoop _baseloop; //这是主线程的EventLoop对象,负责监听事件的处理 Acceptor _acceptor; //这是监听套接字的管理对象 LoopThreadPool _pool; //这是从属EventLoop线程池 std::unordered_map<uint64_t, PtrConnection> _conns;//保存管理所有连接对应的shared_ptr对象 using ConnectedCallback = std::function<void(const PtrConnection&)>; using MessageCallback = std::function<void(const PtrConnection&, Buffer *)>; using ClosedCallback = std::function<void(const PtrConnection&)>; using AnyEventCallback = std::function<void(const PtrConnection&)>; using Functor = std::function<void()>; ConnectedCallback _connected_callback; MessageCallback _message_callback; ClosedCallback _closed_callback; AnyEventCallback _event_callback; private: void RunAfterInLoop(const Functor &task, int delay) { _next_id++; _baseloop.TimerAdd(_next_id, delay, task); } //为新连接构造一个Connection进行管理 void NewConnection(int fd) { _next_id++; PtrConnection conn(new Connection(_pool.NextLoop(), _next_id, fd)); conn->SetMessageCallback(_message_callback); conn->SetClosedCallback(_closed_callback); conn->SetConnectedCallback(_connected_callback); conn->SetAnyEventCallback(_event_callback); conn->SetSrvClosedCallback(std::bind(&TcpServer::RemoveConnection, this, std::placeholders::_1)); if (_enable_inactive_release) conn->EnableInactiveRelease(_timeout);//启动非活跃超时销毁 conn->Established();//就绪初始化 _conns.insert(std::make_pair(_next_id, conn)); } void RemoveConnectionInLoop(const PtrConnection &conn) { int id = conn->Id(); auto it = _conns.find(id); if (it != _conns.end()) { _conns.erase(it); } } //从管理Connection的_conns中移除连接信息 void RemoveConnection(const PtrConnection &conn) { _baseloop.RunInLoop(std::bind(&TcpServer::RemoveConnectionInLoop, this, conn)); } public: TcpServer(int port): _port(port), _next_id(0), _enable_inactive_release(false), _acceptor(&_baseloop, port), _pool(&_baseloop) { _acceptor.SetAcceptCallback(std::bind(&TcpServer::NewConnection, this, std::placeholders::_1)); _acceptor.Listen();//将监听套接字挂到baseloop上 } void SetThreadCount(int count) { return _pool.SetThreadCount(count); } void SetConnectedCallback(const ConnectedCallback&cb) { _connected_callback = cb; } void SetMessageCallback(const MessageCallback&cb) { _message_callback = cb; } void SetClosedCallback(const ClosedCallback&cb) { _closed_callback = cb; } void SetAnyEventCallback(const AnyEventCallback&cb) { _event_callback = cb; } void EnableInactiveRelease(int timeout) { _timeout = timeout; _enable_inactive_release = true; } //用于添加一个定时任务 void RunAfter(const Functor &task, int delay) { _baseloop.RunInLoop(std::bind(&TcpServer::RunAfterInLoop, this, task, delay)); } void Start() { _pool.Create(); _baseloop.Start(); } };