阅读量:0
当你在处理消息时,可能会遇到这样的问题:消息的生产速度远远大于消费速度,导致消息堆积。这时候,Work Queues(工作队列)模型就能派上用场。简单来说,Work Queues 让多个消费者绑定到一个队列,共同消费队列中的消息,从而加快消息处理速度。
1. 场景模拟
我们来模拟一个这样的场景。首先,在控制台创建一个名为 work.queue
的队列。
2. 消息发送
我们通过循环发送大量消息来模拟消息堆积的现象。在 publisher
服务中的 SpringAmqpTest
类中添加一个测试方法:
@Test public void testWorkQueue() throws InterruptedException { // 队列名称 String queueName = "simple.queue"; // 消息 String message = "hello, message_"; for (int i = 0; i < 50; i++) { // 发送消息,每20毫秒发送一次,相当于每秒发送50条消息 rabbitTemplate.convertAndSend(queueName, message + i); Thread.sleep(20); } }
3. 消息接收
为了模拟多个消费者绑定同一个队列,我们在 consumer
服务的 SpringRabbitListener
中添加两个新的方法:
@RabbitListener(queues = "work.queue") public void listenWorkQueue1(String msg) throws InterruptedException { System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(20); } @RabbitListener(queues = "work.queue") public void listenWorkQueue2(String msg) throws InterruptedException { System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now()); Thread.sleep(200); }
注意到这两个消费者都设置了 Thread.sleep
来模拟任务耗时:
- 消费者1:
Thread.sleep(20)
,相当于每秒处理50个消息。 - 消费者2:
Thread.sleep(200)
,相当于每秒处理5个消息。
4. 测试
启动 ConsumerApplication
后,执行 publisher
服务中编写的发送测试方法 testWorkQueue
。结果如下:
消费者1接收到消息:【hello, message_0】21:06:00.869555300 消费者2........接收到消息:【hello, message_1】21:06:00.884518 ... 消费者1接收到消息:【hello, message_48】21:06:01.920702500 消费者2........接收到消息:【hello, message_49】21:06:05.723106700
可以看到,消费者1和消费者2各自消费了25条消息:
- 消费者1快速完成了任务。
- 消费者2则缓慢处理任务。
消息是平均分配给每个消费者的,并没有考虑到各个消费者的处理能力,导致一个消费者空闲,另一个忙碌。这显然是低效的。
5. 能者多劳
在 spring
中,可以通过简单配置解决这个问题。修改 consumer
服务的 application.yml
文件,添加如下配置:
spring: rabbitmq: listener: simple: prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
再次测试,结果如下:
消费者1接收到消息:【hello, message_0】21:12:51.659664200 消费者2........接收到消息:【hello, message_1】21:12:51.680610 ... 消费者2........接收到消息:【hello, message_49】21:12:52.746299900
这次,消费者1处理了更多的消息,消费者2则处理了较少的消息,总耗时在1秒左右,大大提升了效率。这充分利用了每一个消费者的处理能力,有效避免了消息积压问题。
6. 总结
Work Queues 模型的使用要点:
- 多个消费者绑定到一个队列,同一条消息只会被一个消费者处理。
- 通过设置
prefetch
来控制消费者预取的消息数量。
这样可以更高效地利用资源,提高消息处理速度。