线程池工具类 Executors源代码详解

avatar
作者
猴君
阅读量:0


快捷导航

一、提供了什么功能?

源码中的定义:

该类为ExecutorExecutorServiceScheduledExecutorServiceThreadFactoryCallable类定义了工厂和工具方法。

此类支持以下几种方法:

  • 创建并返回一个配置了常用设置的ExecutorService的方法。
  • 创建并返回一个配置了常用设置的ScheduledExecutorService的方法。
  • 创建并返回一个“包装”的ExecutorService的方法,该方法通过使实现特定的方法不可访问来禁用重新配置。
  • 创建并返回一个将新创建线程设置为已知状态的ThreadFactory的方法。
  • 创建并返回一个Callable的方法,该方法基于其他闭包形式,以便可以在需要Callable的执行方法中使用。

说了这么多的名词,那么这些名词之间的关系是什么?请看下图:
​​​​在这里插入图片描述

二、源码中是怎么实现的?

我们先来看看Executors类的结构:

在这里插入图片描述

1、创建并返回一个配置了常用设置的ExecutorService

newFixedThreadPool()

    /**      * 创建一个线程池,该线程池重用固定数量的线程,并使用共享的无界队列进行操作。      * 在任何时候,最多将有 nThreads 个线程处于活动状态以处理任务。      * 如果在所有线程都处于活动状态时提交了额外的任务,这些任务将等待在队列中,直到有线程可用。      * 如果在执行过程中任何线程因故障而终止,在关闭之前,如果需要执行后续任务,将会有一个新线程替代它。      * 线程池中的线程将一直存在,直到显式关闭。       *      * 参数:       * nThreads – 线程池中的线程数量       *      * 抛出:       * IllegalArgumentException – 如果 nThreads <= 0      */     public static ExecutorService newFixedThreadPool(int nThreads) {         return new ThreadPoolExecutor(nThreads, nThreads,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>());     }  	/**      * 使用指定线程工厂创建新线程的重载方法      *      * 参数:      * nThreads – 线程池中的线程数量      * threadFactory – 创建新线程时使用的工厂      *      * 抛出:      * NullPointerException – 如果threadFactory为null      * IllegalArgumentException – 如果nThreads <= 0      */     public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {         return new ThreadPoolExecutor(nThreads, nThreads,                                       0L, TimeUnit.MILLISECONDS,                                       new LinkedBlockingQueue<Runnable>(),                                       threadFactory);     } 

newSingleThreadExecutor()

	/**      * 从代码上可以看出,该方法和newFixedThreadPool()大同小异,只有两点不同:      * 1. 线程池核心线程数不同,newFixedThreadPool的核心线程数是方法入参nThreads,而本方法的核心线程数是1。      * 2. 本方法在new ThreadPoolExecutor()之外加了一层封装new FinalizableDelegatedExecutorService()。      *    只提供了本类对接口ExecutorService实现的方法的访问接口,目的是防止ThreadPoolExecutor实例在某些情况下	对线程池配置的修改,      *    例如:使用setCorePoolSize()重新设置线程池核心线程数      */     public static ExecutorService newSingleThreadExecutor() {         return new FinalizableDelegatedExecutorService             (new ThreadPoolExecutor(1, 1,                                     0L, TimeUnit.MILLISECONDS,                                     new LinkedBlockingQueue<Runnable>()));     }     // 使用指定线程工厂创建新线程的重载方法     public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {         return new FinalizableDelegatedExecutorService             (new ThreadPoolExecutor(1, 1,                                     0L, TimeUnit.MILLISECONDS,                                     new LinkedBlockingQueue<Runnable>(),                                     threadFactory));     } 

newCachedThreadPool()

	/**      * 创建一个线程池,该线程池根据需要创建新线程,但会在可用时重用之前构造的线程。      * 这些线程池通常会提高执行许多短暂异步任务的程序的性能。      * 调用execute时,如果有可用的线程,将重用之前构造的线程。      * 如果没有现有线程可用,将创建一个新线程并添加到池中。      * 未使用超过六十秒的线程将被终止并从缓存中移除。因此,长时间保持空闲的池不会消耗任何资源。      * 请注意,可以使用ThreadPoolExecutor构造函数创建具有类似属性但不同细节(例如超时参数)的池。      *      */     public static ExecutorService newCachedThreadPool() {         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                       60L, TimeUnit.SECONDS,                                       new SynchronousQueue<Runnable>());     }     // 使用指定线程工厂创建新线程的重载方法     public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {         return new ThreadPoolExecutor(0, Integer.MAX_VALUE,                                       60L, TimeUnit.SECONDS,                                       new SynchronousQueue<Runnable>(),                                       threadFactory);     }     

newWorkStealingPool()

    /**      * 创建一个线程池,该线程池维护足够的线程以支持给定的并行级别,并可能使用多个队列来减少竞争。      * 并行级别对应于主动参与或可用于任务处理的最大线程数。      * 实际线程数量可能会动态增长和缩小。      * 工作窃取池不保证提交任务的执行顺序。       *      * 参数:       * parallelism – 目标并行级别       *      * 抛出:       * IllegalArgumentException – 如果 parallelism <= 0       */     public static ExecutorService newWorkStealingPool(int parallelism) {         return new ForkJoinPool             (parallelism,              ForkJoinPool.defaultForkJoinWorkerThreadFactory,              null, true);     }          // 创建一个工作窃取线程池,使用所有可用处理器作为其目标并行级别。     public static ExecutorService newWorkStealingPool() {         return new ForkJoinPool             (Runtime.getRuntime().availableProcessors(),              ForkJoinPool.defaultForkJoinWorkerThreadFactory,              null, true);     } 

