从源码剖析Java线程池的工作机制

avatar
作者
筋斗云
阅读量:0

文章目录


从源码剖析Java线程池的工作机制

一、序言

在多核处理器环境下,多线程与并发编程已经成为提升程序响应速度和吞吐量的关键手段,对于Java工程师而言,深入理解多线程与并发编程内部机制,是构建高性能、高可用系统的基石。

本文小豪将带大家深入解读Java线程池ThreadPoolExecutor的工作原理,从源码分析其任务处理流程,掌握线程池内部机制,帮助我们更好地应用Java线程池。

文章最后附有流程图,进一步帮我们梳理业务逻辑

二、基础概念

在上一篇【从JDK源码探究Java线程与操作系统的交互】一文中,介绍到Java中线程的底层源码实现,而我们在开发中,通常不太会直接创建线程去使用,大多数仍是通过采用线程池来统一创建、管理线程。

线程池是一种用于管理和复用线程的机制,通过预创建一定数量的线程,来减少线程创建和销毁的开销,提高程序的性能和响应速度。

1、线程调度模型

在Java中,线程池的实现建立在Executor框架之上的,JDK为我们提供了一系列Executor框架的接口、实现类等,便于我们创建使用线程池。

同时,在上一篇我们了解到Java线程对应着操作系统内核级线程,且为一对一映射关系,由操作系统调度所有线程并分配给CPU去执行,这里我们可以认为,Java线程由两层线程调度模型组成:

  • 上层:Executor框架调度Java线程
  • 下层:操作系统调度内核级线程

这里的Executor框架是java.util.concurrent包的一部分,它是实现线程池的顶层接口,Executor接口只有一个方法execute(Runnable command),用于执行一个Runnable任务,其主要目的就是提供一个提交任务的机制,不需要我们直接管理线程的创建和生命周期。

Executor接口下层实现类结构关系如下:

在这里插入图片描述

2、线程池创建方式

基于Executor框架,我们一般创建线程池常用两种方式:通过Executors工具类ThreadPoolExecutor构造方法

(1)Executors工具类

Executors是JDK提供的创建线程池工具类,利于简化线程池的创建,内部提供了大量静态方法,这里列举几个常见的,源码如下:

