阅读量:0
在 Java 中,ConcurrentLinkedQueue
、ArrayBlockingQueue
、LinkedBlockingQueue
等并发队列通常用于生产者-消费者模式。这些队列使用锁和条件(Condition
)来实现线程间的通信。
以下是基于 ArrayBlockingQueue
或 LinkedBlockingQueue
的生产者-消费者模式中,消费者如何知道队列中有元素可用的基本原理:
锁(Lock): 队列内部维护了一个锁,通常是
ReentrantLock
。这个锁用于同步对队列的访问,确保在任何时刻只有一个线程可以修改队列。条件(Condition): 锁关联了两个条件,通常称为
notEmpty
和notFull
。notEmpty
条件用于通知消费者队列非空,而notFull
条件用于通知生产者队列未满。消费者等待: 当队列为空时,消费者线程会调用条件
notEmpty
的await()
方法。这将导致消费者线程释放锁并等待,直到另一个线程(生产者)在队列中插入一个元素并调用signal()
或signalAll()
方法来唤醒等待的消费者线程。生产者通知: 当生产者向队列中添加一个元素时,它会调用
notEmpty
条件的signal()
方法来唤醒一个(或所有)等待的消费者线程。
以下是一个简化的例子:
import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notEmpty = lock.newCondition(); final Condition notFull = lock.newCondition(); final Object[] items = new Object[100]; // 假设缓冲区大小为100 int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) // 如果队列已满,则等待 notFull.await(); items[putptr] = x; // 在这里插入元素 if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); // 通知消费者队列非空 } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) // 如果队列为空,则等待 notEmpty.await(); Object x = items[takeptr]; // 在这里取出元素 if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); // 通知生产者队列未满 return x; } finally { lock.unlock(); } } }
在这个例子中,消费者在 notEmpty
条件上等待,而生产者在 notFull
条件上等待。当队列状态改变时(例如,生产者添加了一个元素或消费者取出一个元素),相应的条件会被信号唤醒,这样等待的线程就可以重新获取锁并继续执行。