文章目录
从源码剖析Java线程池的工作机制
一、序言
在多核处理器环境下,多线程与并发编程已经成为提升程序响应速度和吞吐量的关键手段,对于Java工程师而言,深入理解多线程与并发编程内部机制,是构建高性能、高可用系统的基石。
本文小豪将带大家深入解读Java线程池ThreadPoolExecutor
的工作原理,从源码分析其任务处理流程,掌握线程池内部机制,帮助我们更好地应用Java线程池。
文章最后附有流程图,进一步帮我们梳理业务逻辑
二、基础概念
在上一篇【从JDK源码探究Java线程与操作系统的交互】一文中,介绍到Java中线程的底层源码实现,而我们在开发中,通常不太会直接创建线程去使用,大多数仍是通过采用线程池来统一创建、管理线程。
线程池是一种用于管理和复用线程的机制,通过预创建一定数量的线程,来减少线程创建和销毁的开销,提高程序的性能和响应速度。
1、线程调度模型
在Java中,线程池的实现建立在Executor
框架之上的,JDK为我们提供了一系列Executor
框架的接口、实现类等,便于我们创建使用线程池。
同时,在上一篇我们了解到Java线程对应着操作系统内核级线程,且为一对一映射关系,由操作系统调度所有线程并分配给CPU去执行,这里我们可以认为,Java线程由两层线程调度模型组成:
- 上层:Executor框架调度Java线程
- 下层:操作系统调度内核级线程
这里的Executor
框架是java.util.concurrent
包的一部分,它是实现线程池的顶层接口,Executor
接口只有一个方法execute(Runnable command)
,用于执行一个Runnable
任务,其主要目的就是提供一个提交任务的机制,不需要我们直接管理线程的创建和生命周期。
Executor
接口下层实现类结构关系如下:
2、线程池创建方式
基于Executor
框架,我们一般创建线程池常用两种方式:通过Executors工具类或ThreadPoolExecutor构造方法。
(1)Executors工具类
Executors
是JDK提供的创建线程池工具类,利于简化线程池的创建,内部提供了大量静态方法,这里列举几个常见的,源码如下:
public class Executors { // 创建一个固定大小的线程池 public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } // 创建一个单线程的线程池,它只会用唯一的工作线程来执行任务,保证所有任务按照指定顺序(FIFO)执行 public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 创建一个可缓存的线程池,如果线程池长度超过处理需要,可灵活回收空闲线程,若无可回收,则新建线程 public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 创建一个可以执行延迟任务的线程池,支持定时及周期性任务执行 public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } }
但现在大家早已不用Executors
工具类来创建线程池了,太死板而且有内存溢出OOM风险,相信没有小伙伴还不知道这一点,具体原因为何大家可阅读阿里的Java开发手册。
内存溢出及内存泄漏小豪在之前有写过一篇【深入JVM:线上内存泄漏问题诊断与处理】,里面详细介绍了内存泄漏的检测方法和解决策略。
(2)ThreadPoolExecutor构造方法
根据Executor
接口的类结构关系,以及Executors
工具类中静态方法的源码,不难发现底层都是通过ThreadPoolExecutor
对象的去实现的。
一般开发中,也基本都是通过实例化ThreadPoolExecutor
类的方式,通过其构造方法设置线程池参数,自定义创建线程池,这种方法相对来说更灵活,也规避资源耗尽的风险。
public ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) { // 参数校验 if (corePoolSize < 0 || maximumPoolSize <= 0 || maximumPoolSize < corePoolSize || keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.acc = System.getSecurityManager() == null ? null : AccessController.getContext(); // 入参赋值给成员变量 this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }
2.1 核心参数
ThreadPoolExecutor
构造方法一共有7个参数,分别是:
① 核心线程数(corePoolSize)
线程池中的核心线程数量,用于定义最少的线程个数,即使空闲默认也不会被销毁(除非设置
allowCoreThreadTimeOut
属性,允许核心线程超时销毁)。创建线程池后,线程池中默认也没有核心线程的,等提交任务之后才会优先创建核心线程,另外,也可以设置创建线程池之后立即初始化核心线程
② 最大线程数(maximumPoolSize)
线程池中的允许的最大线程数量,当核心线程全部在繁忙且任务队列存满之后,线程池会临时追加非核心线程,直到总线程数达到该值上限
③ 线程空闲等待时间(keepAliveTime)
线程的空闲等待时间,当线程空闲时间超过配置的等待时间,线程就会被销毁,默认销毁的是非核心线程,如果设置核心线程也可被销毁,则核心线程空闲时间超过等待时间也会被销毁
④ 空闲等待时间单位(unit)
线程空闲等待时间的单位,秒、分、小时等
⑤ 任务队列(workQueue)
存放待执行的任务的队列(采用阻塞队列),当核心线程均繁忙时,新提交的任务会存放到该任务队列中,等待被线程执行
⑥ 线程工厂(threadFactory)
线程工厂,用来创建线程,通常用它来自定义线程名称
⑦ 拒绝策略(handler)
线程池中的拒绝(饱和)策略,当任务太多无法处理时(任务队列
workQueue
已满,正在执行的线程数达到maximumPoolSize
),采用哪种拒绝策略去处理新任务。
2.3 拒绝策略实现
ThreadPoolExecutor
构造方法中的拒绝策略参数,我们需要额外注意一下。
JDK提供了四种内置的拒绝策略:
AbortPolicy:丢弃任务并抛出
RejectedExecutionException
异常,线程池默认的拒绝策略public static class AbortPolicy implements RejectedExecutionHandler { public AbortPolicy() { } // 直接抛出异常 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { throw new RejectedExecutionException("Task " + r.toString() + " rejected from " + e.toString()); } }
DiscardPolicy:直接丢弃任务
public static class DiscardPolicy implements RejectedExecutionHandler { public DiscardPolicy() { } // 什么都没有处理 public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { } }
DiscardOldestPolicy:丢弃阻塞队列中最靠前的任务
public static class DiscardOldestPolicy implements RejectedExecutionHandler { public DiscardOldestPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 弹出最先进入阻塞队列中的任务(即时间最长的) e.getQueue().poll(); // 执行当前新的任务 e.execute(r); } } }
CallerRunsPolicy:由提交任务的线程去执行当前任务
public static class CallerRunsPolicy implements RejectedExecutionHandler { public CallerRunsPolicy() { } public void rejectedExecution(Runnable r, ThreadPoolExecutor e) { if (!e.isShutdown()) { // 由调度线程执行该任务 r.run(); } } }
同时也支持我们定制化拒绝策略,实现RejectedExecutionHandler
接口,自定义拒绝策略:
public class CustomRejectedExecution implements RejectedExecutionHandler { @Override public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) { // 自定义拒绝策略 // 存放至缓存... } }
自定义拒绝策略一般使用较多,当任务量较大时,我们可以先将拒绝的任务放入缓存或数据库,待压力较小时再依次消费处理。
三、源码剖析
熟悉过线程池的基础概念之后,我们开始分析ThreadPoolExecutor
线程池处理任务流程的源码实现。
在上文我们了解到,顶层Executor
接口的execute()
方法用来执行Runnable
任务,在ThreadPoolExecutor
中,也是通过execute()
方法提交任务并执行,因此,我们从execute()
方法作为入口,分析其源码。
在此之前,还有一个概念需要了解,一个成员变量ctl
1、状态控制变量ctl
在ThreadPoolExecutor
中,将线程池状态和工作线程数采用一个原子整数成员变量ctl
进行存储:
public class ThreadPoolExecutor extends AbstractExecutorService { // (重要)线程池状态控制变量 // 两个含义:高3位代表当前线程池状态、低29位代表当前工作线程数 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 1字节等于8位,int类型是占用4字节,COUNT_BITS = 32 - 4 = 29 private static final int COUNT_BITS = Integer.SIZE - 3; // 线程池容量,1左移29位再减1,CAPACITY约等于5亿多 private static final int CAPACITY = (1 << COUNT_BITS) - 1; // 线程池状态 // RUNNING状态,可以处理新的任务和阻塞队列中的任务 // 位图111... private static final int RUNNING = -1 << COUNT_BITS; // SHUTDOWN状态,不再处理新任务,但会处理阻塞队列中的任务 // 位图000... private static final int SHUTDOWN = 0 << COUNT_BITS; // STOP状态,不再处理新任务,且不处理阻塞队列中的任务 // 位图001... private static final int STOP = 1 << COUNT_BITS; // TIDYING状态,过度状态,所有任务都已经执行完毕,即将关闭线程池 // 位图010... private static final int TIDYING = 2 << COUNT_BITS; // TERMINATED状态,线程终止 // 位图011... private static final int TERMINATED = 3 << COUNT_BITS; // 获取线程状态,根据&运算特点,始终获取ctl高3位,即当前线程池状态 private static int runStateOf(int c) { return c & ~CAPACITY; } // 计算工作线程数,根据&运算特点,始终获取ctl低29位,即当前工作线程数 private static int workerCountOf(int c) { return c & CAPACITY; } private static int ctlOf(int rs, int wc) { return rs | wc; } // 其它代码省略...... }
这里的状态控制变量ctl
设计的十分巧妙,通过一个AtomicInteger
类型存储了两个变量值,AtomicInteger
类型占用4字节32位,操作时都是基于位运算,高3位代表当前线程池状态、低29位代表当前工作线程数。
同时这里也确保了共享变量的线程安全,利用AtomicInteger
的原子操作,确保线程池状态和工作线程数始终是保持统一的。
不懂位运算以及这里为何可以通过
runStateOf()
、workerCountOf()
方法获取到线程状态和工作线程数的小伙伴,可以自己写个Demo验证一下这个结论,本篇着重分析线程池工作流程的源码,这里不做过多赘述。小豪读到这里时也不得不感叹Java并发编程大师Doug Lea确实流弊!
2、线程执行execute方法
通过状态控制变量ctl
,我们得到两个方法:
- runStateOf(int c):获取当前线程状态
- workerCountOf(int c):获取当前工作线程数
接着我们正式进入ThreadPoolExecutor
的execute
方法,源码如下:
public void execute(Runnable command) { // 参数验证,判断任务是否为空,为空则抛出空指针异常 if (command == null) throw new NullPointerException(); // 获取状态控制变量clt的值,用于后续获取线程池状态和工作线程数量 int c = ctl.get(); // 阶段一:启动核心线程 ----------------------------------------- // 判断当前工作线程数小于核心线程数corePoolSize if (workerCountOf(c) < corePoolSize) { // 创建一个核心线程,并把任务添加到该线程中运行 // addWorker()方法:参数二为true,代表添加核心线程;参数二为false,代表添加非核心线程 if (addWorker(command, true)) return; // 若添加失败,则重新获取控制变量clt的值; c = ctl.get(); } // 走到这里,说明workerCountOf(c) >= corePoolSize,核心线程数已满 // 阶段二:核心线程数已满,任务添加至阻塞队列 ----------------------- // 判断当前线程池是否为运行状态 且 新任务添加到阻塞队列成功(即阻塞队列未满) if (isRunning(c) && workQueue.offer(command)) { // 重新获取ctl的值 int recheck = ctl.get(); // 再次检验线程池是否是运行状态 // 如果不是运行状态,由于已经把任务添加到workQueue阻塞队列中了,此时需移除该任务 if (!isRunning(recheck) && remove(command)) // 线程池不是运行状态 且 移除任务成功,触发拒绝策略 reject(command); // 线程池是运行状态 // 获取线程池中的有效线程数,如果是0,则新建一个非核心线程(确保线程池在RUNNING状态下至少有一个线程执行任务) else if (workerCountOf(recheck) == 0) // addWorker()方法:参数一为null,代表在线程池中新创建一个线程,但没有传入任务 // 任务已经被添加到workQueue阻塞队列中,新建的非核心线程会从workQueue中获取任务来执行 addWorker(null, false); } // 走到这里,说明两种情况: // 1.线程池已经不是RUNNING状态 // 2.线程池是RUNNING状态,但workQueue阻塞队列已满(核心线程数在之前也以及满了) // 阶段三:阻塞队列已满,启动非核心线程 ----------------------------- // 再次调用addWorker()方法,参数二为false,代表创建一个非核心线程 else if (!addWorker(command, false)) // 创建非核心线程失败,执行拒绝策略(工作线程数已达到maximumPoolSize最大线程数的限制) reject(command); }
在源码中,execute
方法大致包含三个阶段:
- 阶段一:当前工作线程数小于核心线程数,创建核心线程
- 阶段二:核心线程数已满(当前工作线程数大于等于核心线程数),任务添加至阻塞队列
- 阶段三:阻塞队列已满(且核心线程数已满),创建非核心线程,若非核心线程已满,则执行拒绝策略
execute
方法的整体逻辑比较清晰,其中多次出现addWorker()
方法,我们继续往下走。
3、新增线程addWorker方法
addWorker()
方法主要用于新增工作线程,其主要参数有两个:
- 参数一:Runnable firstTask 任务实例
- 不为null:创建线程并启动传入的任务实例
- 为null:创建线程,但不启动任务,后续从阻塞队列中获取任务
- 参数二:boolean core 是否核心线程
- true:创建核心线程
- false:创建非核心线程
具体源码如下:
private boolean addWorker(Runnable firstTask, boolean core) { // 标志位 retry: // 外层死循环 for (; ; ) { // 获取状态控制变量clt的值 int c = ctl.get(); // 获取当前线程池状态 int rs = runStateOf(c); // 判断是否需要新创建工作线程 // 如果当前线程池非运行状态(rs >= SHUTDOWN,代表状态为SHUTDOWN、STOP、TIDYING、TERMINATED其中之一) // 并且 // 当前线程池状态>SHUTDOWN 或 新任务不为空 或 阻塞队列为空 // 均不需要创建新的工作线程,直接返回false if (rs >= SHUTDOWN && !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty())) return false; // !(rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()) // 这段代码逻辑有点绕,其实就是线程池为SHUTDOWN时,会停止接收新任务,但还是会处理完阻塞队列中的任务,如过是这种情况,这里就不会返回false // 内层死循环 // 采用CAS,将工作线程个数+1 for (; ; ) { // 获取当前线程池工作线程数 int wc = workerCountOf(c); // 判断当前工作线程数是否符合要求 // 当前工作线程数是否>=线程池容量 或 根据参数二core判断当前工作线程数是否>=核心线程数/最大线程数 if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; // 尝试将线程池工作线程数量+1 if (compareAndIncrementWorkerCount(c)) // 成功则退回标志位,即结束外层循环 break retry; // workerCount线程数量添加失败,重新获取控制变量ctl的值 c = ctl.get(); // 如果当前线程池的运行状态不等于上面rs,代表在此期间线程池运行状态已经发生改变 if (runStateOf(c) != rs) //返回外层for循环继续执行 continue retry; // 若仍未添加数量成功,则CAS自旋重新尝试添加 } } // 创建两个状态标记 // 标记线程是否启动 boolean workerStarted = false; // 标记线程是否添加 boolean workerAdded = false; Worker w = null; try { // 将firstTask当前任务封装为Worker对象 w = new Worker(firstTask); // 拿到worker对象创建的线程 final Thread t = w.thread; if (t != null) { // 加锁ReentrantLock,确保线程安全 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 获取当前线程池状态 int rs = runStateOf(ctl.get()); // 校验当前线程池状态 // rs < SHUTDOWN 代表当前线程是否为RUNNING运行状态 // 或 // 线程池状态为SHUTDOWN状态 且 firstTask新任务为null(线程池状态SHUTDOWN时,仍会执行workQueue中的任务) if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { // 判断当前线程已经还存活 if (t.isAlive()) throw new IllegalThreadStateException(); // 添加到工作线程集合workers中 // workers是一个HashSet集合,线程不安全,因此上面需要加锁 workers.add(w); int s = workers.size(); // 统计largestPoolSize线程池中的最大线程数量 if (s > largestPoolSize) // 更新最大值 largestPoolSize = s; // worker线程添加成功,标记线程已添加 workerAdded = true; } } finally { // 解锁 mainLock.unlock(); } // 判断线程是否添加成功 if (workerAdded) { // 成功则启动线程 t.start(); // 标记worker线程已启动 workerStarted = true; } } } finally { // 判断线程是否未启动成功 if (!workerStarted) // 在workers集合中移除该worker addWorkerFailed(w); } // 返回worker线程是否启动成功 return workerStarted; }
观察源码发现,addWorker()
方法主要分为两步:
校验线程池状态和工作线程数
- 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为
SHUTDOWN
或新任务不为空或阻塞队列为空,这两种情况均不需要创建新的工作线程,直接返回false
- 紧接着判断当前工作线程数是否符合要求,是否超过线程池容量或核心/最大线程数(根据参数二
core
判断),超过则直接返回false
- 否则通过
CAS
不断尝试将工作线程数量+1
,成功则跳出循环,执行下一步
- 首先判断当前线程池状态是否非运行状态,并且当前线程池状态不为
创建worker线程对象并启动线程
创建
worker
对象,封装firstTask
任务,采用独占锁将worker
工作线程对象放入工作队列workers
集合,同时更新最大线程数的值,最后启动线程。// 将firstTask当前任务封装为Worker对象 w = new Worker(firstTask)
若线程启动失败则调用
addWorkerFailed()
方法在workers
集合中移除该worker
对象
4、工作线程Worker内部类
在ThreadPoolExecutor
中,线程统一被封装为Worker
内部类,即具体的工作线程类,Worker
继承自AQS
并实现了Runnable
接口:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable { private static final long serialVersionUID = 6138294804551838833L; // 当前的线程实例 final Thread thread; // 当前的任务实例 Runnable firstTask; // 记录线程完成的任务数 volatile long completedTasks; // 构造方法,入参为firstTask任务实例 Worker(Runnable firstTask) { // (重要)设置worker线程状态为-1,表示禁止线程中断 setState(-1); // 赋值成员变量,保存firstTask任务实例 this.firstTask = firstTask; // 通过线程工厂创建新线程(传入当前对象),真正的线程 this.thread = getThreadFactory().newThread(this); } public void run() { // (重要)线程启动时执行的任务 runWorker(this); } // 是否持有独占锁 protected boolean isHeldExclusively() { return getState() != 0; } // 尝试获取独占锁 protected boolean tryAcquire(int unused) { if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); return true; } return false; } // 尝试释放独占锁 protected boolean tryRelease(int unused) { setExclusiveOwnerThread(null); setState(0); return true; } public void lock() {acquire(1);} public boolean tryLock() {return tryAcquire(1);} public void unlock() {release(1);} public boolean isLocked() {return isHeldExclusively();} // 线程启动后,进行线程中断时执行方法 void interruptIfStarted() { Thread t; // worker中状态 >= 0 且 线程不为空 且 当前线程未中断 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { try { // 中断线程 t.interrupt(); } catch (SecurityException ignore) { } } } }
Worker
类的构造方法首先会通过setState(-1)
设置线程状态state
为-1
,当state
为-1
时,代表线程此时不能被中断,这里采用的是**AQS
的独占锁模式**,只有当state
不为-1
(独占锁被释放),线程才被允许中断,具体中断逻辑在interruptIfStarted()
方法。
在创建完worker
工作线程对象后,会启动该工作线程,调用woker
的run()
方法,run()
方法内部又调用了ThreadPoolExecutor
类的runWorker()
方法。
5、启动线程runWorker方法
runWorker()
方法会执行具体的任务,源码如下:
final void runWorker(Worker w) { // 获取当前线程 Thread wt = Thread.currentThread(); // 获取worker构造方法传入的任务实例 Runnable task = w.firstTask; w.firstTask = null; // 解锁,设置worker状态state为0,表示允许线程中断 w.unlock(); // 标识线程退出原因,true代表线程异常退出,false代表线程正常结束 boolean completedAbruptly = true; try { // 判断当前任务是否不为空 或 从阻塞队列获取成功 // getTask()方法从阻塞队列获取一个任务 while (task != null || (task = getTask()) != null) { // 加锁处理并发问题,防止shutdown()时终止正在执行的worker w.lock(); // 判断如果线程池状态 >= STOP 且线程未被中断 if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) // 中断线程 wt.interrupt(); try { // 可自定义实现的扩展点,执行前的扩展方法 beforeExecute(wt, task); Throwable thrown = null; try { // 执行任务,对应任务的run()方法 task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { // 扩展点,执行后的扩展方法 afterExecute(task, thrown); } } finally { // 任务对象置空 task = null; // 完成的工作任务数+1 w.completedTasks++; // 解锁 w.unlock(); } } // 标识线程为正常结束退出 completedAbruptly = false; } finally { // 处理worker线程退出 processWorkerExit(w, completedAbruptly); } }
其内部流程主要如下:
- 首先解锁
worker
,允许线程中断 - 接着通过
while
循环判断当前任务是否为空,若为空则调用getTask()
方法从阻塞队列中获取待处理的任务 - 获取到任务后工作线程直接加锁,同时根据线程池状态判断是否中断当前线程
- 执行对象任务的
run()
方法,在任务执行前后均提供扩展点可自定义扩展 - 重复第
2
步的逻辑,直到阻塞队列中的任务全部执行完毕,最后调用processWorkerExit()
方法处理worker
线程退出
这里我们也发现,线程池中的线程复用实际上就是通过runWorker()
方法中while
循环实现的,worker
工作线程不断的从阻塞队列中获取新的任务,直接调用任务的run()方法,避免去创建新线程,这一过程实现了线程复用。
6、获取阻塞队列getTask方法
getTask()
方法用来获取阻塞队列中的任务,源码如下:
private Runnable getTask() { // 标记上一次从阻塞队列获取任务是否超时 boolean timedOut = false; // 死循环 for (; ; ) { // 获取ctl值 int c = ctl.get(); // 获取当前线程池状态 int rs = runStateOf(c); // 判断是否需获取任务 // 若线程池状态 >= STOP 或 (线程池状态为SHUTDOWN 且 阻塞队列为空),则不需要再处理任务 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { // 工作线程数-1 decrementWorkerCount(); // 直接返回 return null; } // 获取当前工作线程数 int wc = workerCountOf(c); // timed标记用于判断是否需进行超时控制 // allowCoreThreadTimeOut是否运行核心线程超时销毁(默认为false,核心线程不允许超时销毁) // wc > corePoolSize当前工作线程数大于核心线程数 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; // 工作线程数 > maximumPoolSize // 或 // timed && timedOut,timedOut首次进入默认为false if ((wc > maximumPoolSize || (timed && timedOut)) // 工作线程数大于1 // 或 // 阻塞队列为空 && (wc > 1 || workQueue.isEmpty())) { // 采用CAS,工作线程数-1,并且返回null if (compareAndDecrementWorkerCount(c)) return null; continue; } // 执行到这,说明前面线程池状态和工作线程数校验已通过,开始从阻塞队列获取任务 try { // timed为true代表核心线程允许超时销毁 或 当前工作线程数大于核心线程数(存在非核心线程) // timed = true:超时等待获取任务poll(),超时未获取到则返回null // timed = false:阻塞获取任务take(),一直等着直到获取到任务 Runnable r = timed ? workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : workQueue.take(); // 判断是否获取到任务 if (r != null) // 获取到直接返回 return r; // 未获取到,代表等待超时也没有拿到任务,设置timedOut值为true timedOut = true; } catch (InterruptedException retry) { // 即使异常也循环重试 timedOut = false; } } }
大致过程其实就是从workQueue
阻塞队列中获取任务,在获取之前也会校验线程池状态和工作线程数是否合规,最后根据条件选择超时等待获取还是阻塞获取。
7、退出线程processWorkerExit方法
runWorker()
方法执行结束之后在finally
代码块中会调用processWorkerExit()
方法,主要用来销毁空闲线程,具体源码如下:
private void processWorkerExit(Worker w, boolean completedAbruptly) { // completedAbruptly:true代表线程异常退出,false代表线程正常结束 if (completedAbruptly) // 异常退出,工作线程数-1 decrementWorkerCount(); // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 统计当前worker完成的任务数,累加到线程池总完成任务数 completedTaskCount += w.completedTasks; // 从线程池HastSet集合中移除当前工作线程 workers.remove(w); } finally { // 解锁 mainLock.unlock(); } // 判断是否终止线程池 tryTerminate(); // 获取ctl值 int c = ctl.get(); // 当前线程池状态 < STOP,即RUNNING或SHUTDOWN状态时 if (runStateLessThan(c, STOP)) { // 判断线程是否为正常结束 if (!completedAbruptly) { // 正常结束退出 // allowCoreThreadTimeOut是否运行核心线程超时销毁 // 若允许,min最小线程数为0,不允许则为corePoolSize核心线程数 int min = allowCoreThreadTimeOut ? 0 : corePoolSize; // 判断workQueue阻塞队列是否不为空 if (min == 0 && !workQueue.isEmpty()) // 确保至少有一个worker线程 min = 1; //判断工作线程数 >= min最小线程数 if (workerCountOf(c) >= min) // 符合条件直接返回 return; } // 若线程为异常退出或不符合条件(工作线程数 < min最小线程数) // 创建一个非核心线程执行任务 addWorker(null, false); } }
源码中,会将worker工作线程从线程池HastSet集合中移除,最后也会确保当前工作线程数不小于最小线程数,通过processWorkerExit()
方法,控制了线程池中线程数量的大小变化。
8、线程池关闭
线程池关闭有两种方法,分别是shutdown()
和shutdownNow()
(1)shutdown方法
shutdown()
方法首先会将线程池状态设置为SHUTDOWN
,不再接收新的任务。
内部正在执行的任务和阻塞队列中的任务都会继续执行,待均执行完毕之后会关闭线程池。
具体源码如下:
public void shutdown() { // 加锁 final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // 安全检查 checkShutdownAccess(); // 设置线程池状态为SHUTDOWN advanceRunState(SHUTDOWN); // 中断线程池中空闲的线程 interruptIdleWorkers(); onShutdown(); } finally { mainLock.unlock(); } // 判断是否终止线程池 tryTerminate(); }
(2)shutdownNow方法
shutdownNow()
方法则是将线程池状态设置为STOP
,不再接收新的任务。
但所有线程都会被中断(已经拿到任务正在执行的不会被中断),且阻塞队列中的任务会被丢弃,不再执行了,并且返回未执行的任务队列。
public List<Runnable> shutdownNow() { List<Runnable> tasks; final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { checkShutdownAccess(); // 设置线程池状态为STOP advanceRunState(STOP); // 中断线程池中所有线程 interruptWorkers(); // 清空阻塞队列,赋值给tasks返回 tasks = drainQueue(); } finally { mainLock.unlock(); } // 判断是否终止线程池 tryTerminate(); // 返回阻塞队列中未被执行的任务 return tasks; }
四、流程图
五、后记
本文着重带大家分析了ThreadPoolExecutor
线程池处理任务流程的源码实现,过程中额外介绍了线程池的创建方式以及核心参数,详细通过本文,大家更深入的理解了线程池的原理和工作机制。
但在我们实际开发中,使用线程池主要关注线程池大小的设置,合理配置线程池大小能够更高的提升程序性能和稳定性,国内通用的共识即根据CPU密集型和IO密集型来设定线程数,这里面其实是基于任务的阻塞系数来商榷增减线程数,一般阻塞时间占比越高,设置越多线程,CPU计算时间所占越高,设置越少线程。
网上有很多配置线程池大小的公式,但这些公式无法适用于所有业务场景,我们只能根据实际场景,通过压测及性能监控等手段,不断的调整线程池大小,确定出合适的线程数量,实现最高的任务处理效率。
下一篇,小豪将会继续更新Java多线程与并发编程相关内容,创作不易,如果大家觉得内容对你有收获,不妨考虑关注关注小豪~