文章目录
对 RabbitMQ 不是很了解的同学,可以看一下我的另一篇博文:RabbitMQ快速入门(MQ的概念、安装RabbitMQ、在 SpringBoot 项目中集成 RabbitMQ )
1. 消息丢失的情况
消息丢失的情况主要有以下三种:
- 生产者向消息代理传递消息的过程中,消息丢失了
- 消息代理( RabbitMQ )把消息弄丢了
- 消费者把消息弄丢了
那怎么保证消息的可靠性呢,我们可以从消息丢失的情况入手——从生产者、消息代理( RabbitMQ )、消费者三个方面来保证消息的可靠性
2. 生产者的可靠性
2.1 生产者重连
由于网络问题,可能会出现客户端连接 RabbitMQ 失败的情况,我们可以通过配置开启连接 RabbitMQ 失败后的重连机制
application.yml(将 host 更改为部署 RabbitMQ 的服务器的地址)
spring: rabbitmq: host: 127.0.0.1 port: 5672 virtual-host: /blog username: CaiXuKun password: T1rhFXMGXIOYCoyi connection-timeout: 1s # 连接超时时间 template: retry: enabled: true # 开启连接超时重试机制 initial-interval: 1000ms # 连接失败后的初始等待时间 multiplier: 1 # 连接失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier max-attempts: 3 # 最大重试次数
填写完配置信息后,我们手动停止 RabbitMQ ,模拟生产者连接 RabbitMQ 失败的情况
sudo docker stop rabbitmq
启动测试类
@Test void testSendMessageToQueue() { String queueName = "simple.queue"; String msg = "Hello, SpringAMQP!"; rabbitTemplate.convertAndSend(queueName, msg); }
可以在控制台看到,总共有三次重新连接 RabbitMQ 的记录,三次连接都失败后,就直接抛异常了
注意事项:
- 当网络不稳定的时候,利用重试机制可以有效提高消息发送的成功率,但 SpringAMOP 提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,线程会被阻塞,影响业务性能
- 如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长(比如 200 ms)和重试次数,也
可以考虑使用异步线程来执行发送消息的代码
2.2 生产者确认
RabbitMQ 提供了 Publisher Confirm
和 Publisher Return
两种确认机制。开启确机制认后,如果 MQ 成功收到消息后,会返回确认消息给生产者,返回的结果有以下几种情况:
- 消息投递到了 MQ,但是路由失败,此时会通过 PublisherReturn 机制返回路由异常的原因,然后返回 ACK,告知生产者消息投递成功
- 临时消息投递到了 MQ,并且入队成功,返回 ACK,告知生产者消息投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回 ACK,告知生产者消息投递成功
- 其它情况都会返回 NACK,告知生产者消息投递失败
2.3 生产者确认机制的代码实现
在 publisher 服务中编写与生产者确认机制有关的配置信息( application.yml 文件)
spring: rabbitmq: publisher-returns: true publisher-confirm-type: correlated
publisher-confirm-type 有三种模式:
- none:关闭 confirm 机制
- simple:以同步阻塞等待的方式返回 MQ 的回执消息
- correlated:以异步回调方式的方式返回 MQ 的回执消息
每个 RabbitTemplate 只能配置一个 ReturnCallback
在 publisher 模块新增一个名为 RabbitMQConfig
的配置类,并让该类实现 ApplicationContextAware
接口
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.BeansException; import org.springframework.context.ApplicationContext; import org.springframework.context.ApplicationContextAware; import org.springframework.context.annotation.Configuration; @Configuration public class RabbitMQConfig implements ApplicationContextAware { @Override public void setApplicationContext(ApplicationContext applicationContext) throws BeansException { RabbitTemplate rabbitTemplate = applicationContext.getBean(RabbitTemplate.class); // 配置回调 rabbitTemplate.setReturnsCallback((returnedMessage) -> { System.out.println("收到消息的return callback, " + "exchange = " + returnedMessage.getExchange() + ", " + "routingKey = " + returnedMessage.getRoutingKey() + ", " + "replyCode = " + returnedMessage.getReplyCode() + ", " + "replyText = " + returnedMessage.getReplyText() + ", " + "message = " + returnedMessage.getMessage()); }); } }
测试前先运行 RabbitMQ
sudo docker start rabbitmq
在 publisher 模块添加一个测试类,测试 ReturnCallback 的效果
@Test void testConfirmCallback() throws InterruptedException { CorrelationData correlationData = new CorrelationData(); correlationData.getFuture().whenCompleteAsync((confirm, throwable) -> { if (confirm.isAck()) { // 消息发送成功 System.out.println("消息发送成功,收到ack"); } else { // 消息发送失败 System.err.println("消息发送失败,收到nack,原因是" + confirm.getReason()); } if (throwable != null) { // 消息回调失败 System.err.println("消息回调失败"); } }); rabbitTemplate.convertAndSend("blog.direct", "red", "Hello, confirm callback", correlationData); // 测试方法执行结束后程序就结束了,所以这里需要阻塞线程,否则程序看不到回调结果 Thread.sleep(2000); }
发送成功后可以看到消息发送成功的回调信息
如果交换机不存在会怎么样呢,我们故意使用一个不存在的交换机,观察控制台的输出结果
如果 routingKey 不存在会怎么样呢,我们故意使用一个不存在的 routingKey ,观察控制台的输出结果
可以看到,confirmCallback 和 ReturnCallback 都返回了回调信息(deliveryTag
为 0
表示消息无法路由到队列)
2.4 如何看待和处理生产者的确认信息
- 生产者确认需要额外的网络开销和系统资源开销,尽量不要使用
- 如果一定要使用,无需开启 Publisher-Return 机制,因为路由失败一般是业务出了问题
- 对于返回 nack 的消息,可以尝试重新投递,如果依然失败,则记录异常消息
3. 消息代理(RabbitMQ)的可靠性
在默认情况下,RabbitMQ 会将接收到的信息保存在内存中以降低消息收发的延迟,这样会导致两个问题:
- 一旦 RabbitMQ 宕机,内存中的消息会丢失
- 内存空间是有限的,当消费者处理过慢或者消费者出现故障或时,会导致消息积压,引发 MQ 阻塞( Paged Out 现象)
怎么理解 MQ 阻塞呢,当队列的空间被消息占满了之后,RabbitMQ 会先把老旧的信息存到磁盘,为新消息腾出空间,在这个过程中,整个 MQ 是被阻塞的,也就是说,在 MQ 完成这一系列工作之前,无法处理已有的消息和接收新的消息
我们来测试一下消息丢失的情况,在 RabbitMQ 的控制台中向 simple.queue 队列发送一条信息,发送后重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
测试前,记得先把监听 simple.queue 队列的代码注释掉
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String message) { System.out.println("消费者收到了simple.queue的消息:【" + message + "】"); }
第一步:先发送一条消息
第二步:查看消息的情况
第三步:重启 RabbitMQ ,模拟 RabbitMQ 宕机后重启的情况
sudo docker restart rabbitmq
第四步:查看消息的情况(可以看到,RabbitMQ 重启后,消息丢失了)
3.1 数据持久化
RabbitMQ 实现数据持久化包括 3 个方面:
- 交换机持久化
- 队列持久化
- 消息持久化
注意事项:
- 利用 SpringAMQP 创建的交换机、队列、消息,默认都是持久化的
- 在 RabbitMQ 控制台创建的交换机、队列默认是持久化的,而消息默认是存在内存中( 3.12 版本之前默认存放在内存,3.12 版本及之后默认先存放在磁盘,消费者处理消息时才会将消息取出来放到内存中)
我们来演示一下 RabbitMQ 发生 Paged Out 现象(也就是队列的空间被消息占满了之后,将老旧消息移到磁盘,为新消息腾出空间的情况)
我们编写一个测试类,向 simple.queue 一次性发送一百万条消息
在发送消息之前,先把生产者确认机制关闭,提高消息发送的速度
spring: rabbitmq: publisher-returns: false publisher-confirm-type: none
先测试发送非持久化信息
@Test void testPagedOut() { Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT) .build(); for (int i = 0; i < 1; i++) { rabbitTemplate.convertAndSend("simple.queue", message); } }
测试结果
再测试发送持久化信息
@Test void testPagedOut() { Message message = MessageBuilder.withBody("Hello, paged out".getBytes(StandardCharsets.UTF_8)) .setDeliveryMode(MessageDeliveryMode.PERSISTENT) .build(); for (int i = 0; i < 1; i++) { rabbitTemplate.convertAndSend("simple.queue", message); } }
3.2 LazyQueue( 3.12 版本后所有队列都是 Lazy Queue 模式)
从 RabbitMQ 的 3.6.0
版本开始,增加了 Lazy Queue 的概念,也就是惰性队列,惰性队列的特征如下:
- 接收到消息后直接存入磁盘而非内存(内存中只保留最近的消息,默认 2048条 )
- 消费者要处理消息时才会从磁盘中读取并加载到内存
- 支持数百万条的消息存储,在 3.12 版本后,所有队列都是 Lazy Queue 模式,无法更改
开启持久化和生产者确认时,RabbitMQ 只有在消息持久化完成后才会给生产者返回 ACK 回执
在 RabbitMQ 的控制台可以看到 RabbitMQ 的版本
在 RabbitMQ 控制台中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
x-queue-mode
在 Java 代码中,要创建一个惰性队列,只需要在声明队列时,指定 x-queue-mode 属性为 lazy 即可
编程式创建
@Bean public org.springframework.amqp.core.Queue lazeQueue() { return QueueBuilder.durable("lazy.queue1") .lazy() .build(); }
注解式创建
@RabbitListener(queuesToDeclare = @org.springframework.amqp.rabbit.annotation.Queue( name = "lazy.queue2", durable = "true", arguments = @Argument( name = "x-queue-mode", value = "lazy" ) )) public void listenLazeQueue(String message) { System.out.println("消费者收到了 laze.queue2的消息: " + message); }
4. 消费者的可靠性
4.1 消费者确认机制
为了确认消费者是否成功处理消息,RabbitMQ 提供了消费者确认机制(Consumer Acknowledgement)。处理消息后,消费者应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 消息的处理状态,回执有三种可选值:
- ack:成功处理消息,RabbitMQ 从队列中删除该消息
- nack:消息处理失败,RabbitMQ 需要再次投递消息
- reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
SpringAMQP 已经实现了消息确认功能,并允许我们通过配置文件选择 ACK 的处理方式,有三种方式:
- none:不处理,即消息投递给消费者后立刻 ack,消息会会立刻从 MQ 中删除,非常不安全,不建议使用
- manual:手动模式。需要自己在业务代码中调用 api,发送 ack 或 reject ,存在业务入侵,但更灵活
- auto:自动模式,SpringAMQP 利用 AOP 对我们的消息处理逻辑做了环绕增强,当业务正常执行时则自动返回 ack,当业务出现异常时,会根据异常的类型返回不同结果:
- 如果是业务异常,会自动返回 nack
- 如果是消息处理或校验异常,自动返回 reject
开启消息确认机制,需要在 application.yml
文件中编写相关的配置
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: none
先测试处理模式为 none 的情况,向 simple.queue 队列发送一条消息,同时监听 simple.queue 队列的消息,监听到队列中的消息后手动抛出一个异常
publisher 服务
@Test void testSendMessageToQueue() { String queueName = "simple.queue"; String msg = "Hello, SpringAMQP!"; rabbitTemplate.convertAndSend(queueName, msg); }
consumer 服务
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String message) { System.out.println("消费者收到了simple.queue的消息:【" + message + "】"); throw new RuntimeException("故意抛出异常"); }
不出意外,程序报错了
但在 RabbitMQ 的控制台可以看到,消息也丢失了
再测试处理模式为 none 的情况
可以看到,控制台一直在报错,报错之后一直在尝试重新发送消息
在 RabbitMQ 的控制台可以看到,simple.queue 一直在收发消息,速率达到了 97 次每秒(状态为 running ,消息的状态为 Unacked )
此时,我们手动关闭 consumer 服务,查看 RabbitMQ 的控制台,可以看到消息恢复到正常的状态了
再来测试异常类型为 MessageConversionException 的情况
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String message) { System.out.println("消费者收到了simple.queue的消息:【" + message + "】"); throw new MessageConversionException("故意抛出异常"); }
在控制台可以看到,消息被拒绝了,而且消息也没有重新发送
查看 RabbitMQ 的控制台,可以发现消息已经从队列中移除了
4.2 失败重试机制
当消费者出现异常后,消息会不断重新入队,重新发送给消费者,然后再次发生异常,再次 requeue(重新入队),陷入 无限循环,给 RabbitMQ 带来不必要的压力
我们可以利用 Spring 提供的 retry 机制,在消费者出现异常时利用本地重试,而不是无限制地重新入队
在 application.yml 配置文件中开启失败重试机制
spring: rabbitmq: listener: simple: prefetch: 1 acknowledge-mode: auto retry: enabled: true # 开启消息消费失败重试机制 initial-interval: 1000ms # 消息消费失败后的初始等待时间 multiplier: 1 # 消息消费失败后的等待时长倍数,下次等待时长 = (initial-interval) * multiplier max-attempts: 3 # 最大重试次数 stateless: true # true表示无状态,false表示有状态,如果业务中包含事务,需要设置为false
我们将抛出的异常类型改回 RuntimeException
@RabbitListener(queues = "simple.queue") public void listenSimpleQueue(String message) { System.out.println("消费者收到了simple.queue的消息:【" + message + "】"); throw new RuntimeException("故意抛出异常"); }
在控制台可以看出,消息的重新发送次数已经耗尽了
查看 RabbitMQ 的控制台,发现消息也丢失了
正常情况下,消息丢失都不是我们想看到的,该怎么解决这个问题呢
4.3 失败消息的处理策略
开启重试模式后,如果重试次数耗尽后消息依然处理失败,则需要由 MessageRecoverer 接口来处理, MessageRecoverer 有三个实现类:
RejectAndDontRequeueRecoverer
:重试次数耗尽后,直接 reject,丢弃消息,默认就是这种方式ImmediateRequeueMessageRecoverer
:重试次数耗尽后,返回 nack,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
我们来演示一下使用 RepublishMessageRecoverer 类的情况
第一步:定义一个名为 blog.error 的交换机、一个名为 error.queue 的队列,并将队列和交换机进行绑定
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.DirectExchange; import org.springframework.amqp.core.Queue; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.amqp.rabbit.retry.MessageRecoverer; import org.springframework.amqp.rabbit.retry.RepublishMessageRecoverer; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; @Configuration @ConditionalOnProperty(prefix = "spring.rabbitmq.listener.simple.retry", name = "enabled", havingValue = "true") public class ErrorConfiguration { @Bean public DirectExchange errorExchange() { return new DirectExchange("error.direct", true, false); } @Bean public Queue errorQueue() { return new Queue("error.queue", true, false, false); } @Bean public Binding errorBinding(Queue errorQueue, DirectExchange errorExchange) { return BindingBuilder.bind(errorQueue).to(errorExchange).with("error"); } }
第二步:将失败处理策略改为 RepublishMessageRecoverer (开起了消费者重试机制才会生效)
@Bean public MessageRecoverer messageRecoverer(RabbitTemplate rabbitTemplate) { return new RepublishMessageRecoverer(rabbitTemplate, "error.direct", "error"); }
在控制台中可以看到,消息的重试次数耗尽后,消息被放入了 error.queue 队列
在 RabbitMQ 的控制塔也可以看到, error.direct 交换机 和 error.queue 队列成功创建,消息也成功放入了 error.queue 队列
总结:消费者如何保证消息一定被消费?
- 开启消费者确认机制为 auto ,由 Spring 帮我们确认,消息处理成功后返回 ack,异常时返回 nack
- 开启消费者失败重试机制,并设置
MessageRecoverer
,多次重试失败后将消息投递到异常交换机,交由人工处理
4.4 业务幂等性
幂等是一个数学概念,用函数表达式来描述是这样的:f(x) = f(f(x)),绝对值函数具有幂等性
在程序开发中,幂等是指同一个业务,执行一次或多次对业务状态的影响是一致的
那么有什么方法能够确保业务的幂等性呢
4.4.1 方案一:为每条消息设置一个唯一的 id
给每个消息都设置一个唯一的 id,利用 id 区分是否是重复消息:
- 为每条消息都生成一个唯一的 id,与消息一起投递给消费者
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息 id 保存到数据库
- 如果消费者下次又收到相同消息,先去数据库查询该消息对应的 id 是否存在,如果存在则为重复消息,放弃处理
可以在指定 MessageConverter 的具体类型时,同时为 MessageConverter 设置自动创建一个 messageId
@Bean public MessageConverter jacksonMessageConvertor() { Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); jackson2JsonMessageConverter.setCreateMessageIds(true); return jackson2JsonMessageConverter; }
发送消息后,在 RabbitMQ 的控制台可以看到,消息的 properties 属性附带了 messageId 信息
但这种方式对业务有一定的侵入性
4.4.2 方案二:结合业务判断
结合业务逻辑,基于业务本身做判断。以支付业务为例:我们要在支付后修改订单状态为已支付,应该在修改订单状态前先查询订单状态,判断状态是否是未支付,只有未支付订单才需要修改,其它状态的订单不做处理
总结:如何保证支付服务与交易服务之间的订单状态一致性?
- 首先,支付服务会正在用户支付成功以后利用 MQ 发送消息通知交易服务,完成订单状态同步
- 其次,为了保证 MQ 消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递和处理的可靠性,同时也开启了MQ的持久化,避免因服务宕机导致消息丢失
- 最后,我们还在交易服务更新订单状态时做了业务幕等判断,避免因消息重复消费导致订单状态异常
4.5 兜底的解决方案
如果交易服务消息处理失败,支付服务和交易服务出现了数据不一致的情况,有没有什么兜底的解决方案?
我们可以在交易服务设置定时任务,定期查询订单支付状态,这样即便 MQ 通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性
5. 延迟消息
5.1 什么是延迟消息
延迟消息:生产者发送消息时指定一个时间,消费者不会立刻收到消息,而是在指定时间之后才会收到消息
延迟任务:一定时间之后才会执行的任务
5.2 死信交换机
当一个队列中的某条消息满足下列情况之一时,就会成为死信(dead letter):
- 消费者使用 basic.reject 或 basic.nack 声明消费失败,并且消息的 requeue 参数设置为 false
- 过期消息(达到了队列或消息本身设置的过期时间),消息超时后无人消费
- 要投递的队列消息堆积满了,最早的消息可能成为死信
如果队列通过 dead-letter-exchange 属性指定了一个交换机,那么该队列中的死信就会投递到这个交换机中,这个交换机称为死信交换机(Dead Letter Exchange,简称DLX)
利用死信交换机的特点,可以实现发送延迟消息的功能
5.3 延迟消息插件(推荐使用)
5.3.1 下载并安装延迟插件
RabbitMQ 的官方推出了一个插件,原生支持延迟消息功能。该插件的原理是设计了一种支持延迟消息功能的交换机,当消息投递到交换机后,可以将消息暂存一段时间,时间到了之后再将消息投递到队列中
插件的下载地址:rabbitmq-delayed-message-exchange
下载完插件后,运行以下指令,在输出信息中找到 Mounts ,再找到 RabbitMQ 的插件的安装目录
sudo docker inspect rabbitmq
然后进入 RabbitMQ 的插件的安装目录,将刚才下载的插件上传到该目录下
一般与 docker 相关的目录只有 root 用户才有权限访问,所以我们需要先打开 docker 目录的部分权限(耗时可能较长)
sudo chmod +rx -R /var/lib/docker
接着打开/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的写权限(如果修改权限不生效,请切换到 root 用户执行指令)
sudo chmod 777 /var/lib/docker/volumes/rabbitmq-plugins/_data
将刚才下载的插件上传到/var/lib/docker/volumes/rabbitmq-plugins/_data
目录
上传成功后将/var/lib/docker/volumes/rabbitmq-plugins/_data
目录的权限复原
sudo chmod 755 /var/lib/docker/volumes/rabbitmq-plugins/_data
最后进入容器内部,运行指令安装插件,安装完成后退出容器内部
sudo docker exec -it rabbitmq bash
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
exit
看到以下信息,说明插件安装成功了
5.3.2 安装插件时可能遇到的问题
如果你遇到了以下错误,在执行rabbitmq-plugins enable rabbitmq_delayed_message_exchange
指令前先执行以下指令
chmod 400 /var/lib/rabbitmq/.erlang.cookie
5.3.3 在 Java 代码中发送延迟消息
声明延迟交换机
@Bean public DirectExchange delayExchange() { return ExchangeBuilder.directExchange("delay.direct").delayed().build(); }
声明队列和延迟交换机,并将队列和延迟交换机绑定在一起
@RabbitListener(bindings = @QueueBinding( value = @Queue(name = "delay.queue"), exchange = @Exchange(name = "delay.direct", delayed = "true", type = ExchangeTypes.DIRECT), key = "delay" )) public void listenDelayQueue(String message) { SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS"); System.out.println("消费者收到了 delay.queue的消息: " + message + ",时间:" + simpleDateFormat.format(System.currentTimeMillis())); }
编写测试方法,测试发送延迟消息
@Test void testSendDelayMessage() { rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(10000); // 毫秒 return message; } }); SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS"); System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis())); }
发送延迟消息的本质是在消息头属性中添加 x-delay 属性
5.3.4 延迟消息的原理和缺点
RabbitMQ 的延迟消息是怎么实现的呢?RabbitMQ 会自动维护一个时钟,这个时钟每隔一秒就跳动一次,如果对时钟的精度要求比较高的,可能还要精确到毫秒,甚至纳秒
RabbitMQ 会为发送到交换机的每一条延迟消息创建一个时钟,时钟运行的过程中需要 CPU 不断地进行计算。发送到交换机的延迟消息数越多,RabbitMQ 需要维护的时钟就越多,对 CPU 的占用率就越高(Spring 提供的定时任务的原理也是类似)
定时任务属于 CPU 密集型任务,中间涉及到的计算过程对 CPU 来说压力是很大的,所以说,采用延迟消息会给服务器的 CPU 带来更大的压力。当交换机中有非常多的延迟消息时,对 CPU 的压力就会特别大
所以说,延迟消息适用于延迟时间较短的场景
5.4 取消超时订单
设置 30 分钟后检测订单支付状态实现起来非常简单,但是存在两个问题:
- 如果并发较高,30分钟可能堆积消息过多,对 MQ 压力很大
- 大多数订单在下单后 1 分钟内就会支付,但消息需要在 MQ 中等待30分钟,浪费资源
5.5 发送延迟检测订单的消息
我们定义一个实体类,用于记录延迟消息的内容和延迟消息的延迟时间列表(该实体类也是延迟消息的类型)
import java.util.ArrayList; import java.util.Arrays; import java.util.List; public class MultipleDelayMessage<T> { private T data; private List<Long> delayMillis; public MultipleDelayMessage() { } public MultipleDelayMessage(T data, Long... delayMillis) { this.data = data; this.delayMillis = new ArrayList<>(Arrays.asList(delayMillis)); } public MultipleDelayMessage(T data, List<Long> delayMillis) { this.data = data; this.delayMillis = delayMillis; } public static <T> MultipleDelayMessage<T> of(T data, Long... delayMillis) { return new MultipleDelayMessage<>(data, new ArrayList<>(Arrays.asList(delayMillis))); } public static <T> MultipleDelayMessage<T> of(T data, List<Long> delayMillis) { return new MultipleDelayMessage<>(data, delayMillis); } public boolean hasNextDelay() { return !delayMillis.isEmpty(); } public Long removeNextDelay() { return delayMillis.remove(0); } public T getData() { return data; } public void setData(T data) { this.data = data; } public List<Long> getDelayMillis() { return delayMillis; } public void setDelayMillis(List<Long> delayMillis) { this.delayMillis = delayMillis; } @Override public String toString() { return "MultipleDelayMessage{" + "data=" + data + ", delayMillis=" + delayMillis + '}'; } }
我们再定义一个发送延迟消息的消息处理器,供所有服务使用
import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; public class DelayMessagePostProcessor implements MessagePostProcessor { private final Integer delay; public DelayMessagePostProcessor(Integer delay) { this.delay = delay; } @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setDelay(delay); return message; } }
改造后的发送延迟消息的测试方法
@Test void testSendDelayMessage() { rabbitTemplate.convertAndSend("delay.direct", "delay", "Hello, DelayQueue!", new DelayMessagePostProcessor(10000)); SimpleDateFormat simpleDateFormat = new SimpleDateFormat(); simpleDateFormat.applyPattern("yyyy-MM-dd HH:mm:ss SSS"); System.out.println("发送消息成功!发送时间:" + simpleDateFormat.format(System.currentTimeMillis())); }