2、创建并返回一个ScheduledExecutorService

newScheduledThreadPool()

	/**      * 创建一个可以在给定延迟后运行或定期执行的线程池。       *      * 参数:       * corePoolSize – 即使线程处于空闲状态,也要保持在池中的线程数量       *      * 抛出:       * IllegalArgumentException – 如果 corePoolSize < 0      */     public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {         return new ScheduledThreadPoolExecutor(corePoolSize);     }     // 使用指定线程工厂创建新线程的重载方法     public static ScheduledExecutorService newScheduledThreadPool(             int corePoolSize, ThreadFactory threadFactory) {         return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);     } 

newSingleThreadScheduledExecutor()

	// 返回一个类似newScheduledThreadPool(1)的单线程执行器,在任何情况下线程数量都不可更改     public static ScheduledExecutorService newSingleThreadScheduledExecutor() {         return new DelegatedScheduledExecutorService             (new ScheduledThreadPoolExecutor(1));     } 	// 使用指定线程工厂创建新线程的重载方法     public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {         return new DelegatedScheduledExecutorService             (new ScheduledThreadPoolExecutor(1, threadFactory));     } 

3、创建并返回一个禁用重新配置的ExecutorService

unconfigurableExecutorService()

	// 和newSingleThreadExecutor()、newSingleThreadScheduledExecutor()类似的套路 	// 添加一个包装类,屏蔽返回的线程池对象对配置的修改     public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {         if (executor == null)             throw new NullPointerException();         return new DelegatedExecutorService(executor);     } 

unconfigurableScheduledExecutorService()

	// 和unconfigurableExecutorService同样的套路     public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {         if (executor == null)             throw new NullPointerException();         return new DelegatedScheduledExecutorService(executor);     } 

4、创建并返回一个ThreadFactory

defaultThreadFactory()

    public static ThreadFactory defaultThreadFactory() {         return new DefaultThreadFactory();     }          // 静态内部类DefaultThreadFactory     static class DefaultThreadFactory implements ThreadFactory {         private static final AtomicInteger poolNumber = new AtomicInteger(1);         private final ThreadGroup group;         private final AtomicInteger threadNumber = new AtomicInteger(1);         private final String namePrefix;          DefaultThreadFactory() {             SecurityManager s = System.getSecurityManager();             group = (s != null) ? s.getThreadGroup() :                                   Thread.currentThread().getThreadGroup();             namePrefix = "pool-" +                           poolNumber.getAndIncrement() +                          "-thread-";         }          public Thread newThread(Runnable r) {             Thread t = new Thread(group, r,                                   namePrefix + threadNumber.getAndIncrement(),                                   0);             if (t.isDaemon())                 t.setDaemon(false);             if (t.getPriority() != Thread.NORM_PRIORITY)                 t.setPriority(Thread.NORM_PRIORITY);             return t;         }     }  

privilegedThreadFactory()

         public static ThreadFactory privilegedThreadFactory() {         return new PrivilegedThreadFactory();     }          /**      * Thread factory capturing access control context and class loader      */     static class PrivilegedThreadFactory extends DefaultThreadFactory {         private final AccessControlContext acc;         private final ClassLoader ccl;          PrivilegedThreadFactory() {             super();             SecurityManager sm = System.getSecurityManager();             if (sm != null) {                 // Calls to getContextClassLoader from this class                 // never trigger a security check, but we check                 // whether our callers have this permission anyways.                 sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION);                  // Fail fast                 sm.checkPermission(new RuntimePermission("setContextClassLoader"));             }             this.acc = AccessController.getContext();             this.ccl = Thread.currentThread().getContextClassLoader();         }          public Thread newThread(final Runnable r) {             return super.newThread(new Runnable() {                 public void run() {                     AccessController.doPrivileged(new PrivilegedAction<Void>() {                         public Void run() {                             Thread.currentThread().setContextClassLoader(ccl);                             r.run();                             return null;                         }                     }, acc);                 }             });         }     } 

5、创建并返回一个Callable

callable()

     public static <T> Callable<T> callable(Runnable task, T result) {         if (task == null)             throw new NullPointerException();         return new RunnableAdapter<T>(task, result);     }      public static Callable<Object> callable(Runnable task) {         if (task == null)             throw new NullPointerException();         return new RunnableAdapter<Object>(task, null);     }      public static Callable<Object> callable(final PrivilegedAction<?> action) {         if (action == null)             throw new NullPointerException();         return new Callable<Object>() {             public Object call() { return action.run(); }};     }      public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {         if (action == null)             throw new NullPointerException();         return new Callable<Object>() {             public Object call() throws Exception { return action.run(); }};     }      public static <T> Callable<T> privilegedCallable(Callable<T> callable) {         if (callable == null)             throw new NullPointerException();         return new PrivilegedCallable<T>(callable);     }      public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {         if (callable == null)             throw new NullPointerException();         return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);     }  

三、总结:

  1. 通过对以上源码的分析,可以发现虽然Executors工具类提供了很多快速创建线程池的方法,但归根结底还是对ThreadPoolExecutor、ScheduledThreadPoolExecutor的封装。我们可以很容易的根据自己的项目情况去自定义,这样的创建的线程池才是最符合业务场景的。
  2. 在Executors提供的快速创建线程池的方法中,若有贴合使用业务场景的,可以直接使用,这样可以提高开发效率,避免重复造轮子。
  3. Executors提供了从Runable到Callable的转换,这可能在需要线程提供返回值的时候是有用的。

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!