muduo源码阅读
Timestamp类,时间戳类
时间戳主要是获得时间
成员变量int64_t microSecondsSinceEpoch_ 是时间
now()函数是获得当前时间
toString()函数是把时间转换成人看的string
Timestamp Timestamp::now() { struct timeval tv; gettimeofday(&tv, NULL); int64_t seconds = tv.tv_sec; return Timestamp(seconds * kMicroSecondsPerSecond + tv.tv_usec); //把时间秒换算成毫秒 } string Timestamp::toString() const { char buf[32] = {0}; int64_t seconds = microSecondsSinceEpoch_ / kMicroSecondsPerSecond; int64_t microseconds = microSecondsSinceEpoch_ % kMicroSecondsPerSecond; snprintf(buf, sizeof(buf), "%" PRId64 ".%06" PRId64 "", seconds, microseconds); return buf; }
Log日志类
有一个日志等级
enum LogLevel { TRACE, DEBUG, INFO, WARN, ERROR, FATAL, NUM_LOG_LEVELS, };
Channel类
Channel理解为通道,封装了sockfd和其感兴趣的event,如EPOLLIN、EPOLLOUT事件,还绑定了poller返回的具体事件。当拿到文件描述符时,就去拿对应的channel,当文件描述符发生读、写事件,系统就会去channel里拿对应的读、写回调函数。
channel与eventloop打交道,epoll与eventloop打交道
EventLoop* loop_; const int fd_; //因为是文件描述符绑定,所以必须有fd文件描述符 int events_; //注册的事件 int revents_; //epoll或者poll监听到的事件, int index_; // used by Poller. bool logHup_;
四个回调函数,以及四个方法设置回调函数
ReadEventCallback readCallback_; EventCallback writeCallback_; EventCallback closeCallback_; EventCallback errorCallback_; typedef std::function<void()> EventCallback; typedef std::function<void(Timestamp)> ReadEventCallback; void setReadCallback(ReadEventCallback cb) { readCallback_ = std::move(cb); } //std::move是移动,移动完之后,cb内容就消失了 void setWriteCallback(EventCallback cb) { writeCallback_ = std::move(cb); } void setCloseCallback(EventCallback cb) { closeCallback_ = std::move(cb); } void setErrorCallback(EventCallback cb) { errorCallback_ = std::move(cb); }
设置事件
void enableReading() { events_ |= kReadEvent; update(); } //update()类似epollctrl void disableReading() { events_ &= ~kReadEvent; update(); } void enableWriting() { events_ |= kWriteEvent; update(); } void disableWriting() { events_ &= ~kWriteEvent; update(); } void disableAll() { events_ = kNoneEvent; update(); } bool isWriting() const { return events_ & kWriteEvent; } bool isReading() const { return events_ & kReadEvent; }
update()
loop_->updateChannel(this); //channel的update是调用eventloop的updateChannel //eventloop的updateChannel是调用poller的updateChannel poller_->updateChannel(channel); //epoller的updateChannel就是调用epollctrl来修改文件描述符的关键 //update(EPOLL_CTL_DEL, channel); ==》 //::epoll_ctl(epollfd_, operation, fd, &event) void EPollPoller::updateChannel(Channel* channel) { Poller::assertInLoopThread(); const int index = channel->index(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events() << " index = " << index; if (index == kNew || index == kDeleted) { // a new one, add with EPOLL_CTL_ADD int fd = channel->fd(); if (index == kNew) { assert(channels_.find(fd) == channels_.end()); channels_[fd] = channel; } else // index == kDeleted { assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); } channel->set_index(kAdded); update(EPOLL_CTL_ADD, channel); } else { // update existing one with EPOLL_CTL_MOD/DEL int fd = channel->fd(); (void)fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(index == kAdded); if (channel->isNoneEvent()) { update(EPOLL_CTL_DEL, channel); channel->set_index(kDeleted); } else { update(EPOLL_CTL_MOD, channel); } } } void EPollPoller::update(int operation, Channel* channel) { struct epoll_event event; memZero(&event, sizeof event); event.events = channel->events(); event.data.ptr = channel; int fd = channel->fd(); LOG_TRACE << "epoll_ctl op = " << operationToString(operation) << " fd = " << fd << " event = { " << channel->eventsToString() << " }"; if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) { if (operation == EPOLL_CTL_DEL) { LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } else { LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } } }
handleEvent()
Channel::handleEvent(Timestamp receiveTime) //eventloop里调用handleEvent //poller监听channel哪些事件发生了变化,然后上报给eventloop //handleEvent就是根据目前fd上发生的事件,然后去调用对应的回调函数
Poller类
poller是基类
typedef std::vector<Channel*> ChannelList; //放channel的 //三个纯虚函数,在底层实现 virtual Timestamp poll(int timeoutMs, ChannelList* activeChannels) = 0; virtual void updateChannel(Channel* channel) = 0; virtual void removeChannel(Channel* channel) = 0; //是否包含channel virtual bool hasChannel(Channel* channel) const; // typedef std::map<int, Channel*> ChannelMap; //poller所指向的eventloop EventLoop* ownerLoop_;
epollpoller
Timestamp poll(int timeoutMs, ChannelList* activeChannels) override //在这个函数会调用void fillActiveChannels(int numEvents, ChannelList* activeChannels) const; //fillActiveChannels是填写活跃的连接,将epoll监测到的活跃事件返回给activeChannel里 //成员变量 int epollfd_; //静态成员变量,表示eventList的大小,后面可以扩容 static const int kInitEventListSize = 16; typedef std::vector<struct epoll_event> EventList; //epollfd在构造的时候使用epoll_create1创建 epollfd_(::epoll_create1(EPOLL_CLOEXEC)), //poll函数就是等待事件发生 Timestamp EPollPoller::poll(int timeoutMs, ChannelList* activeChannels) { LOG_TRACE << "fd total count " << channels_.size(); //调用epoll_wait来等待事件发生, int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs); int savedErrno = errno; Timestamp now(Timestamp::now()); //如果有事件发生 if (numEvents > 0) { LOG_TRACE << numEvents << " events happened"; fillActiveChannels(numEvents, activeChannels); if (implicit_cast<size_t>(numEvents) == events_.size()) //扩容操作 { events_.resize(events_.size()*2); } } else if (numEvents == 0) { LOG_TRACE << "nothing happened"; } else { // error happens, log uncommon ones if (savedErrno != EINTR) { errno = savedErrno; LOG_SYSERR << "EPollPoller::poll()"; } } return now; } void EPollPoller::fillActiveChannels(int numEvents, ChannelList* activeChannels) const { assert(implicit_cast<size_t>(numEvents) <= events_.size()); for (int i = 0; i < numEvents; ++i) { //把事件取出来,事件和channel是绑定的,因此转化为channel,然后放入到activeChannels里 Channel* channel = static_cast<Channel*>(events_[i].data.ptr); #ifndef NDEBUG int fd = channel->fd(); ChannelMap::const_iterator it = channels_.find(fd); assert(it != channels_.end()); assert(it->second == channel); #endif channel->set_revents(events_[i].events); activeChannels->push_back(channel); } } const int kNew = -1; const int kAdded = 1; const int kDeleted = 2; //上面三个在channel是index void EPollPoller::updateChannel(Channel* channel) { Poller::assertInLoopThread(); const int index = channel->index(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events() << " index = " << index; if (index == kNew || index == kDeleted) { // a new one, add with EPOLL_CTL_ADD int fd = channel->fd(); if (index == kNew) { assert(channels_.find(fd) == channels_.end()); channels_[fd] = channel; } else // index == kDeleted { assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); } channel->set_index(kAdded); update(EPOLL_CTL_ADD, channel); } else { // update existing one with EPOLL_CTL_MOD/DEL int fd = channel->fd(); (void)fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(index == kAdded); if (channel->isNoneEvent()) { update(EPOLL_CTL_DEL, channel); channel->set_index(kDeleted); } else { update(EPOLL_CTL_MOD, channel); } } }
EventLoop类
eventfd是专门用于事件通知的fd
其他线程可以调用eventloop,所以isInLoopThread就是判断是否在本线程里
// 把回调函数放入到队列里 void queueInLoop(Functor cb); //执行回调函数 void runInLoop(Functor cb); //执行上层的回调函数 void EventLoop::doPendingFunctors() //主要的函数loop,在循环里,用poller监测活跃的channel,监测到后,去执行对应的事件,最后调用上层的回调函数 void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; // FIXME: what if someone calls quit() before loop() ? LOG_TRACE << "EventLoop " << this << " start looping"; while (!quit_) { activeChannels_.clear(); pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); ++iteration_; if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // TODO sort channel by priority eventHandling_ = true; for (Channel* channel : activeChannels_) { currentActiveChannel_ = channel; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; doPendingFunctors(); } //这里说明一下,当执行doPendingFunctors();回调时,又加入到5个回调,如果不处理,那么下一次循环中,就会阻塞在poller_->poll //因此queueInLoop里就会wakeup()一下,往wakeupfd写一个事件,然后poller监测到后就不阻塞了 LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; }
eventloop都一个threadid,这个threadid是通过currentthread::tid()获得的
wakeupfd是通过createEventfd()创建的
createEventfd()是用linux的eventfd创建,当监听到写事件时,就会唤醒wakeup线程
单reactor和多reactor,
如果是单reactor,上层接受连接、读写,都是在一个eventloop里执行的,因此直接执行回调函数
如果是多reactor,比如上层接受连接,然后下发给子reactor,读写关闭等回调是放到子reactor里,子reactor是通过queueInloop来实现的
void EventLoop::runInLoop(Functor cb) { if (isInLoopThread()) { cb(); } else { queueInLoop(std::move(cb)); } }
//将上层的回调函数插入到vector的 pendingFunctors里 void EventLoop::queueInLoop(Functor cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(std::move(cb)); } //当回调正在执行时,说明上面刚加入的回调是在下一轮被执行,而不是马上执行。因此需要唤醒一下,让下一次loop()里的poller->poll()不再阻塞 if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } }
handleRead是wakeup的回调函数
//构造的时候设置了回调 wakeupChannel_->setReadCallback( std::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd wakeupChannel_->enableReading(); void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } } //唤醒就是向wakeupFd写事件,wakeupchannel就会发生读事件 void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } } void EventLoop::doPendingFunctors() { std::vector<Functor> functors; //callingPendingFunctors_表示正在执行回调 callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); //交换的方式减少锁的临界区域范围,提升效率 //如果是先加锁,把所有的回调函数都执行完,然后再解锁。可能会导致死锁,因为在queueInLoop里,会出现在执行回调函数过程中 //还加入回调,加入回调也是加锁,然后就进入死锁。一个线程获得了一个锁,想再次获得该锁,就会死锁。 //因此陈硕通过加锁交换,解锁,再唤醒的方式 } for (const Functor& functor : functors) { functor(); } callingPendingFunctors_ = false; }
thread类
成员变量
using ThreadFunc = std::function<void()>; bool started_; bool joined_; std::shared_ptr<std::thread> thread_; pid_t tid_; // 在线程创建时再绑定 ThreadFunc func_; // 线程回调函数 std::string name_; static std::atomic_int numCreated_;
构造函数的时候,先把function放进去,start()的时候,才启动线程开启function
比较重要的是用信号量来保证线程创建的有序性,start()是启动线程
void Thread::start() // 一个Thread对象 记录的就是一个新线程的详细信息 { started_ = true; sem_t sem; sem_init(&sem, false, 0); // false指的是 不设置进程间共享 // 开启线程 thread_ = std::shared_ptr<std::thread>(new std::thread([&]() { tid_ = CurrentThread::tid(); // 获取线程的tid值 sem_post(&sem); func_(); // 开启一个新线程 专门执行该线程函数 })); // 这里必须等待获取上面新创建的线程的tid值 sem_wait(&sem); }
EventLoopThread类
成员变量
using ThreadInitCallback = std::function<void(EventLoop *)>; EventLoop *loop_; bool exiting_; Thread thread_; std::mutex mutex_; // 互斥锁 std::condition_variable cond_; // 条件变量 ThreadInitCallback callback_;
EventLoopThread对应的function,这个函数会以回调函数的形式放入到thread_里
在构造函数里,有这样的一条语句。function是放进去了,不过没有执行,这就是回调函数的好处
thread_(std::bind(&EventLoopThread::threadFunc, this), name)
// 下面这个方法是在单独的新线程里运行的 void EventLoopThread::threadFunc() { EventLoop loop; // 创建一个独立的EventLoop对象 和上面的线程是一一对应的 级one loop per thread if (callback_) { callback_(&loop); } { std::unique_lock<std::mutex> lock(mutex_); loop_ = &loop; cond_.notify_one(); } loop.loop(); // 执行EventLoop的loop() 开启了底层的Poller的poll() //只有等eventloop quit()之后才会执行下面的语句,不然会一直阻塞在上面的语句 std::unique_lock<std::mutex> lock(mutex_); loop_ = nullptr; }
然后startloop()才是执行,会去调用thread的start(), thread的start()才是创建真正的线程
EventLoop *EventLoopThread::startLoop() { thread_.start(); // 启用底层线程Thread类对象thread_中通过start()创建的线程 //上面的start创建线程,是Thread类是调用std::thread来创建的,线程真正的创建在背后的内核,所以我们要一直等线程创建好。 EventLoop *loop = nullptr; { std::unique_lock<std::mutex> lock(mutex_); while(loop_ == nullptr) { //一直等待真正的线程创建好 cond_.wait(lock); } //线程创建好之后就可以返回对应的eventloop,说明eventloop的线程创建好了 loop = loop_; } return loop; }
总结:会把回调函数threadFunc先放入到thread_,但是这个时候还没有去创建线程,然后调用startLoop()函数时,再去调用thread的的start(),这个时候回去创建真正的线程。startLoop函数会返回创建好的eventloop指针,但是必须等线程创建好,这个时候就使用锁和条件变量,在回调函数threadFunc里等eventloop创建好后,会使用cond.notify_one();通知startloop线程创建好了*
EventLoopThreadPool
成员变量
EventLoop *baseLoop_; // 用户使用muduo创建的loop 如果线程数为1 那直接使用用户创建的loop 否则创建多EventLoop std::string name_; bool started_; int numThreads_; int next_; // 轮询的下标 std::vector<std::unique_ptr<EventLoopThread>> threads_; //线程vector std::vector<EventLoop *> loops_; //EventLoop的vector
void EventLoopThreadPool::start(const ThreadInitCallback &cb) { started_ = true; for(int i = 0; i < numThreads_; ++i) { char buf[name_.size() + 32]; snprintf(buf, sizeof buf, "%s%d", name_.c_str(), i); EventLoopThread *t = new EventLoopThread(cb, buf); //绑定回调函数cb threads_.push_back(std::unique_ptr<EventLoopThread>(t)); loops_.push_back(t->startLoop()); // 底层创建线程 绑定一个新的EventLoop 并返回该loop的地址 } if(numThreads_ == 0 && cb) // 整个服务端只有一个线程运行baseLoop { cb(baseLoop_); } }
这个baseloop就是一开始程序运行的那个eventloop
// 如果工作在多线程中,baseLoop_(mainLoop)会默认以轮询的方式分配Channel给subLoop EventLoop *EventLoopThreadPool::getNextLoop() { EventLoop *loop = baseLoop_; // 如果只设置一个线程 也就是只有一个mainReactor 无subReactor 那么轮询只有一个线程 getNextLoop()每次都返回当前的baseLoop_ if(!loops_.empty()) // 通过轮询获取下一个处理事件的loop { loop = loops_[next_]; ++next_; if(next_ >= loops_.size()) { next_ = 0; } } return loop; }
getAllLoops就是返回所有的eventloop
InetAddress类
主要是封装sockaddr_in,封装地址、端口等等
还有输出地址、端口
Socket类
主要是封装文件描述符socket
提供socket相关的操作函数
bind listen accept
int fd() const { return sockfd_; } void bindAddress(const InetAddress &localaddr); void listen(); int accept(InetAddress *peeraddr); void shutdownWrite(); void setTcpNoDelay(bool on); void setReuseAddr(bool on); void setReusePort(bool on); void setKeepAlive(bool on);
Acceptor类
Acceptor类用来接受连接并处理连接
成员变量EventLoop是baseLoop,在TcpServer里会传入baseLoop给Acceptor
using NewConnectionCallback = std::function<void(int sockfd, const InetAddress &)>; EventLoop *loop_; // Acceptor用的就是用户定义的那个baseLoop 也称作mainLoop Socket acceptSocket_; Channel acceptChannel_; NewConnectionCallback NewConnectionCallback_; bool listenning_; int idleFd_; //用于当文件描述符过多时的操作,当文件描述符过多时,就丢到黑洞里
构造函数
Acceptor::Acceptor(EventLoop *loop, const InetAddress &listenAddr, bool reuseport) : loop_(loop) //loop是上层传入的 , acceptSocket_(createNonblocking()) //调用函数来创建socket , acceptChannel_(loop, acceptSocket_.fd()) , listenning_(false) , idleFd_(::open("/dev/null", O_RDONLY | O_CLOEXEC)) { acceptSocket_.setReuseAddr(true); acceptSocket_.setReusePort(true); acceptSocket_.bindAddress(listenAddr); // TcpServer::start() => Acceptor.listen() 如果有新用户连接 要执行一个回调(accept => connfd => 打包成Channel => 唤醒subloop) // baseloop监听到有事件发生 => acceptChannel_(listenfd) => 执行该回调函数 acceptChannel_.setReadCallback( std::bind(&Acceptor::handleRead, this)); //当有连接发生时,eventloop会监测到,然后socketfd的channel会去执行回调函数,这个回调函数就是handleRead }
//创建socket,用于构造函数 static int createNonblocking() { int sockfd = ::socket(AF_INET, SOCK_STREAM | SOCK_NONBLOCK | SOCK_CLOEXEC, IPPROTO_TCP); if (sockfd < 0) { LOG_FATAL("%s:%s:%d listen socket create err:%d\n", __FILE__, __FUNCTION__, __LINE__, errno); } return sockfd; }
// listenfd有事件发生了,就是有新用户连接了 //该函数功能是响应和接受连接请求 void Acceptor::handleRead() { InetAddress peerAddr; //调用accept int connfd = acceptSocket_.accept(&peerAddr); if (connfd >= 0) { //NewConnectionCallback_上层回调,TcpServer传入的 if (NewConnectionCallback_) { NewConnectionCallback_(connfd, peerAddr); // 轮询找到subLoop 唤醒并分发当前的新客户端的Channel } else { ::close(connfd); } } else { LOG_SYSERR << "in Acceptor::handleRead"; // Read the section named "The special problem of // accept()ing when you can't" in libev's doc. // By Marc Lehmann, author of libev. if (errno == EMFILE) { ::close(idleFd_); idleFd_ = ::accept(acceptSocket_.fd(), NULL, NULL); ::close(idleFd_); idleFd_ = ::open("/dev/null", O_RDONLY | O_CLOEXEC); } } }
析构函数
Acceptor::~Acceptor() { acceptChannel_.disableAll(); // 把从Poller中感兴趣的事件删除掉 acceptChannel_.remove(); // 调用EventLoop->removeChannel => Poller->removeChannel 把Poller的ChannelMap对应的部分删除 ::close(idleFd_); }
监听
void Acceptor::listen() { listenning_ = true; acceptSocket_.listen(); // listen acceptChannel_.enableReading(); // acceptChannel_注册至Poller !重要 }
Buffer类
buffer内部是一个std::vector<char>,
前8个字节预留着用于记录数据长度,readIndex 指向可读取数据的初始位置,writeIndex指向空闲区的起始位置
成员变量
std::vector<char> buffer_; size_t readerIndex_; size_t writerIndex_; static const size_t kCheapPrepend = 8; static const size_t kInitialSize = 1024;
每次读数据,readerIndex_向右移动。每次写数据,writerIndex向右移动。
//返回可读区域 size_t readableBytes() const { return writerIndex_ - readerIndex_; } //返回可写区域 size_t writableBytes() const { return buffer_.size() - writerIndex_; } //返回预留区域 size_t prependableBytes() const { return readerIndex_; }
取数据的操作
//取数据的操作 void retrieve(size_t len) { if (len < readableBytes()) { readerIndex_ += len; // 说明应用只读取了可读缓冲区数据的一部分,就是len长度 还剩下readerIndex+=len到writerIndex_的数据未读 } else // len == readableBytes() { retrieveAll(); } } //当把所有的数据取出来的时候,就把reader指针和writer指针归位,归到开头 void retrieveAll() { readerIndex_ = kCheapPrepend; writerIndex_ = kCheapPrepend; }
下面的函数是把数据取出来,并转化为string类型
// 把onMessage函数上报的Buffer数据 转成string类型的数据返回 std::string retrieveAllAsString() { return retrieveAsString(readableBytes()); } std::string retrieveAsString(size_t len) { std::string result(peek(), len); retrieve(len); // 上面一句把缓冲区中可读的数据已经读取出来 这里肯定要对缓冲区进行复位操作 return result; }
要插入数据,先看一下可写区大小够不够,如果不够,那么就要扩容
// buffer_.size - writerIndex_ void ensureWritableBytes(size_t len) { if (writableBytes() < len) { makeSpace(len); // 扩容 } }
扩容操作,先看一下可写区和预留区的总和是否大于len,如果大于,就说明buffer容量还够,我们只需要把reader指针归位,writer指针往前移动一点距离。如果小于,就说明buffer容量不够,我们需要进行resize扩容
void makeSpace(size_t len) { /** * | kCheapPrepend |xxx| reader | writer | // xxx标示reader中已读的部分 * | kCheapPrepend | reader | len | **/ if (writableBytes() + prependableBytes() < len + kCheapPrepend) // 也就是说 len > xxx + writer的部分 { buffer_.resize(writerIndex_ + len); } else // 这里说明 len <= xxx + writer 把reader搬到从xxx开始 使得xxx后面是一段连续空间 { size_t readable = readableBytes(); // readable = reader的长度 std::copy(begin() + readerIndex_, begin() + writerIndex_, // 把这一部分数据拷贝到begin+kCheapPrepend起始处 begin() + kCheapPrepend); readerIndex_ = kCheapPrepend; writerIndex_ = readerIndex_ + readable; } }
添加数据
// 把[data, data+len]内存上的数据添加到writable缓冲区当中 void append(const char *data, size_t len) { ensureWritableBytes(len); std::copy(data, data+len, beginWrite()); writerIndex_ += len; }
从fd读数据,先开两个缓冲区。内核缓冲区里有数据,应该要尽快一次性读进来。而不是先扩容,再读数据,不然后面内核缓冲区还会继续来数据。
/** * 从fd上读取数据 Poller工作在LT模式 * Buffer缓冲区是有大小的! 但是从fd上读取数据的时候 却不知道tcp数据的最终大小 * * @description: 从socket读到缓冲区的方法是使用readv先读至buffer_, * Buffer_空间如果不够会读入到栈上65536个字节大小的空间,然后以append的 * 方式追加入buffer_。既考虑了避免系统调用带来开销,又不影响数据的接收。 **/ ssize_t Buffer::readFd(int fd, int *saveErrno) { // 栈额外空间,用于从套接字往出读时,当buffer_暂时不够用时暂存数据,待buffer_重新分配足够空间后,在把数据交换给buffer_。 char extrabuf[65536] = {0}; // 栈上内存空间 65536/1024 = 64KB /* struct iovec { ptr_t iov_base; // iov_base指向的缓冲区存放的是readv所接收的数据或是writev将要发送的数据 size_t iov_len; // iov_len在各种情况下分别确定了接收的最大长度以及实际写入的长度 }; */ // 使用iovec分配两个连续的缓冲区 struct iovec vec[2]; const size_t writable = writableBytes(); // 这是Buffer底层缓冲区剩余的可写空间大小 不一定能完全存储从fd读出的数据 // 第一块缓冲区,指向可写空间 vec[0].iov_base = begin() + writerIndex_; vec[0].iov_len = writable; // 第二块缓冲区,指向栈空间 vec[1].iov_base = extrabuf; vec[1].iov_len = sizeof(extrabuf); // when there is enough space in this buffer, don't read into extrabuf. // when extrabuf is used, we read 128k-1 bytes at most. // 这里之所以说最多128k-1字节,是因为若writable为64k-1,那么需要两个缓冲区 第一个64k-1 第二个64k 所以做多128k-1 // 如果第一个缓冲区>=64k 那就只采用一个缓冲区 而不使用栈空间extrabuf[65536]的内容 const int iovcnt = (writable < sizeof(extrabuf)) ? 2 : 1; const ssize_t n = ::readv(fd, vec, iovcnt); if (n < 0) { *saveErrno = errno; } else if (n <= writable) // Buffer的可写缓冲区已经够存储读出来的数据了 { writerIndex_ += n; } else // extrabuf里面也写入了n-writable长度的数据 { writerIndex_ = buffer_.size(); append(extrabuf, n - writable); // 对buffer_扩容 并将extrabuf存储的另一部分数据追加至buffer_ } return n; }
把数据写到fd上
// inputBuffer_.readFd表示将对端数据读到inputBuffer_中,移动writerIndex_指针 // outputBuffer_.writeFd标示将数据写入到outputBuffer_中,从readerIndex_开始,可以写readableBytes()个字节 ssize_t Buffer::writeFd(int fd, int *saveErrno) { ssize_t n = ::write(fd, peek(), readableBytes()); if (n < 0) { *saveErrno = errno; } return n; }
TcpConnection类
TcpServer => Acceptor => 有一个新用户连接,通过accept函数拿到connfd
=> TcpConnection设置回调 => 设置到Channel => Poller => Channel回调
往下层设置的回调
eventloop的doPendingFunctors();解决的上层回调就是这些
void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; } void setCloseCallback(const CloseCallback &cb) { closeCallback_ = cb; } void setHighWaterMarkCallback(const HighWaterMarkCallback &cb, size_t highWaterMark) { highWaterMarkCallback_ = cb; highWaterMark_ = highWaterMark; }
下面这四个是注册到channel中
void handleRead(Timestamp receiveTime); void handleWrite(); void handleClose(); void handleError();
成员变量,socket是与channel绑定在一起
// Socket Channel 这里和Acceptor类似 Acceptor => mainloop TcpConnection => subloop std::unique_ptr<Socket> socket_; std::unique_ptr<Channel> channel_;
const InetAddress localAddr_; const InetAddress peerAddr_; EventLoop *loop_; // 这里是baseloop还是subloop由TcpServer中创建的线程数决定 若为多Reactor 该loop_指向subloop 若为单Reactor 该loop_指向baseloop const std::string name_; std::atomic_int state_; bool reading_;
回调函数
// 这些回调TcpServer也有 用户通过写入TcpServer注册 TcpServer再将注册的回调传递给TcpConnection TcpConnection再将回调注册到Channel中 ConnectionCallback connectionCallback_; // 有新连接时的回调 MessageCallback messageCallback_; // 有读写消息时的回调 WriteCompleteCallback writeCompleteCallback_; // 消息发送完成以后的回调 HighWaterMarkCallback highWaterMarkCallback_; //控制收发速度 CloseCallback closeCallback_; size_t highWaterMark_;
// 数据缓冲区 Buffer inputBuffer_; // 接收数据的缓冲区 Buffer outputBuffer_; // 发送数据的缓冲区 用户send向outputBuffer_发
handleReader
// 将内核缓冲区中的数据读入到inputbuffer中,并调用用户注册的messageCallback // 读是相对服务器而言的 当对端客户端有数据到达 服务器端检测到EPOLLIN 就会触发该fd上的回调 handleRead取读走对端发来的数据 void TcpConnection::handleRead(Timestamp receiveTime) { int savedErrno = 0; //从fd上读数据到inputBuffer ssize_t n = inputBuffer_.readFd(channel_->fd(), &savedErrno); if (n > 0) // 有数据到达 { // 已建立连接的用户有可读事件发生了 调用用户传入的回调操作onMessage shared_from_this就是获取了TcpConnection的智能指针 messageCallback_(shared_from_this(), &inputBuffer_, receiveTime); } else if (n == 0) // 客户端断开 { handleClose(); } else // 出错了 { errno = savedErrno; LOG_ERROR("TcpConnection::handleRead"); handleError(); } }
//当内核缓冲区由满变为不满的时候,就会产生写事件 //只要有写事件,就会一直写,这个是重点,所以写完之后,一定要关闭写事件 void TcpConnection::handleWrite() { if (channel_->isWriting()) { int savedErrno = 0; ssize_t n = outputBuffer_.writeFd(channel_->fd(), &savedErrno); if (n > 0) { outputBuffer_.retrieve(n); //readableBytes等于0,说明buffer里的所有数据都写进去了 //如果不等于0,说明内核缓冲区里已经满了,写不进去了, if (outputBuffer_.readableBytes() == 0) { //写好了,就要关闭。因为只要有写事件,就会一直写下去。 channel_->disableWriting(); if (writeCompleteCallback_) { // TcpConnection对象在其所在的subloop中 向pendingFunctors_中加入回调 loop_->queueInLoop( std::bind(writeCompleteCallback_, shared_from_this())); } if (state_ == kDisconnecting) { shutdownInLoop(); // 在当前所属的loop中把TcpConnection删除掉 } } } else { LOG_ERROR("TcpConnection::handleWrite"); } } else { LOG_ERROR("TcpConnection fd=%d is down, no more writing", channel_->fd()); } }
下面这个函数并没有用到
void TcpConnection::send(const std::string &buf) { if (state_ == kConnected) { if (loop_->isInLoopThread()) // 这种是对于单个reactor的情况 用户调用conn->send时 loop_即为当前线程 { sendInLoop(buf.c_str(), buf.size()); } else { loop_->runInLoop( std::bind(&TcpConnection::sendInLoop, this, buf.c_str(), buf.size())); } } }
/** * 发送数据 应用写的快 而内核发送数据慢 需要把待发送数据写入缓冲区,而且设置了水位回调 **/ void TcpConnection::sendInLoop(const void *data, size_t len) { ssize_t nwrote = 0; size_t remaining = len; bool faultError = false; if (state_ == kDisconnected) // 之前调用过该connection的shutdown 不能再进行发送了 { LOG_ERROR("disconnected, give up writing"); } // 表示channel_第一次开始写数据或者缓冲区没有待发送数据 if (!channel_->isWriting() && outputBuffer_.readableBytes() == 0) { nwrote = ::write(channel_->fd(), data, len); if (nwrote >= 0) { remaining = len - nwrote; if (remaining == 0 && writeCompleteCallback_) { // 既然在这里数据全部发送完成,就不用再给channel设置epollout事件了 loop_->queueInLoop( std::bind(writeCompleteCallback_, shared_from_this())); } } else // nwrote < 0 { nwrote = 0; if (errno != EWOULDBLOCK) // EWOULDBLOCK表示非阻塞情况下没有数据后的正常返回 等同于EAGAIN { LOG_ERROR("TcpConnection::sendInLoop"); if (errno == EPIPE || errno == ECONNRESET) // SIGPIPE RESET { faultError = true; } } } } /** * 说明当前这一次write并没有把数据全部发送出去 剩余的数据需要保存到缓冲区当中 * 然后给channel注册EPOLLOUT事件,Poller发现tcp的发送缓冲区有空间后会通知 * 相应的sock->channel,调用channel对应注册的writeCallback_回调方法, * channel的writeCallback_实际上就是TcpConnection设置的handleWrite回调, * 把发送缓冲区outputBuffer_的内容全部发送完成 **/ if (!faultError && remaining > 0) { // 目前发送缓冲区剩余的待发送的数据的长度 size_t oldLen = outputBuffer_.readableBytes(); if (oldLen + remaining >= highWaterMark_ && oldLen < highWaterMark_ && highWaterMarkCallback_) { loop_->queueInLoop( std::bind(highWaterMarkCallback_, shared_from_this(), oldLen + remaining)); } outputBuffer_.append((char *)data + nwrote, remaining); if (!channel_->isWriting()) { channel_->enableWriting(); // 这里一定要注册channel的写事件 否则poller不会给channel通知epollout } } }
// 连接建立 void connectEstablished(); // 连接销毁 void connectDestroyed();
TcpServer
服务器
回调函数
//在callback.h文件里定义的有定义: using ConnectionCallback = std::function<void(const TcpConnectionPtr &)>; using CloseCallback = std::function<void(const TcpConnectionPtr &)>; using WriteCompleteCallback = std::function<void(const TcpConnectionPtr &)>;
这些回调函数是用户定义的,传递给TcpConnection,TcpConnection传递给Channel。因此需要用户写回调函数,然后层层传递下去
ConnectionCallback connectionCallback_; //有新连接时的回调 MessageCallback messageCallback_; // 有读写事件发生时的回调 WriteCompleteCallback writeCompleteCallback_; // 消息发送完成后的回调 void setThreadInitCallback(const ThreadInitCallback &cb) { threadInitCallback_ = cb; } void setConnectionCallback(const ConnectionCallback &cb) { connectionCallback_ = cb; } void setMessageCallback(const MessageCallback &cb) { messageCallback_ = cb; } void setWriteCompleteCallback(const WriteCompleteCallback &cb) { writeCompleteCallback_ = cb; }
setThreadNum,就是调用threadPool里的setThreadNum
// 设置底层subloop的个数 void TcpServer::setThreadNum(int numThreads) { threadPool_->setThreadNum(numThreads); }
// 开启服务器监听 void TcpServer::start() { if (started_++ == 0) // 防止一个TcpServer对象被start多次 { threadPool_->start(threadInitCallback_); // 启动底层的loop线程池 //这个loop是mainloop,用来监听连接 loop_->runInLoop(std::bind(&Acceptor::listen, acceptor_.get())); } }
成员变量:
using ConnectionMap = std::unordered_map<std::string, TcpConnectionPtr>; EventLoop *loop_; // baseloop 用户自定义的loop const std::string ipPort_; const std::string name_; std::unique_ptr<Acceptor> acceptor_; // 运行在mainloop 任务就是监听新连接事件 std::shared_ptr<EventLoopThreadPool> threadPool_; // one loop per thread std::atomic_int started_; int nextConnId_; ConnectionMap connections_; // 保存所有的连接
构造函数
TcpServer::TcpServer(EventLoop *loop, const InetAddress &listenAddr, const std::string &nameArg, Option option) : loop_(CheckLoopNotNull(loop)) , ipPort_(listenAddr.toIpPort()) , name_(nameArg) //比较重要的两个acceptor和threadPool, , acceptor_(new Acceptor(loop, listenAddr, option == kReusePort)) , threadPool_(new EventLoopThreadPool(loop, name_)) , connectionCallback_() , messageCallback_() , nextConnId_(1) , started_(0) { // 当有新用户连接时,Acceptor类中绑定的acceptChannel_会有读事件发生,执行handleRead()调用TcpServer::newConnection回调 acceptor_->setNewConnectionCallback( std::bind(&TcpServer::newConnection, this, std::placeholders::_1, std::placeholders::_2)); }
TcpServer::newConnection
先从线程池中调用getNextLoop(),拿到一个eventLoop来管理新连接fd对应的channel。
创建新的地址和新的socket,然后创建新的连接TcpConnection。
把TcpConnection放入到对应的map里,并设置对应的函数回调
// 有一个新用户连接,acceptor会执行这个回调操作,负责将mainLoop接收到的请求连接(acceptChannel_会有读事件发生)通过回调轮询分发给subLoop去处理 void TcpServer::newConnection(int sockfd, const InetAddress &peerAddr) { // 轮询算法 选择一个subLoop 来管理connfd对应的channel EventLoop *ioLoop = threadPool_->getNextLoop(); char buf[64] = {0}; snprintf(buf, sizeof buf, "-%s#%d", ipPort_.c_str(), nextConnId_); ++nextConnId_; // 这里没有设置为原子类是因为其只在mainloop中执行 不涉及线程安全问题 std::string connName = name_ + buf; LOG_INFO("TcpServer::newConnection [%s] - new connection [%s] from %s\n", name_.c_str(), connName.c_str(), peerAddr.toIpPort().c_str()); // 通过sockfd获取其绑定的本机的ip地址和端口信息 sockaddr_in local; ::memset(&local, 0, sizeof(local)); socklen_t addrlen = sizeof(local); if(::getsockname(sockfd, (sockaddr *)&local, &addrlen) < 0) { LOG_ERROR("sockets::getLocalAddr"); } InetAddress localAddr(local); TcpConnectionPtr conn(new TcpConnection(ioLoop, connName, sockfd, localAddr, peerAddr)); connections_[connName] = conn; // 下面的回调都是用户设置给TcpServer => TcpConnection的,至于Channel绑定的则是TcpConnection设置的四个,handleRead,handleWrite... 这下面的回调用于handlexxx函数中 conn->setConnectionCallback(connectionCallback_); conn->setMessageCallback(messageCallback_); conn->setWriteCompleteCallback(writeCompleteCallback_); // 设置了如何关闭连接的回调 conn->setCloseCallback( std::bind(&TcpServer::removeConnection, this, std::placeholders::_1)); ioLoop->runInLoop( std::bind(&TcpConnection::connectEstablished, conn)); }