阅读量:0
快捷导航
- 一、提供了什么功能?
- 二、源码中是怎么实现的?
- 三、总结:
一、提供了什么功能?
源码中的定义:
该类为
Executor
、ExecutorService
、ScheduledExecutorService
、ThreadFactory
和Callable
类定义了工厂和工具方法。
此类支持以下几种方法:
- 创建并返回一个配置了常用设置的
ExecutorService
的方法。 - 创建并返回一个配置了常用设置的
ScheduledExecutorService
的方法。 - 创建并返回一个“包装”的
ExecutorService
的方法,该方法通过使实现特定的方法不可访问来禁用重新配置。 - 创建并返回一个将新创建线程设置为已知状态的
ThreadFactory
的方法。 - 创建并返回一个
Callable
的方法,该方法基于其他闭包形式,以便可以在需要Callable
的执行方法中使用。
说了这么多的名词,那么这些名词之间的关系是什么?请看下图:
二、源码中是怎么实现的?
我们先来看看Executors
类的结构:
1、创建并返回一个配置了常用设置的ExecutorService
newFixedThreadPool()
/** * 创建一个线程池,该线程池重用固定数量的线程,并使用共享的无界队列进行操作。 * 在任何时候,最多将有 nThreads 个线程处于活动状态以处理任务。 * 如果在所有线程都处于活动状态时提交了额外的任务,这些任务将等待在队列中,直到有线程可用。 * 如果在执行过程中任何线程因故障而终止,在关闭之前,如果需要执行后续任务,将会有一个新线程替代它。 * 线程池中的线程将一直存在,直到显式关闭。 * * 参数: * nThreads – 线程池中的线程数量 * * 抛出: * IllegalArgumentException – 如果 nThreads <= 0 */ public static ExecutorService newFixedThreadPool(int nThreads) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>()); } /** * 使用指定线程工厂创建新线程的重载方法 * * 参数: * nThreads – 线程池中的线程数量 * threadFactory – 创建新线程时使用的工厂 * * 抛出: * NullPointerException – 如果threadFactory为null * IllegalArgumentException – 如果nThreads <= 0 */ public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) { return new ThreadPoolExecutor(nThreads, nThreads, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory); }
newSingleThreadExecutor()
/** * 从代码上可以看出,该方法和newFixedThreadPool()大同小异,只有两点不同: * 1. 线程池核心线程数不同,newFixedThreadPool的核心线程数是方法入参nThreads,而本方法的核心线程数是1。 * 2. 本方法在new ThreadPoolExecutor()之外加了一层封装new FinalizableDelegatedExecutorService()。 * 只提供了本类对接口ExecutorService实现的方法的访问接口,目的是防止ThreadPoolExecutor实例在某些情况下 对线程池配置的修改, * 例如:使用setCorePoolSize()重新设置线程池核心线程数 */ public static ExecutorService newSingleThreadExecutor() { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>())); } // 使用指定线程工厂创建新线程的重载方法 public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) { return new FinalizableDelegatedExecutorService (new ThreadPoolExecutor(1, 1, 0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<Runnable>(), threadFactory)); }
newCachedThreadPool()
/** * 创建一个线程池,该线程池根据需要创建新线程,但会在可用时重用之前构造的线程。 * 这些线程池通常会提高执行许多短暂异步任务的程序的性能。 * 调用execute时,如果有可用的线程,将重用之前构造的线程。 * 如果没有现有线程可用,将创建一个新线程并添加到池中。 * 未使用超过六十秒的线程将被终止并从缓存中移除。因此,长时间保持空闲的池不会消耗任何资源。 * 请注意,可以使用ThreadPoolExecutor构造函数创建具有类似属性但不同细节(例如超时参数)的池。 * */ public static ExecutorService newCachedThreadPool() { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>()); } // 使用指定线程工厂创建新线程的重载方法 public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) { return new ThreadPoolExecutor(0, Integer.MAX_VALUE, 60L, TimeUnit.SECONDS, new SynchronousQueue<Runnable>(), threadFactory); }
newWorkStealingPool()
/** * 创建一个线程池,该线程池维护足够的线程以支持给定的并行级别,并可能使用多个队列来减少竞争。 * 并行级别对应于主动参与或可用于任务处理的最大线程数。 * 实际线程数量可能会动态增长和缩小。 * 工作窃取池不保证提交任务的执行顺序。 * * 参数: * parallelism – 目标并行级别 * * 抛出: * IllegalArgumentException – 如果 parallelism <= 0 */ public static ExecutorService newWorkStealingPool(int parallelism) { return new ForkJoinPool (parallelism, ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); } // 创建一个工作窃取线程池,使用所有可用处理器作为其目标并行级别。 public static ExecutorService newWorkStealingPool() { return new ForkJoinPool (Runtime.getRuntime().availableProcessors(), ForkJoinPool.defaultForkJoinWorkerThreadFactory, null, true); }
2、创建并返回一个ScheduledExecutorService
newScheduledThreadPool()
/** * 创建一个可以在给定延迟后运行或定期执行的线程池。 * * 参数: * corePoolSize – 即使线程处于空闲状态,也要保持在池中的线程数量 * * 抛出: * IllegalArgumentException – 如果 corePoolSize < 0 */ public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) { return new ScheduledThreadPoolExecutor(corePoolSize); } // 使用指定线程工厂创建新线程的重载方法 public static ScheduledExecutorService newScheduledThreadPool( int corePoolSize, ThreadFactory threadFactory) { return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory); }
newSingleThreadScheduledExecutor()
// 返回一个类似newScheduledThreadPool(1)的单线程执行器,在任何情况下线程数量都不可更改 public static ScheduledExecutorService newSingleThreadScheduledExecutor() { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1)); } // 使用指定线程工厂创建新线程的重载方法 public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) { return new DelegatedScheduledExecutorService (new ScheduledThreadPoolExecutor(1, threadFactory)); }
3、创建并返回一个禁用重新配置的ExecutorService
unconfigurableExecutorService()
// 和newSingleThreadExecutor()、newSingleThreadScheduledExecutor()类似的套路 // 添加一个包装类,屏蔽返回的线程池对象对配置的修改 public static ExecutorService unconfigurableExecutorService(ExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedExecutorService(executor); }
unconfigurableScheduledExecutorService()
// 和unconfigurableExecutorService同样的套路 public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) { if (executor == null) throw new NullPointerException(); return new DelegatedScheduledExecutorService(executor); }
4、创建并返回一个ThreadFactory
defaultThreadFactory()
public static ThreadFactory defaultThreadFactory() { return new DefaultThreadFactory(); } // 静态内部类DefaultThreadFactory static class DefaultThreadFactory implements ThreadFactory { private static final AtomicInteger poolNumber = new AtomicInteger(1); private final ThreadGroup group; private final AtomicInteger threadNumber = new AtomicInteger(1); private final String namePrefix; DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; } public Thread newThread(Runnable r) { Thread t = new Thread(group, r, namePrefix + threadNumber.getAndIncrement(), 0); if (t.isDaemon()) t.setDaemon(false); if (t.getPriority() != Thread.NORM_PRIORITY) t.setPriority(Thread.NORM_PRIORITY); return t; } }
privilegedThreadFactory()
public static ThreadFactory privilegedThreadFactory() { return new PrivilegedThreadFactory(); } /** * Thread factory capturing access control context and class loader */ static class PrivilegedThreadFactory extends DefaultThreadFactory { private final AccessControlContext acc; private final ClassLoader ccl; PrivilegedThreadFactory() { super(); SecurityManager sm = System.getSecurityManager(); if (sm != null) { // Calls to getContextClassLoader from this class // never trigger a security check, but we check // whether our callers have this permission anyways. sm.checkPermission(SecurityConstants.GET_CLASSLOADER_PERMISSION); // Fail fast sm.checkPermission(new RuntimePermission("setContextClassLoader")); } this.acc = AccessController.getContext(); this.ccl = Thread.currentThread().getContextClassLoader(); } public Thread newThread(final Runnable r) { return super.newThread(new Runnable() { public void run() { AccessController.doPrivileged(new PrivilegedAction<Void>() { public Void run() { Thread.currentThread().setContextClassLoader(ccl); r.run(); return null; } }, acc); } }); } }
5、创建并返回一个Callable
callable()
public static <T> Callable<T> callable(Runnable task, T result) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<T>(task, result); } public static Callable<Object> callable(Runnable task) { if (task == null) throw new NullPointerException(); return new RunnableAdapter<Object>(task, null); } public static Callable<Object> callable(final PrivilegedAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() { return action.run(); }}; } public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) { if (action == null) throw new NullPointerException(); return new Callable<Object>() { public Object call() throws Exception { return action.run(); }}; } public static <T> Callable<T> privilegedCallable(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallable<T>(callable); } public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) { if (callable == null) throw new NullPointerException(); return new PrivilegedCallableUsingCurrentClassLoader<T>(callable); }
三、总结:
- 通过对以上源码的分析,可以发现虽然Executors工具类提供了很多快速创建线程池的方法,但归根结底还是对ThreadPoolExecutor、ScheduledThreadPoolExecutor的封装。我们可以很容易的根据自己的项目情况去自定义,这样的创建的线程池才是最符合业务场景的。
- 在Executors提供的快速创建线程池的方法中,若有贴合使用业务场景的,可以直接使用,这样可以提高开发效率,避免重复造轮子。
- Executors提供了从Runable到Callable的转换,这可能在需要线程提供返回值的时候是有用的。