集成框架-RabbitMQ重试和确认
前言:
关于消息重试
在 Spring Boot
中,你可以使用 yml
格式的配置文件来配置 RabbitMQ
的重试机制。
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: retry: enabled: true initial-interval: 5000 # 初始重试间隔时间(毫秒) max-attempts: 3 # 最大重试次数 max-interval: 10000 # 最大重试间隔时间(毫秒) multiplier: 2.0 # 重试间隔时间倍数
在这个配置中,启用了 RabbitMQ 的重试机制
,并指定了重试的初始间隔时间
、最大重试次数
、最大重试间隔时间和重试间隔时间的倍数
。
关于消息确认
当消费者处理消息时,如果处理成功,可以使用确认机制告知RabbitMQ
已经成功消费了该消息。如果处理失败,则消息会被重新放回队列,等待重试。下面是一个演示确认机制的示例:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class MessageConsumer { @RabbitListener(queues = "retry_queue") public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { System.out.println("Received message: " + message); // 在这里进行消息处理 // 如果处理成功,手动确认消息 channel.basicAck(tag, false); } catch (Exception e) { // 如果处理失败,可以选择手动拒绝消息并重新放回队列,或者进行其他处理 System.out.println("Failed to process message: " + e.getMessage()); // 手动拒绝消息并重新放回队列 channel.basicReject(tag, true); } } }
正文
消息重试
当消费者报错时,RabbitMQ
将会使用 yml 配置
文件中设置的重试策略对消息进行重试。
添加 spring-boot-starter-amqp
依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
amqp
是什么在rabbitmq
中有什么作用呢,amqp是异步消息传输的协议 它定义了消息的传输格式、消息交换模式、队列管理方式等
下面是 AMQP
在 RabbitMQ
中的一些网络文章罗列的作用:
消息格式定义: 定义了消息的格式,包括消息的头部、属性、内容等。消息的格式化使得消息在不同的应用程序之间可以被正确地解析和处理。
消息交换模式: 定义了消息的交换模式,包括直接交换、扇出交换、主题交换等。这些交换模式使得消息可以被路由到不同的队列,以满足不同的业务需求。
队列管理: 定义了队列的管理方式,包括队列的创建、删除、绑定、解绑等操作。这些操作使得队列可以被动态地管理和调整,以适应不同的应用场景。
消息确认机制: 提供了消息确认机制,包括自动确认和手动确认两种模式。消息确认机制可以确保消息被正确地接收和处理,从而提高了消息传输的可靠性和稳定性。
事务支持: 支持事务,可以保证一组消息的原子性操作。事务机制可以确保消息的原子性,从而保证消息的一致性和可靠性。
然后,可以在配置文件中配置RabbitMQ
连接信息和重试策略:
spring: rabbitmq: host: localhost port: 5672 username: guest password: guest listener: simple: retry: enabled: true initial-interval: 5000 # 初始重试间隔时间(毫秒) max-attempts: 3 # 最大重试次数 max-interval: 10000 # 最大重试间隔时间(毫秒) multiplier: 2.0 # 重试间隔时间倍数
接下来,编写一个生产者,用于发送消息到 RabbitMQ
:
import org.springframework.amqp.core.AmqpTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer { @Autowired private AmqpTemplate amqpTemplate; public void produceMessage(String message) { amqpTemplate.convertAndSend("retry_exchange", "retry_key", message); } }
然后,编写一个消费者,用于消费消息并故意引发异常:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumer { @RabbitListener(queues = "retry_queue") public void handleMessage(String message) throws Exception { System.out.println("Received message: " + message); // 模拟处理消息时发生异常 throw new RuntimeException("Simulated exception occurred"); } }
最后,在你的应用程序入口,创建一个简单的控制器,用于触发消息的发送:
import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RestController; @SpringBootApplication @RestController public class Application { @Autowired private MessageProducer messageProducer; public static void main(String[] args) { SpringApplication.run(Application.class, args); } @GetMapping("/send") public String sendMessage() { messageProducer.produceMessage("Hello RabbitMQ!"); return "Message sent to RabbitMQ"; } }
现在,当启动应用程序并访问 /send
路径时,生产者将发送一条消息到 RabbitMQ
,然后消费者将会接收到这条消息,并故意引发异常。Rabbitmq
将根据配置文件中的重试策略对消息进行重试,直到达到最大重试次数为止。
消息重试总结
以上就是消息重试的概念,关于如果不是一个服务,比如生产者配置3次指的是生产者发送失败重试的次数,消费者配置3次指的是消费者重试的次数,只要配置了,如果消费报错就会重试,生产报错也会重试,如果生产消费在一起,就是消费报错重试。
消息确认
当消费者处理消息时,如果处理成功,可以使用确认机制告知 RabbitMQ
已经成功消费了该消息。如果处理失败,则消息会被重新放回队列,等待重试。下面是一个确认机制的示例:
首先,确保你的消费者方法使用了@RabbitListener
注解,并且使用Channel
对象进行手动确认。这样,你就可以在消费者成功处理消息时手动确认消息:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; import com.rabbitmq.client.Channel; @Component public class MessageConsumer { @RabbitListener(queues = "retry_queue") public void handleMessage(String message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception { try { System.out.println("Received message: " + message); // 在这里进行消息处理 // 如果处理成功,手动确认消息 channel.basicAck(tag, false); } catch (Exception e) { // 如果处理失败,可以选择手动拒绝消息并重新放回队列,或者进行其他处理 System.out.println("Failed to process message: " + e.getMessage()); // 手动拒绝消息并重新放回队列 channel.basicReject(tag, true); } } }
这里使用 channel.basicAck(tag, false)
方法手动确认消息。如果消费者成功处理消息,则调用这个方法告知 RabbitMQ
已经成功消费了该消息。如果处理失败,则可以选择手动拒绝消息并重新放回队列,或者进行其他处理。如果你不用channel.basicAck(tag, false)
和没有引入@Header(AmqpHeaders.DELIVERY_TAG) long tag
默认的话是自动确认。
所以确保在消费者方法的参数列表中包含 Channel
对象和消息的DELIVERY_TAG
。这样,Spring AMQP
就会将 Channel
对象注入到消费者方法中,以便你可以使用它来手动确认消息。
这样,当消费者处理消息时,你就可以使用确认机制告知RabbitMQ
消息的处理结果。如果处理成功,则消息被消费,否则消息会被重新放回队列等待重试。
在 RabbitMQ
中,生产者并不会直接接收到消费者的确认请求。确认请求是由消费者向 RabbitMQ
服务器发送的,用于告知RabbitMQ
消息的处理结果。生产者可以选择监听确认事件,以便在消息被确认后执行相应的操作。
在 Spring Boot
中,可以使用 RabbitTemplate
的 ConfirmCallback
接口来监听确认事件。以下是一个生产者简单的示例:
import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer { @Autowired private RabbitTemplate rabbitTemplate; public void produceMessage(String message) { CorrelationData correlationData = new CorrelationData("unique-id"); // 设置消息的唯一标识符 rabbitTemplate.convertAndSend("retry_exchange", "retry_key", message, correlationData); // 设置 ConfirmCallback,用于监听消息的确认结果 rabbitTemplate.setConfirmCallback((correlation, ack, cause) -> { if (ack) { // 如果消息被确认 System.out.println("Message with correlation id " + correlation.getId() + " confirmed"); } else { // 如果消息被拒绝或超时 System.out.println("Message with correlation id " + correlation.getId() + " rejected: " + cause); } }); } }
在这个示例中,通过CorrelationData
设置了消息的唯一标识符,并将其作为参数传递给 convertAndSend
方法。然后,我们设置了 ConfirmCallback
,用于监听消息的确认结果。当消息被确认时,ConfirmCallback
的ack
参数将会为 true
,表示消息被成功确认;当消息被拒绝或超时时,ack
参数将会为 false
,同时 cause
参数将会包含拒绝的原因。
通过监听 ConfirmCallback
,生产者可以在消息被确认时执行相应的操作,例如记录日志、更新状态等。
确认机制和重试机制的概念
在 RabbitMQ
中,消息的确认机制和重试机制是两个不同的概念,它们可以结合使用,但并不互相排斥
。确认机制是用来告知 RabbitMQ
消息的处理结果,而重试机制则是在消息处理失败时将消息重新放回队列,等待后续的重试。
如果消息被消费者确认了(即消费者成功处理了消息),RabbitMQ
将会将该消息从队列中删除,不会再进行重试。这意味着即使消息在处理过程中出现了错误,但只要消费者成功确认了该消息,它就不会再次被放回队列进行重试。
然而,如果消费者拒绝了消息(即调用了 channel.basicReject(tag, true)
方法),或者处理消息时发生了超时等问题,RabbitMQ
将会将消息重新放回队列,等待后续的重试。这时,重试机制会起作用,根据配置的重试策略对消息进行重试,直到达到最大重试次数为止。
因此,确认机制和重试机制是可以结合使用的。确认机制用于告知 RabbitMQ
消息的处理结果,而重试机制则用于处理处理失败的消息,确保消息能够被成功处理。
如果按上文配置重试3次
,那么生产者方法,会在第一次报错他被拒绝了,但是任然还会在队列里面重试3次
才会结束,这就涉及到消息积压。这要注意