Sylar服务器框架——协程调度器

avatar
作者
猴君
阅读量:0

1、协程调度器介绍

  • 协程模块中,对于每个协程,都需要用户手动调用协程的resume方法将协程运行起来,然后等协程运行结束并返回,再运行下一个协程。这种运行协程的方式其实是用户自己在挑选协程执行,相当于用户在充当调度器,显然不够灵活。
  • 协程调度器应该具有自动消耗协程的功能,程序员只需要把需要执行的协程喂给调度器,调度器就可以自己消耗掉这些协程。
  • sylar框架中的协程调度器是一个N-M的协程调度器,N个线程上可以运行M个协程,协程可以在线程之间进行切换,也可以指定某个协程在哪个线程上执行。
  • 该模块的难点是当caller线程也参与调度时调度协程和主线程切换的情况
    2、设计思路
  • 调度器创建后,内部首先会创建一个调度线程池,调度开始后,所有调度线程按顺序从任务队列里取任务执行,调度线程数越多,能够同时调度的任务也就越多,当所有任务都调度完后,调度线程就停下来等新的任务进来。接下来是添加调度任务,添加调度任务的本质就是往调度器的任务队列里塞任务,但是,只添加调度任务是不够的,还应该有一种方式用于通知调度线程有新的任务加进来了,因为调度线程并不一定知道有新任务进来了。当然调度线程也可以不停地轮询有没有新任务,但是这样CPU占用率会很高。接下来是调度器的停止。调度器应该支持停止调度的功能,以便回收调度线程的资源,只有当所有的调度线程都结束后,调度器才算真正停止。
  • 首先是协程调度器的初始化。sylar的协程调度器在初始化时支持传入线程数和一个布尔型的use_caller参数,表示是否使用caller线程。在使用caller线程的情况下,线程数自动减一,并且调度器内部会初始化一个属于caller线程的调度协程并保存起来(比如,在main函数中创建的调度器,如果use_caller为true,那调度器会初始化一个属于main函数线程的调度协程)。调度器创建好后,即可调用调度器的schedule方法向调度器添加调度任务,但此时调度器并不会立刻执行这些任务,而是将它们保存到内部的一个任务队列中。接下来是调用start方法启动调度。start方法调用后会创建调度线程池,线程数量由初始化时的线程数和use_caller确定。调度线程一旦创建,就会立刻从任务队列里取任务执行。比较特殊的一点是,如果初始化时指定线程数为1且use_caller为true,那么start方法什么也不做,因为不需要创建新线程用于调度。并且,由于没有创建新的调度线程,那只能由caller线程的调度协程来负责调度协程,而caller线程的调度协程的执行时机与start方法并不在同一个地方。caller线程的调度协程的执行时机在stop函数中。接下来是调度协程,对应run方法。调度协程负责从调度器的任务队列中取任务执行。取出的任务即子协程,这里调度协程和子协程的切换模型即为前一章介绍的非对称模型,每个子协程执行完后都必须返回调度协程,由调度协程重新从任务队列中取新的协程并执行。如果任务队列空了,那么调度协程会切换到一个idle协程,这个idle协程什么也不做,等有新任务进来时,idle协程才会退出并回到调度协程,重新开始下一轮调度。接下来是添加调度任务,对应schedule方法,这个方法支持传入协程或函数,并且支持一个线程号参数,表示是否将这个协程或函数绑定到一个具体的线程上执行。如果任务队列为空,那么在添加任务之后,要调用一次tickle方法以通知各调度线程的调度协程有新任务来了。在执行调度任务时,还可以通过调度器的GetThis()方法获取到当前调度器,再通过schedule方法继续添加新的任务,这就变相实现了在子协程中创建并运行新的子协程的功能。**接下来是调度器的停止。**调度器的停止行为要分两种情况讨论,首先是use_caller为false的情况,这种情况下,由于没有使用caller线程进行调度,那么只需要简单地等各个调度线程的调度协程退出就行了。如果use_caller为true,表示caller线程也要参于调度,这时,调度器初始化时记录的属于caller线程的调度协程就要起作用了,在调度器停止前,应该让这个caller线程的调度协程也运行一次,让caller线程完成调度工作后再退出。如果调度器只使用了caller线程进行调度,那么所有的调度任务要在调度器停止时才会被调度。
    3、代码实现
  • 协程调度器成员变量
/// 协程调度器名称 std::string m_name; /// 互斥锁 MutexType m_mutex; /// 线程池 std::vector<Thread::ptr> m_threads; /// 任务队列 std::list<ScheduleTask> m_tasks; /// 线程池的线程ID数组 std::vector<int> m_threadIds; /// 工作线程数量,不包含use_caller的主线程 size_t m_threadCount = 0; /// 活跃线程数 std::atomic<size_t> m_activeThreadCount = {0}; /// idle线程数 std::atomic<size_t> m_idleThreadCount = {0};   /// 是否use caller bool m_useCaller; /// use_caller为true时,调度器所在线程的调度协程 Fiber::ptr m_rootFiber; /// use_caller为true时,调度器所在线程的id int m_rootThread = 0;   /// 是否正在停止 bool m_stopping = false; 
  • start() 开始调度
