// Views: 0
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

/**
 * A minimal fixed-size thread pool.
 *
 * <p>The pool owns a fixed number of worker threads that are started by
 * {@link #newDdqThreadPool(int)}. Tasks passed to {@link #execute(Runnable)} are placed
 * on a shared unbounded FIFO queue and each worker repeatedly takes the next task from
 * that queue and runs it. A worker therefore only picks up new work once its current
 * task has returned, and a task is never dropped or overwritten when all workers are busy.
 *
 * <p>{@code execute} may be called from several threads at once; the hand-off between
 * callers and workers goes exclusively through the {@link BlockingQueue}.
 *
 * <p>Worker threads are ordinary (non-daemon) threads and the pool offers no shutdown
 * operation, so a running pool keeps the JVM alive.
 */
public class DdqThreadPool {

    /** Worker thread that runs tasks taken from the pool's shared queue, one at a time. */
    private static final class DdqThread extends Thread {

        /** Queue shared with the owning pool; the only channel through which work arrives. */
        private final BlockingQueue<Runnable> pendingTasks;

        private DdqThread(BlockingQueue<Runnable> pendingTasks) {
            this.pendingTasks = pendingTasks;
        }

        /**
         * Takes tasks from the queue and runs them until this thread is interrupted
         * while waiting for work.
         */
        @Override
        public void run() {
            while (true) {
                Runnable task;
                try {
                    // Blocks while the queue is empty.
                    task = pendingTasks.take();
                } catch (InterruptedException interrupted) {
                    // Keep the interrupt visible to anything inspecting this thread, then stop.
                    Thread.currentThread().interrupt();
                    return;
                }
                try {
                    task.run();
                } catch (RuntimeException failure) {
                    // A failing task must not take the worker down with it, otherwise the
                    // pool would shrink and queued tasks could be stranded. Report the
                    // failure the same way an uncaught exception would be reported.
                    getUncaughtExceptionHandler().uncaughtException(this, failure);
                }
            }
        }
    }

    /** Tasks accepted by {@link #execute(Runnable)} that no worker has started yet. */
    private final BlockingQueue<Runnable> pendingTasks = new LinkedBlockingQueue<>();

    /** All workers owned by this pool; the size is fixed at construction. */
    private final List<DdqThread> threads;

    /** Creates the workers without starting them, so {@code this} never escapes early. */
    private DdqThreadPool(int maxThreadSize) {
        List<DdqThread> workers = new ArrayList<>(maxThreadSize);
        for (int idx = 0; idx < maxThreadSize; idx++) {
            workers.add(new DdqThread(pendingTasks));
        }
        this.threads = workers;
    }

    /**
     * Creates a pool with exactly {@code maxThreadSize} worker threads and starts them.
     *
     * @param maxThreadSize number of worker threads; must be at least 1
     * @return a pool whose workers are already running and waiting for tasks
     * @throws IllegalArgumentException if {@code maxThreadSize} is less than 1
     */
    public static DdqThreadPool newDdqThreadPool(int maxThreadSize) {
        if (maxThreadSize < 1) {
            throw new IllegalArgumentException("线程数量必须大于0");
        }
        DdqThreadPool pool = new DdqThreadPool(maxThreadSize);
        for (DdqThread worker : pool.threads) {
            worker.start();
        }
        return pool;
    }

    /**
     * Submits a task for asynchronous execution by one of the pool's workers.
     *
     * <p>The call only enqueues the task and returns; it does not wait for a worker to
     * become free. If every worker is busy the task stays queued until one finishes its
     * current task. Tasks are taken from the queue in submission order.
     *
     * @param runnable the task to run; must not be {@code null}
     * @throws IllegalArgumentException if {@code runnable} is {@code null}
     */
    public void execute(Runnable runnable) {
        if (runnable == null) {
            throw new IllegalArgumentException("runnable不能为null");
        }
        // The queue is unbounded, so add() cannot fail for lack of capacity.
        pendingTasks.add(runnable);
    }
}