public class Executors {      // 创建一个固定大小的线程池     public static ExecutorService newFixedThreadPool(int nThreads) {         return new ThreadPoolExecutor(nThreads, nThreads,                 0L, TimeUnit.MILLISECONDS,                 new LinkedBlockingQueue<Runnable>());     }      // 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行     public static ExecutorService newSingleThreadExecutor() {         return new FinalizableDelegatedExecutorService                 (new ThreadPoolExecutor(1, 1,                         0L, TimeUnit.MILLISECONDS,                         new LinkedBlockingQueue<Runnable>()));     }      // 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程     public static ExecutorService newCachedThreadPool() {         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                 60L, TimeUnit.SECONDS,                 new SynchronousQueue<Runnable>());     }      // 创建一个可以执行延迟任务的线程池,支持定时及周期性任务执行     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {         return new ScheduledThreadPoolExecutor(corePoolSize);     }      } 

但现在大家早已不用Executors工具类来创建线程池了,太死板而且有内存溢出OOM风险,相信没有小伙伴还不知道这一点,具体原因为何大家可阅读阿里的Java开发手册。

内存溢出及内存泄漏小豪在之前有写过一篇【深入JVM:线上内存泄漏问题诊断与处理】,里面详细介绍了内存泄漏的检测方法和解决策略。

(2)ThreadPoolExecutor构造方法

根据Executor接口的类结构关系,以及Executors工具类中静态方法的源码,不难发现底层都是通过ThreadPoolExecutor对象的去实现的。

一般开发中,也基本都是通过实例化ThreadPoolExecutor类的方式,通过其构造方法设置线程池参数,自定义创建线程池,这种方法相对来说更灵活,也规避资源耗尽的风险。

public ThreadPoolExecutor(int corePoolSize,                           int maximumPoolSize,                           long keepAliveTime,                           TimeUnit unit,                           BlockingQueue<Runnable> workQueue,                           ThreadFactory threadFactory,                           RejectedExecutionHandler handler) {     // 参数校验     if (corePoolSize < 0 ||             maximumPoolSize <= 0 ||             maximumPoolSize < corePoolSize ||             keepAliveTime < 0)         throw new IllegalArgumentException();     if (workQueue == null || threadFactory == null || handler == null)         throw new NullPointerException();     this.acc = System.getSecurityManager() == null ?             null :             AccessController.getContext();     // 入参赋值给成员变量     this.corePoolSize = corePoolSize;     this.maximumPoolSize = maximumPoolSize;     this.workQueue = workQueue;     this.keepAliveTime = unit.toNanos(keepAliveTime);     this.threadFactory = threadFactory;     this.handler = handler; } 
2.1 核心参数

ThreadPoolExecutor构造方法一共有7个参数,分别是:

  • ① 核心线程数(corePoolSize)

    线程池中的核心线程数量,用于定义最少的线程个数,即使空闲默认也不会被销毁(除非设置allowCoreThreadTimeOut属性,允许核心线程超时销毁)。

    创建线程池后,线程池中默认也没有核心线程的,等提交任务之后才会优先创建核心线程,另外,也可以设置创建线程池之后立即初始化核心线程

  • ② 最大线程数(maximumPoolSize)

    线程池中的允许的最大线程数量,当核心线程全部在繁忙且任务队列存满之后,线程池会临时追加非核心线程,直到总线程数达到该值上限

  • ③ 线程空闲等待时间(keepAliveTime)

    线程的空闲等待时间,当线程空闲时间超过配置的等待时间,线程就会被销毁,默认销毁的是非核心线程,如果设置核心线程也可被销毁,则核心线程空闲时间超过等待时间也会被销毁

  • ④ 空闲等待时间单位(unit)

    线程空闲等待时间的单位,秒、分、小时等

  • ⑤ 任务队列(workQueue)

    存放待执行的任务的队列(采用阻塞队列),当核心线程均繁忙时,新提交的任务会存放到该任务队列中,等待被线程执行

  • ⑥ 线程工厂(threadFactory)

    线程工厂,用来创建线程,通常用它来自定义线程名称

  • ⑦ 拒绝策略(handler)

    线程池中的拒绝(饱和)策略,当任务太多无法处理时(任务队列workQueue已满,正在执行的线程数达到maximumPoolSize),采用哪种拒绝策略去处理新任务。

2.3 拒绝策略实现

ThreadPoolExecutor构造方法中的拒绝策略参数,我们需要额外注意一下。

JDK提供了四种内置的拒绝策略:

  • AbortPolicy:丢弃任务并抛出RejectedExecutionException异常,线程池默认的拒绝策略

    public static class AbortPolicy implements RejectedExecutionHandler {     public AbortPolicy() {     }     // 直接抛出异常     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {         throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString());     } } 
  • DiscardPolicy:直接丢弃任务

    public static class DiscardPolicy implements RejectedExecutionHandler {     public DiscardPolicy() {     }     // 什么都没有处理     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {     } } 
  • DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务

    public static class DiscardOldestPolicy implements RejectedExecutionHandler {     public DiscardOldestPolicy() {     }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {         if (!e.isShutdown()) {             // 弹出最先进入阻塞队列中的任务(即时间最长的)             e.getQueue().poll();             // 执行当前新的任务             e.execute(r);         }     } } 
  • CallerRunsPolicy:由提交任务的线程去执行当前任务

    public static class CallerRunsPolicy implements RejectedExecutionHandler {     public CallerRunsPolicy() {     }     public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {         if (!e.isShutdown()) {             // 由调度线程执行该任务             r.run();         }     } } 

同时也支持我们定制化拒绝策略,实现RejectedExecutionHandler接口,自定义拒绝策略:

public class CustomRejectedExecution implements RejectedExecutionHandler {     @Override     public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {         // 自定义拒绝策略         // 存放至缓存...     } } 

自定义拒绝策略一般使用较多,当任务量较大时,我们可以先将拒绝的任务放入缓存或数据库,待压力较小时再依次消费处理。

三、源码剖析

熟悉过线程池的基础概念之后,我们开始分析ThreadPoolExecutor线程池处理任务流程的源码实现。

在上文我们了解到,顶层Executor接口的execute()方法用来执行Runnable任务,在ThreadPoolExecutor中,也是通过execute()方法提交任务并执行,因此,我们从execute()方法作为入口,分析其源码。

在此之前,还有一个概念需要了解,一个成员变量ctl

1、状态控制变量ctl

ThreadPoolExecutor中,将线程池状态和工作线程数采用一个原子整数成员变量ctl进行存储:

public class ThreadPoolExecutor extends AbstractExecutorService {      // (重要)线程池状态控制变量     // 两个含义:高3位代表当前线程池状态、低29位代表当前工作线程数     private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));     // 1字节等于8位,int类型是占用4字节,COUNT_BITS = 32 - 4 = 29     private static final int COUNT_BITS = Integer.SIZE - 3;     // 线程池容量,1左移29位再减1,CAPACITY约等于5亿多     private static final int CAPACITY = (1 << COUNT_BITS) - 1;      // 线程池状态     // RUNNING状态,可以处理新的任务和阻塞队列中的任务     // 位图111...     private static final int RUNNING = -1 << COUNT_BITS;     // SHUTDOWN状态,不再处理新任务,但会处理阻塞队列中的任务     // 位图000...     private static final int SHUTDOWN = 0 << COUNT_BITS;     // STOP状态,不再处理新任务,且不处理阻塞队列中的任务     // 位图001...     private static final int STOP = 1 << COUNT_BITS;     // TIDYING状态,过度状态,所有任务都已经执行完毕,即将关闭线程池     // 位图010...     private static final int TIDYING = 2 << COUNT_BITS;     // TERMINATED状态,线程终止     // 位图011...     private static final int TERMINATED = 3 << COUNT_BITS;      // 获取线程状态,根据&运算特点,始终获取ctl高3位,即当前线程池状态     private static int runStateOf(int c) {         return c & ~CAPACITY;     }      // 计算工作线程数,根据&运算特点,始终获取ctl低29位,即当前工作线程数     private static int workerCountOf(int c) {         return c & CAPACITY;     }      private static int ctlOf(int rs, int wc) {         return rs | wc;     }          // 其它代码省略...... } 

这里的状态控制变量ctl设计的十分巧妙,通过一个AtomicInteger类型存储了两个变量值,AtomicInteger类型占用4字节32位,操作时都是基于位运算,高3位代表当前线程池状态、低29位代表当前工作线程数

同时这里也确保了共享变量的线程安全,利用AtomicInteger的原子操作,确保线程池状态和工作线程数始终是保持统一的。

不懂位运算以及这里为何可以通过runStateOf()workerCountOf()方法获取到线程状态和工作线程数的小伙伴,可以自己写个Demo验证一下这个结论,本篇着重分析线程池工作流程的源码,这里不做过多赘述。

小豪读到这里时也不得不感叹Java并发编程大师Doug Lea确实流弊!

2、线程执行execute方法

通过状态控制变量ctl,我们得到两个方法:

  1. runStateOf(int c):获取当前线程状态
  2. workerCountOf(int c):获取当前工作线程数

接着我们正式进入ThreadPoolExecutorexecute方法,源码如下:

public void execute(Runnable command) {     // 参数验证,判断任务是否为空,为空则抛出空指针异常     if (command == null)         throw new NullPointerException();      // 获取状态控制变量clt的值,用于后续获取线程池状态和工作线程数量     int c = ctl.get();     // 阶段一:启动核心线程 -----------------------------------------     // 判断当前工作线程数小于核心线程数corePoolSize     if (workerCountOf(c) < corePoolSize) {         // 创建一个核心线程,并把任务添加到该线程中运行         // addWorker()方法:参数二为true,代表添加核心线程;参数二为false,代表添加非核心线程         if (addWorker(command, true))             return;         // 若添加失败,则重新获取控制变量clt的值;         c = ctl.get();     }     // 走到这里,说明workerCountOf(c) >= corePoolSize,核心线程数已满      // 阶段二:核心线程数已满,任务添加至阻塞队列 -----------------------     // 判断当前线程池是否为运行状态 且 新任务添加到阻塞队列成功(即阻塞队列未满)     if (isRunning(c) && workQueue.offer(command)) {         // 重新获取ctl的值         int recheck = ctl.get();         // 再次检验线程池是否是运行状态         // 如果不是运行状态,由于已经把任务添加到workQueue阻塞队列中了,此时需移除该任务         if (!isRunning(recheck) && remove(command))             // 线程池不是运行状态 且 移除任务成功,触发拒绝策略             reject(command);         // 线程池是运行状态         // 获取线程池中的有效线程数,如果是0,则新建一个非核心线程(确保线程池在RUNNING状态下至少有一个线程执行任务)         else if (workerCountOf(recheck) == 0)             // addWorker()方法:参数一为null,代表在线程池中新创建一个线程,但没有传入任务             // 任务已经被添加到workQueue阻塞队列中,新建的非核心线程会从workQueue中获取任务来执行             addWorker(null, false);     }     // 走到这里,说明两种情况:     // 1.线程池已经不是RUNNING状态     // 2.线程池是RUNNING状态,但workQueue阻塞队列已满(核心线程数在之前也以及满了)      // 阶段三:阻塞队列已满,启动非核心线程 -----------------------------     // 再次调用addWorker()方法,参数二为false,代表创建一个非核心线程     else if (!addWorker(command, false))         // 创建非核心线程失败,执行拒绝策略(工作线程数已达到maximumPoolSize最大线程数的限制)         reject(command); } 

在源码中,execute方法大致包含三个阶段:

  • 阶段一:当前工作线程数小于核心线程数,创建核心线程
  • 阶段二:核心线程数已满(当前工作线程数大于等于核心线程数),任务添加至阻塞队列
  • 阶段三:阻塞队列已满(且核心线程数已满),创建非核心线程,若非核心线程已满,则执行拒绝策略

execute方法的整体逻辑比较清晰,其中多次出现addWorker()方法,我们继续往下走。

3、新增线程addWorker方法

addWorker()方法主要用于新增工作线程,其主要参数有两个:

  1. 参数一:Runnable firstTask 任务实例
    • 不为null:创建线程并启动传入的任务实例
    • 为null:创建线程,但不启动任务,后续从阻塞队列中获取任务
  2. 参数二:boolean core 是否核心线程
    • true:创建核心线程
    • false:创建非核心线程

具体源码如下:

private boolean addWorker(Runnable firstTask, boolean core) {     // 标志位     retry:     // 外层死循环     for (; ; ) {         // 获取状态控制变量clt的值         int c = ctl.get();         // 获取当前线程池状态         int rs = runStateOf(c);          // 判断是否需要新创建工作线程         // 如果当前线程池非运行状态(rs >= SHUTDOWN,代表状态为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一)         // 并且         // 当前线程池状态>SHUTDOWN 或 新任务不为空 或 阻塞队列为空         // 均不需要创建新的工作线程,直接返回false         if (rs >= SHUTDOWN &&                 !(rs == SHUTDOWN &&                         firstTask == null &&                         !workQueue.isEmpty()))             return false;         // !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())         // 这段代码逻辑有点绕,其实就是线程池为SHUTDOWN时,会停止接收新任务,但还是会处理完阻塞队列中的任务,如过是这种情况,这里就不会返回false          // 内层死循环         // 采用CAS,将工作线程个数+1         for (; ; ) {             // 获取当前线程池工作线程数             int wc = workerCountOf(c);             // 判断当前工作线程数是否符合要求             // 当前工作线程数是否>=线程池容量 或 根据参数二core判断当前工作线程数是否>=核心线程数/最大线程数             if (wc >= CAPACITY ||                     wc >= (core ? corePoolSize : maximumPoolSize))                 return false;             // 尝试将线程池工作线程数量+1             if (compareAndIncrementWorkerCount(c))                 // 成功则退回标志位,即结束外层循环                 break retry;             // workerCount线程数量添加失败,重新获取控制变量ctl的值             c = ctl.get();             // 如果当前线程池的运行状态不等于上面rs,代表在此期间线程池运行状态已经发生改变             if (runStateOf(c) != rs)                 //返回外层for循环继续执行                 continue retry;             // 若仍未添加数量成功,则CAS自旋重新尝试添加         }     }      // 创建两个状态标记     // 标记线程是否启动     boolean workerStarted = false;     // 标记线程是否添加     boolean workerAdded = false;     Worker w = null;     try {         // 将firstTask当前任务封装为Worker对象         w = new Worker(firstTask);         // 拿到worker对象创建的线程         final Thread t = w.thread;         if (t != null) {             // 加锁ReentrantLock,确保线程安全             final ReentrantLock mainLock = this.mainLock;             mainLock.lock();             try {                 // 获取当前线程池状态                 int rs = runStateOf(ctl.get());                  // 校验当前线程池状态                 // rs < SHUTDOWN 代表当前线程是否为RUNNING运行状态                 // 或                 // 线程池状态为SHUTDOWN状态 且 firstTask新任务为null(线程池状态SHUTDOWN时,仍会执行workQueue中的任务)                 if (rs < SHUTDOWN ||                         (rs == SHUTDOWN && firstTask == null)) {                     // 判断当前线程已经还存活                     if (t.isAlive())                         throw new IllegalThreadStateException();                     // 添加到工作线程集合workers中                     // workers是一个HashSet集合,线程不安全,因此上面需要加锁                     workers.add(w);                     int s = workers.size();                     // 统计largestPoolSize线程池中的最大线程数量                     if (s > largestPoolSize)                         // 更新最大值                         largestPoolSize = s;                     // worker线程添加成功,标记线程已添加                     workerAdded = true;                 }             } finally {                 // 解锁                 mainLock.unlock();             }             // 判断线程是否添加成功             if (workerAdded) {                 // 成功则启动线程                 t.start();                 // 标记worker线程已启动                 workerStarted = true;             }         }     } finally {         // 判断线程是否未启动成功         if (!workerStarted)             // 在workers集合中移除该worker             addWorkerFailed(w);     }     // 返回worker线程是否启动成功     return workerStarted; } 

观察源码发现,addWorker()方法主要分为两步:

  • 校验线程池状态和工作线程数

    • 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为SHUTDOWN或新任务不为空或阻塞队列为空,这两种情况均不需要创建新的工作线程,直接返回false
    • 紧接着判断当前工作线程数是否符合要求,是否超过线程池容量或核心/最大线程数(根据参数二core判断),超过则直接返回false
    • 否则通过CAS不断尝试将工作线程数量+1,成功则跳出循环,执行下一步
  • 创建worker线程对象并启动线程

    • 创建worker对象,封装firstTask任务,采用独占锁将worker工作线程对象放入工作队列workers集合,同时更新最大线程数的值,最后启动线程。

      // 将firstTask当前任务封装为Worker对象 w = new Worker(firstTask) 
    • 若线程启动失败则调用addWorkerFailed()方法在workers集合中移除该worker对象

4、工作线程Worker内部类

ThreadPoolExecutor中,线程统一被封装为Worker内部类,即具体的工作线程类,Worker继承自AQS并实现了Runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {      private static final long serialVersionUID = 6138294804551838833L;      // 当前的线程实例     final Thread thread;      // 当前的任务实例     Runnable firstTask;      // 记录线程完成的任务数     volatile long completedTasks;      // 构造方法,入参为firstTask任务实例     Worker(Runnable firstTask) {         // (重要)设置worker线程状态为-1,表示禁止线程中断         setState(-1);         // 赋值成员变量,保存firstTask任务实例         this.firstTask = firstTask;         // 通过线程工厂创建新线程(传入当前对象),真正的线程         this.thread = getThreadFactory().newThread(this);     }      public void run() {         // (重要)线程启动时执行的任务         runWorker(this);     }      // 是否持有独占锁     protected boolean isHeldExclusively() {         return getState() != 0;     }      // 尝试获取独占锁     protected boolean tryAcquire(int unused) {         if (compareAndSetState(0, 1)) {             setExclusiveOwnerThread(Thread.currentThread());             return true;         }         return false;     }      // 尝试释放独占锁     protected boolean tryRelease(int unused) {         setExclusiveOwnerThread(null);         setState(0);         return true;     }      public void lock() {acquire(1);}      public boolean tryLock() {return tryAcquire(1);}      public void unlock() {release(1);}      public boolean isLocked() {return isHeldExclusively();}      // 线程启动后,进行线程中断时执行方法     void interruptIfStarted() {         Thread t;         // worker中状态 >= 0 且 线程不为空 且 当前线程未中断         if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {             try {                 // 中断线程                 t.interrupt();             } catch (SecurityException ignore) {             }         }     } } 

Worker类的构造方法首先会通过setState(-1)设置线程状态state-1,当state-1时,代表线程此时不能被中断,这里采用的是**AQS的独占锁模式**,只有当state不为-1(独占锁被释放),线程才被允许中断,具体中断逻辑在interruptIfStarted()方法。

在创建完worker工作线程对象后,会启动该工作线程,调用wokerrun()方法,run()方法内部又调用了ThreadPoolExecutor类的runWorker()方法。

5、启动线程runWorker方法

runWorker()方法会执行具体的任务,源码如下:

final void runWorker(Worker w) {     // 获取当前线程     Thread wt = Thread.currentThread();     // 获取worker构造方法传入的任务实例     Runnable task = w.firstTask;     w.firstTask = null;     // 解锁,设置worker状态state为0,表示允许线程中断     w.unlock();     // 标识线程退出原因,true代表线程异常退出,false代表线程正常结束     boolean completedAbruptly = true;     try {         // 判断当前任务是否不为空 或 从阻塞队列获取成功         // getTask()方法从阻塞队列获取一个任务         while (task != null || (task = getTask()) != null) {             // 加锁处理并发问题,防止shutdown()时终止正在执行的worker             w.lock();             // 判断如果线程池状态 >= STOP 且线程未被中断             if ((runStateAtLeast(ctl.get(), STOP) ||                     (Thread.interrupted() &&                             runStateAtLeast(ctl.get(), STOP))) &&                     !wt.isInterrupted())                 // 中断线程                 wt.interrupt();             try {                 // 可自定义实现的扩展点,执行前的扩展方法                 beforeExecute(wt, task);                 Throwable thrown = null;                 try {                     // 执行任务,对应任务的run()方法                     task.run();                 } catch (RuntimeException x) {                     thrown = x;                     throw x;                 } catch (Error x) {                     thrown = x;                     throw x;                 } catch (Throwable x) {                     thrown = x;                     throw new Error(x);                 } finally {                     // 扩展点,执行后的扩展方法                     afterExecute(task, thrown);                 }             } finally {                 // 任务对象置空                 task = null;                 // 完成的工作任务数+1                 w.completedTasks++;                 // 解锁                 w.unlock();             }         }         // 标识线程为正常结束退出         completedAbruptly = false;     } finally {         // 处理worker线程退出         processWorkerExit(w, completedAbruptly);     } } 

其内部流程主要如下:

  1. 首先解锁worker,允许线程中断
  2. 接着通过while循环判断当前任务是否为空,若为空则调用getTask()方法从阻塞队列中获取待处理的任务
  3. 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程
  4. 执行对象任务的run()方法,在任务执行前后均提供扩展点可自定义扩展
  5. 重复第2步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用processWorkerExit()方法处理worker线程退出

这里我们也发现,线程池中的线程复用实际上就是通过runWorker()方法中while循环实现的,worker工作线程不断的从阻塞队列中获取新的任务,直接调用任务的run()方法,避免去创建新线程,这一过程实现了线程复用

6、获取阻塞队列getTask方法

getTask()方法用来获取阻塞队列中的任务,源码如下:

private Runnable getTask() {     // 标记上一次从阻塞队列获取任务是否超时     boolean timedOut = false;      // 死循环     for (; ; ) {         // 获取ctl值         int c = ctl.get();         // 获取当前线程池状态         int rs = runStateOf(c);          // 判断是否需获取任务         // 若线程池状态 >= STOP 或 (线程池状态为SHUTDOWN 且 阻塞队列为空),则不需要再处理任务         if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {             // 工作线程数-1             decrementWorkerCount();             // 直接返回             return null;         }          // 获取当前工作线程数         int wc = workerCountOf(c);          // timed标记用于判断是否需进行超时控制         // allowCoreThreadTimeOut是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁)         // wc > corePoolSize当前工作线程数大于核心线程数         boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;          // 工作线程数 > maximumPoolSize         // 或         // timed && timedOut,timedOut首次进入默认为false         if ((wc > maximumPoolSize || (timed && timedOut))                 // 工作线程数大于1                 // 或                 // 阻塞队列为空                 && (wc > 1 || workQueue.isEmpty())) {             // 采用CAS,工作线程数-1,并且返回null             if (compareAndDecrementWorkerCount(c))                 return null;             continue;         }         // 执行到这,说明前面线程池状态和工作线程数校验已通过,开始从阻塞队列获取任务         try {             // timed为true代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程)             // timed = true:超时等待获取任务poll(),超时未获取到则返回null             // timed = false:阻塞获取任务take(),一直等着直到获取到任务             Runnable r = timed ?                     workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                     workQueue.take();             // 判断是否获取到任务             if (r != null)                 // 获取到直接返回                 return r;             // 未获取到,代表等待超时也没有拿到任务,设置timedOut值为true             timedOut = true;         } catch (InterruptedException retry) {             // 即使异常也循环重试             timedOut = false;         }     } } 

大致过程其实就是从workQueue阻塞队列中获取任务,在获取之前也会校验线程池状态和工作线程数是否合规,最后根据条件选择超时等待获取还是阻塞获取。

7、退出线程processWorkerExit方法

runWorker()方法执行结束之后在finally代码块中会调用processWorkerExit()方法,主要用来销毁空闲线程,具体源码如下:

private void processWorkerExit(Worker w, boolean completedAbruptly) {     // completedAbruptly:true代表线程异常退出,false代表线程正常结束     if (completedAbruptly)         // 异常退出,工作线程数-1         decrementWorkerCount();      // 加锁     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         // 统计当前worker完成的任务数,累加到线程池总完成任务数         completedTaskCount += w.completedTasks;         // 从线程池HastSet集合中移除当前工作线程         workers.remove(w);     } finally {         // 解锁         mainLock.unlock();     }      // 判断是否终止线程池     tryTerminate();      // 获取ctl值     int c = ctl.get();     // 当前线程池状态 < STOP,即RUNNING或SHUTDOWN状态时     if (runStateLessThan(c, STOP)) {         // 判断线程是否为正常结束         if (!completedAbruptly) {             // 正常结束退出             // allowCoreThreadTimeOut是否运行核心线程超时销毁             // 若允许,min最小线程数为0,不允许则为corePoolSize核心线程数             int min = allowCoreThreadTimeOut ? 0 : corePoolSize;             // 判断workQueue阻塞队列是否不为空             if (min == 0 && !workQueue.isEmpty())                 // 确保至少有一个worker线程                 min = 1;             //判断工作线程数 >= min最小线程数             if (workerCountOf(c) >= min)                 // 符合条件直接返回                 return;         }         // 若线程为异常退出或不符合条件(工作线程数 < min最小线程数)         // 创建一个非核心线程执行任务         addWorker(null, false);     } } 

源码中,会将worker工作线程从线程池HastSet集合中移除,最后也会确保当前工作线程数不小于最小线程数,通过processWorkerExit()方法,控制了线程池中线程数量的大小变化

8、线程池关闭

线程池关闭有两种方法,分别是shutdown()shutdownNow()

(1)shutdown方法

shutdown()方法首先会将线程池状态设置为SHUTDOWN,不再接收新的任务。

内部正在执行的任务和阻塞队列中的任务都会继续执行,待均执行完毕之后会关闭线程池。

具体源码如下:

public void shutdown() {     // 加锁     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         // 安全检查         checkShutdownAccess();         // 设置线程池状态为SHUTDOWN         advanceRunState(SHUTDOWN);         // 中断线程池中空闲的线程         interruptIdleWorkers();         onShutdown();     } finally {         mainLock.unlock();     }     // 判断是否终止线程池     tryTerminate(); } 
(2)shutdownNow方法

shutdownNow()方法则是将线程池状态设置为STOP,不再接收新的任务。

但所有线程都会被中断(已经拿到任务正在执行的不会被中断),且阻塞队列中的任务会被丢弃,不再执行了,并且返回未执行的任务队列。

public List<Runnable> shutdownNow() {     List<Runnable> tasks;     final ReentrantLock mainLock = this.mainLock;     mainLock.lock();     try {         checkShutdownAccess();         // 设置线程池状态为STOP         advanceRunState(STOP);         // 中断线程池中所有线程         interruptWorkers();         // 清空阻塞队列,赋值给tasks返回         tasks = drainQueue();     } finally {         mainLock.unlock();     }     // 判断是否终止线程池     tryTerminate();     // 返回阻塞队列中未被执行的任务     return tasks; } 

四、流程图

在这里插入图片描述

五、后记

本文着重带大家分析了ThreadPoolExecutor线程池处理任务流程的源码实现,过程中额外介绍了线程池的创建方式以及核心参数,详细通过本文,大家更深入的理解了线程池的原理和工作机制。

但在我们实际开发中,使用线程池主要关注线程池大小的设置,合理配置线程池大小能够更高的提升程序性能和稳定性,国内通用的共识即根据CPU密集型和IO密集型来设定线程数,这里面其实是基于任务的阻塞系数来商榷增减线程数,一般阻塞时间占比越高,设置越多线程,CPU计算时间所占越高,设置越少线程

网上有很多配置线程池大小的公式,但这些公式无法适用于所有业务场景,我们只能根据实际场景,通过压测及性能监控等手段,不断的调整线程池大小,确定出合适的线程数量,实现最高的任务处理效率。

下一篇,小豪将会继续更新Java多线程与并发编程相关内容,创作不易,如果大家觉得内容对你有收获,不妨考虑关注关注小豪~

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!