void Scheduler::start() {     SYLAR_LOG_DEBUG(g_logger) << "start";     MutexType::Lock lock(m_mutex);     if (m_stopping) {         SYLAR_LOG_ERROR(g_logger) << "Scheduler is stopped";         return;     }     SYLAR_ASSERT(m_threads.empty());     m_threads.resize(m_threadCount);     for (size_t i = 0; i < m_threadCount; i++) {         m_threads[i].reset(new Thread(std::bind(&Scheduler::run, this),                                       m_name + "_" + std::to_string(i)));         m_threadIds.push_back(m_threads[i]->getId());     } } 
  • 调度方法
 void Scheduler::run() {         SYLAR_LOG_DEBUG(g_logger) << m_name << " run";         setThis();         set_hook_enable(true);         // 少用系统调用         // if(sylar::GetThreadId() != m_rootThread) {         // 第二个条件是将use_caller为true时,非caller线程中的t_scheduler_fiber赋值,caller线程中的t_scheduler_fiber在Scheduler初始化时已经赋值了         // if (!m_useCaller || (t_scheduler_fiber == nullptr && m_useCaller)) {             t_scheduler_fiber = Fiber::GetThis();         // }         //初始话一个没有任务时执行的协程         Fiber::ptr idle_fiber(new Fiber(std::bind(&Scheduler::idle, this)));         Fiber::ptr cb_fiber;         ScheduleTask task;         while(true) {             task.reset();             bool need_tickle = false;             {                 MutexType::Lock lock(m_lock);                 //这个for的作用是找到一个可以给当前线程执行的任务                 auto it = m_tasks.begin();                 for(; it != m_tasks.end(); ++it) {                     if (it ->threadid != -1 && it->threadid != sylar::GetThreadId()) {                         need_tickle = true;                         continue;                     }                      SYLAR_ASSERT(it->cb || it->fiber);                     // 状态可以再加一个assert                     task = *it;                     ++m_activeThreadCount;                     m_tasks.erase(it++);                     break;                 }                 // 通知其他线程                 need_tickle |= (it  != m_tasks.end());             }              if(need_tickle) {                 tickle();             }              if(task.fiber) {                 // 这个任务是fiber                 task.fiber->swapIn();                 --m_activeThreadCount;                  if(task.fiber->getState() == Fiber::READY) {                     schedule(task.fiber);                 } else if(task.fiber->getState() != Fiber::TERM                     && task.fiber->getState() != Fiber::EXCEPT) {                     task.fiber->setState(Fiber::HOLD);                 }                 // 让fiber资源尽早释放,不用等到下个fiber来的时候再释放                 task.reset();             } else if(task.cb) {                 // 这个任务是callback                 if(cb_fiber) {                     cb_fiber->reset(task.cb, m_useCaller);                 } else {                     cb_fiber.reset(new Fiber(task.cb));                 }                 task.reset();                 cb_fiber->swapIn();                 --m_activeThreadCount;                 if(cb_fiber->getState() == Fiber::READY) {                     schedule(task.fiber);                 } else if(cb_fiber->getState() != Fiber::TERM                     && cb_fiber->getState() != Fiber::EXCEPT) {                     cb_fiber->setState(Fiber::HOLD);                 }                 // 让fiber资源尽早释放,不用等到下个cb来的时候再释放                 cb_fiber.reset();             } else {                 // 进到这个分支情况一定是任务队列空了,调度idle协程即可                 if (idle_fiber->getState() == Fiber::TERM) {                     // 如果调度器没有调度任务,那么idle协程会不停地切换调度协程和idle协程,不会结束,                     //如果idle协程结束了,那一定是调度器停止了,在stop函数中控制                     SYLAR_LOG_DEBUG(g_logger) << "idle fiber term";                     break;                 }                  ++m_idleThreadCount;                 idle_fiber->swapIn();                 --m_idleThreadCount;                 if(idle_fiber->getState() != Fiber::TERM                     && idle_fiber->getState() != Fiber::EXCEPT) {                     idle_fiber->setState(Fiber::HOLD);                 }             }         }         // SYLAR_LOG_DEBUG(g_logger) << t_scheduler_fiber.use_count();         // t_scheduler_fiber = nullptr;     } 
  • stop() 停止调度器
void Scheduler::stop() {         SYLAR_LOG_DEBUG(g_logger) << "stop";         if (stopping()) {             return;         }         // 这里控制会控制idle协程的退出         m_stopping = true;         /// 只能由创建线程发起scheduler的线程发起stop         // 当m_useCaller为true时,创建线程发起scheduler的线程中GetThis() == this         // 当m_useCaller为false时,创建线程发起scheduler的线程中GetThis() == nullptr         if (m_useCaller) {             SYLAR_ASSERT(GetThis() == this);         } else {             SYLAR_ASSERT(GetThis() != this);         }          for (size_t i = 0; i < m_thread_nums; i++) {             tickle();         }          // 这里的主要目的,use_caller是ture的时候,在这里让caller的调度协程进行任务调度         // 如果将这里删除,那么caller线程永远不会进行任务调度         if (m_scheduleFiber) {             tickle();             m_scheduleFiber->swapIn();         }          // 目的是防止调度线程先执行完,销毁了Scheduler对象         // 即使使用detach也是不可以的,这里涉及到多个线程使用Scheduler对象,如果线程还没退出对象销毁了,这个时候就会造成段错误         std::vector<Thread::ptr> thrs;         {             MutexType::Lock lock(m_lock);             thrs.swap(m_threads);         }         for (auto &i : thrs) {             i->join();         }         SYLAR_LOG_DEBUG(g_logger) << t_scheduler_fiber.use_count();     } 

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!