目录
在前一篇中,我们讲到了单例模式中的饿汉模式和懒汉模式,本篇我们来讲在多线程常用的阻塞队列。
一.阻塞队列(Blocking Queue)
1.什么是阻塞队列
阻塞队列是一种特殊的队列,也有着“先进先出”的性质。
2.特性
阻塞队列是一种线程安全的数据结构,具有以下的特性:
- 当队列满时,继续入队列就会进行阻塞等待,直到其他线程从队列中取出元素
- 当队列为空时,继续出队列也会进行阻塞对待,直到队列中的元素不为空
阻塞在实际开发应用中,一个典型的应用场景就是“生产者消费者模型”。
二.生产者消费者模型
1.什么是生产者消费者模型?
生产者消费者模型就是通过一个容器来解决生产者和消费者之间的强耦合问题。生产者和消费者之间不直接联系,而是通过阻塞队列来进行通讯,所以生产者生产完数据之后不用等待消费者处理,而是直接给队列,消费者就不直接找生产者,而是直接从阻塞队列里取。
2.生产者消费模型的好处
- 解耦合:生产者和消费者之间不再直接联系,而是通过阻塞队列。
- 削峰填谷:阻塞队列就相当于一个缓冲区,平衡了生产者和消费者之间的处理能力。
2.1解耦合
我们在开发中, 有些时候可能需要两个或者多个模块或者组件之间的依赖关系不要太紧密,那我们就需要进行解耦合。
解耦合的好处:
- 易于维护:当一个模块发生改变时,不会影响到其他模块,减少了修改一处bug而导致多重地方出现bug的风险。
- 可扩展性:可以轻松地将新的功能或者模块添加到系统中,无需对系统重构。
- 可重用性:独立的模块可以中不同的场景或者项目中使用,提高了代码的复用率。
- 简化测试:独立的模块更容易进行单元测试,因为它们不需要依赖复杂的外部环境或状态。
示例:假设现有服务器A和服务器B是直接相联的
服务器A和服务器B是交互是直接的,之间的耦合度是非常高的,若服务器A或者B崩溃,会直接导致服务器B或A也崩溃。
对于上述这种情况,我们就可以使用生产者消费者模型来解决,通过一个阻塞队列,来间接地传递和接受信息。
当服务器A想要向服务器B传递信息时,就将信息传递到阻塞队列中,服务器B再从阻塞队列中将信息取出。通过这个方式,服务器之间的耦合度就大大降低,服务器A若崩溃,服务器B也不会崩溃。
若我们想要添加一个服务器C,那么也不需要对其他服务器做任何修改,直接向阻塞队列请求即可。
2.2削峰填谷
在高并发的情况下,系统短时间内可能会面临大量数据请求,导致系统资源耗尽。可以使用阻塞队列来削峰填谷,将请求存储在阻塞队列中,按照系统处理能力逐渐消费信息,从而达到削峰填谷的效果,避免系统崩溃或性能降低。
示例:依旧是以上图为例,假设现有大量请求涌进服务器A中,那么此时服务器A不会直接将请求传递给服务器B,若直接传输,会导致服务器B处理不过来直接崩溃。服务器A会将请求传递到阻塞队列中,当阻塞队列满之后,此时阻塞队列不再会接收服务器A的入队请求,而是要等阻塞队列中不为满时才接收服务器A的请求;服务器B从阻塞队列中接收请求并进行处理,直到队列中的请求被服务器B处理完。
三.如何在java中使用阻塞队列
在java中给我们提供BlockingQueue接口
由于BlockingQueue使一个接口,不能直接去实例化,java中有以下类实现了该接口。
同时我们可以看到,BlockingQueue继承了Queue,所以Queue中offer和poll在BlockingQueue中也有,但我们不使用这两个,需要使用put和take方法才具有阻塞。
这里我们使用第一个类来做演示:
实现一对一的生产者消费者模型。
public class Demo12 { /** * 程序的入口点。 * 创建一个ArrayBlockingQueue,并启动两个线程。一个线程负责生产数字字符串,另一个线程负责消费这些字符串。 * 这个示例展示了生产者-消费者模式的使用,其中ArrayBlockingQueue作为共享资源,用于在生产者和消费者之间同步数据。 * * @param args 命令行参数 * @throws InterruptedException 如果线程在执行过程中被中断 */ public static void main(String[] args) throws InterruptedException { // 创建一个容量为1000的ArrayBlockingQueue,用于生产者和消费者之间的通信 BlockingQueue<String> BQ = new ArrayBlockingQueue<>(1000); // 创建生产者线程,负责向队列中添加数字字符串 Thread t1 = new Thread(() -> { int i = 0; while (true) { try { // 将数字字符串添加到队列中,如果队列满,则阻塞直到有空间可用 BQ.put("" + i); i++; } catch (InterruptedException e) { // 如果线程被中断,抛出运行时异常 throw new RuntimeException(e); } System.out.println("生产了" + i); } }); // 创建消费者线程,负责从队列中取出数字字符串并消费 Thread t2 = new Thread(() -> { while (true) { try { // 从队列中取出一个数字字符串,如果队列为空,则阻塞直到有元素可用 String s = BQ.take(); System.out.println("消费了" + s); // 模拟消费过程,让线程休眠1秒 Thread.sleep(1000); } catch (InterruptedException e) { // 如果线程被中断,抛出运行时异常 throw new RuntimeException(e); } } }); // 启动生产者和消费者线程 t1.start(); t2.start(); } }
四.模拟实现阻塞队列
阻塞队列的容量是有一定大小的,所以我们首先需要实现一个循环队列。
判空判满条件:
当size==0时,说明队列为空
当size==q.length时,说明队列满了。
如何解决当head或者ptail走到队列末尾,但size不等于队列长度的情况?
当head走到末尾之后,若head等于q.length时,让head=0即可。
同理,当ptail等于q.length时,让ptail=0即可。
/** * MyBlockingQueues 类实现了阻塞队列的数据结构。 * 阻塞队列是一种线程安全的队列,当队列为空时,take 方法会阻塞,直到队列有元素可用; * 当队列满时,put 方法会阻塞,直到队列有空闲位置。 */ class MyBlockingQueues { // 存储队列元素的数组 private String[] q; // 指向队列头部的索引 private int head; // 指向队列尾部的索引 private int ptail; // 当前队列中元素的数量 private int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将元素添加到队列中。 * 如果队列已满,则此方法会阻塞,直到队列有空闲位置。 * * @param s 要添加到队列的元素。 */ public void put(String s){ // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 if(size==q.length){ return; } q[ptail++]=s; // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if(ptail>=q.length){ ptail=0; } size++; } /** * 从队列中取出一个元素。 * 如果队列为空,则此方法会阻塞,直到队列有元素可用。 * * @return 取出的元素。 */ public String take(){ String ret=""; // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 if(size==0){ return ret; } ret=q[head++]; // 如果头部索引超过数组长度,则重置为0,实现循环队列 if(head>=q.length){ head=0; } size--; return ret; } }
1.加锁
当我们实现玩一个循环之后,接下来要实现的就是阻塞队列,阻塞队列,顾名思义就是在队列空/满的时候进行等待,但阻塞队列通常是在多线程下使用的,所以我们需要考虑线程安全问题。
在入队列和出队列的时候,代码块中的代码基本都涉及到了修改操作,因此,我们可以直接对整个代码块进行加锁。
class MyBlockingQueues { // 存储队列元素的数组 private String[] q; // 指向队列头部的索引 private int head; // 指向队列尾部的索引 private int ptail; // 当前队列中元素的数量 private int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将元素添加到队列中。 * 如果队列已满,则此方法会阻塞,直到队列有空闲位置。 * * @param s 要添加到队列的元素。 */ public void put(String s){ synchronized(this) { // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 if (size == q.length) { return; } q[ptail++] = s; // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; } } /** * 从队列中取出一个元素。 * 如果队列为空,则此方法会阻塞,直到队列有元素可用。 * * @return 取出的元素。 */ public String take(){ String ret=""; synchronized (this) { // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 if (size == 0) { return ret; } ret = q[head++]; // 如果头部索引超过数组长度,则重置为0,实现循环队列 if (head >= q.length) { head = 0; } size--; } return ret; } }
2.阻塞等待实现
当线程安全问题解决之后,我们现在要考虑的就是如何让队列能够进行阻塞等待,我们需要用到wait和notify方法来实现。
在put方法中,当size==q.length时,说明此时队列中已满,需要进行阻塞等待。若没有满则进行入队操作,当入完队之后就可以通知take进行出队操作。
在take方法中,当size==0时,说明此时队列为空,此时同样需要进行阻塞等待(等put通知),若队列不为空,则进行出队操作,当出完队之后就可以通知put进行入队操作。
/** * MyBlockingQueues 类实现了阻塞队列的数据结构。 * 阻塞队列是一种线程安全的队列,当队列为空时,take 方法会阻塞,直到队列有元素可用; * 当队列满时,put 方法会阻塞,直到队列有空闲位置。 */ class MyBlockingQueues { // 存储队列元素的数组 private String[] q; // 指向队列头部的索引 private int head; // 指向队列尾部的索引 private int ptail; // 当前队列中元素的数量 private int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将元素添加到队列中。 * 如果队列已满,则此方法会阻塞,直到队列有空闲位置。 * * @param s 要添加到队列的元素。 */ public void put(String s) throws InterruptedException { synchronized(this) { // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 if (size == q.length) { this.wait(); } q[ptail++] = s; // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; } } /** * 从队列中取出一个元素。 * 如果队列为空,则此方法会阻塞,直到队列有元素可用。 * * @return 取出的元素。 */ public String take() throws InterruptedException { String ret=""; synchronized (this) { // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 if (size == 0) { this.wait(); } ret = q[head++]; // 如果头部索引超过数组长度,则重置为0,实现循环队列 if (head >= q.length) { head = 0; } size--; } return ret; } }
3.解决interrupt唤醒waitting问题
唤醒wait的不仅有notify,而且还有interrupt,能够唤醒阻塞之后中断线程并且抛出InterruptedException。
在put方法中,如果size==q.length,此时调用put的线程进入阻塞等待(waitting)状态,如果此时使用interrupt会唤醒阻塞并将线程终止。
在take方法中,如果size==0,此时调用take的线程就也会进入阻塞等待(waitting)状态,如果此时使用interrupt会唤醒阻塞并将线程终止。
若对于我们上面的代码中,调用interrupt,线程确实会终止。
我们可以查看一下wait方法的文档说明。
我们可以看到java中建议我们使用while循环来检查等待条件。
为什么可以使用if,还建议使用while?
假设在判断队列是否为空/满的时候。调用wait的方法的时候使用try-catch语句来抛异常,但在catch中忘记添加抛出异常的语句,再调用interrupt会发生什么?
同理,在take方法中,当size=0时,此时wait正在等待,若被唤醒,就会进行出队操作,但由于此时队列中没有数据,出队列操作没有意义。
为了防止这些bug出现,所以我们使用while更加稳妥,当wait被interrupt唤醒之后,会再次判断当前size是否满足判断条件,若满足则继续执行wait,而不会直接执行后序的代码。反之,若size不为0或者q.length,则会跳出循环,执行后序代码。
public void put(String s) { synchronized(this) { // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 while (size == q.length) { try { this.wait(); } catch (InterruptedException e) { } } q[ptail++] = s; // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; } }
修改后的完整代码:
/** * 自定义阻塞队列类,用于实现具有固定容量的线程安全队列。 * 队列满时,生产者线程会被阻塞,直到队列有空位;队列空时,消费者线程会被阻塞,直到队列有元素。 */ class MyBlockingQueues { // 存储队列元素的数组 private String[] q; // 指向队列头部的索引 private int head; // 指向队列尾部的索引 private int ptail; // 当前队列中元素的数量 private int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将一个字符串元素添加到队列中。 * 如果队列已满,则当前线程被阻塞,直到队列有空位。 * * @param s 要添加到队列的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public void put(String s) throws InterruptedException { synchronized(this) { // 当队列满时,等待直到有空位 // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 while (size == q.length) { this.wait(); } q[ptail++] = s; // 当尾部索引达到数组长度时,重置为0 // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; // 唤醒等待的消费者线程 this.notifyAll(); } } /** * 从队列中取出一个字符串元素。 * 如果队列为空,则当前线程被阻塞,直到队列有元素。 * * @return 从队列中取出的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public String take() throws InterruptedException { String ret=""; synchronized (this) { // 当队列空时,等待直到有元素 // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 while (size == 0) { this.wait(); } ret = q[head++]; // 当头部索引达到数组长度时,重置为0 // 如果头部索引超过数组长度,则重置为0,实现循环队列 if (head >= q.length) { head = 0; } size--; // 唤醒等待的生产者线程 this.notifyAll(); } return ret; } }
4.处理因指令重排序而导致的线程安全问题
在编译器和处理器层面,为了提高性能,可能会对指令进行重排序。由于阻塞队列常用于多线程中,且在put和take要对字段进行频繁的读写操作,为了防止因指令重排序而导致的问题,这里我们需要使用volatie来修饰成员变量,来防止编译器的优化。
/** * 自定义阻塞队列类,用于实现具有固定容量的线程安全队列。 * 队列满时,生产者线程会被阻塞,直到队列有空位;队列空时,消费者线程会被阻塞,直到队列有元素。 */ class MyBlockingQueues { // 存储队列元素的数组 private volatile String[] q; // 指向队列头部的索引 private volatile int head; // 指向队列尾部的索引 private volatile int ptail; // 当前队列中元素的数量 private volatile int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将一个字符串元素添加到队列中。 * 如果队列已满,则当前线程被阻塞,直到队列有空位。 * * @param s 要添加到队列的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public void put(String s) throws InterruptedException { synchronized(this) { // 当队列满时,等待直到有空位 // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 while (size == q.length) { this.wait(); } q[ptail++] = s; // 当尾部索引达到数组长度时,重置为0 // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; // 唤醒等待的消费者线程 this.notifyAll(); } } /** * 从队列中取出一个字符串元素。 * 如果队列为空,则当前线程被阻塞,直到队列有元素。 * * @return 从队列中取出的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public String take() throws InterruptedException { String ret=""; synchronized (this) { // 当队列空时,等待直到有元素 // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 while (size == 0) { this.wait(); } ret = q[head++]; // 当头部索引达到数组长度时,重置为0 // 如果头部索引超过数组长度,则重置为0,实现循环队列 if (head >= q.length) { head = 0; } size--; // 唤醒等待的生产者线程 this.notifyAll(); } return ret; } }
测试
这里我们设置阻塞队列长度为100.
先让生产者生成100个产品再进行消费。
/** * 自定义阻塞队列类,用于实现具有固定容量的线程安全队列。 * 队列满时,生产者线程会被阻塞,直到队列有空位;队列空时,消费者线程会被阻塞,直到队列有元素。 */ class MyBlockingQueues { // 存储队列元素的数组 private volatile String[] q; // 指向队列头部的索引 private volatile int head; // 指向队列尾部的索引 private volatile int ptail; // 当前队列中元素的数量 private volatile int size; /** * 构造函数初始化阻塞队列。 * * @param capacity 队列的容量,即队列最多可以存储的元素数量。 */ public MyBlockingQueues(int capacity) { q = new String[capacity]; size = 0; } /** * 将一个字符串元素添加到队列中。 * 如果队列已满,则当前线程被阻塞,直到队列有空位。 * * @param s 要添加到队列的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public void put(String s) throws InterruptedException { synchronized(this) { // 当队列满时,等待直到有空位 // 判断队列是否已满,如果已满则返回,否则继续添加元素 //判断队列是否满了 while (size == q.length) { this.wait(); } q[ptail++] = s; // 当尾部索引达到数组长度时,重置为0 // 如果尾部索引超过数组长度,则重置为0,实现循环队列 if (ptail >= q.length) { ptail = 0; } size++; // 唤醒等待的消费者线程 this.notifyAll(); } } /** * 从队列中取出一个字符串元素。 * 如果队列为空,则当前线程被阻塞,直到队列有元素。 * * @return 从队列中取出的字符串元素。 * @throws InterruptedException 如果线程在等待时被中断。 */ public String take() throws InterruptedException { String ret=""; synchronized (this) { // 当队列空时,等待直到有元素 // 判断队列是否为空,如果为空则返回空字符串,否则继续取出元素 while (size == 0) { this.wait(); } ret = q[head++]; // 当头部索引达到数组长度时,重置为0 // 如果头部索引超过数组长度,则重置为0,实现循环队列 if (head >= q.length) { head = 0; } size--; // 唤醒等待的生产者线程 this.notifyAll(); } return ret; } } class Demos{ /** * 程序的入口点。 * 创建一个容量为100的MyBlockingQueues实例,用于生产者和消费者之间的通信。 * 分别启动两个线程,一个负责生产物品,另一个负责消费物品。 * @param args 命令行参数 */ public static void main(String[] args) { // 创建一个容量为100的阻塞队列 MyBlockingQueues BQ=new MyBlockingQueues(100); // 创建生产者线程,负责向队列中添加物品 Thread t1=new Thread(()->{ int i=0; while(true){ try { // 将物品添加到队列中,并打印生产信息 BQ.put(""+i); System.out.println("生产者生产了"+i); i++; } catch (InterruptedException e) { // 打印异常信息 e.printStackTrace(); } } }); // 创建消费者线程,负责从队列中取出物品 Thread t2=new Thread(()->{ while(true){ try { // 模拟消费者等待一段时间 Thread.sleep(1000); // 从队列中取出物品,并打印消费信息 System.out.println("消费者消费了"+BQ.take()); } catch (InterruptedException e) { // 打印异常信息 e.printStackTrace(); } } }); // 启动生产者和消费者线程 t1.start(); t2.start(); } }
阻塞队列就先到这里了~
若有不足,欢迎指正~