项目实战--SpringBoot整合RabbitMQ:实现邮件大批量异步推送
一、背景
由于项目前期使用SpringBoot 整合 mail 实现各类邮件的自动推送服务,但是这个服务越来越不稳定,出现网络异常的时候,会导致邮件推送失败造成堆积,同时业务需要大批量的同步推送邮件,可靠性也不高。故采用RabbitMQ 消息队列来实现邮件 100% 被投递,内容涵盖 RabbitMQ 很多知识点:
生产者和消费者模型 消息发送确认机制 消费确认机制 消息的重新投递 消费幂等性 ......
RabbitMQ 处理流程图:
二、实现方案
2.1 环境:
springboot版本:2.1.5.RELEASE
RabbitMQ版本:3.6.5
SendMailUtil:发送邮件工具类
ProduceServiceImpl:生产者,发送消息
ConsumerMailService:消费者,消费消息,发送邮件
2.2 方案
1.搭建一台 Linux 服务器,并安装 RabbitMQ
2.开放 QQ 邮箱或者其它邮箱授权码,用于发送邮件
3.创建邮件发送项目并编写代码
4.发送邮件测试
5.消息发送失败处理
三、前期工作
获取邮箱授权码的目的,主要是为了通过代码进行发送邮件,例如 QQ 邮箱授权码获取方式,如下图:
点击开启按钮,然后发送短信,即可获取授权码,该授权码就是配置文件spring.mail.password需要的密码:
四、具体实现
4.1 创建spring boot项目
名为smail的 Springboot 项目,pom文件中加入amqp和mail:
<dependencies> <!--spring boot核心--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter</artifactId> </dependency> <!--spring boot 测试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!--springmvc web--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!--开发环境调试--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-devtools</artifactId> <optional>true</optional> </dependency> <!--mail 支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-mail</artifactId> </dependency> <!--amqp 支持--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- commons-lang3 --> <dependency> <groupId>org.apache.commons</groupId> <artifactId>commons-lang3</artifactId> <version>3.4</version> </dependency> <!--lombok--> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> <version>1.16.10</version> </dependency> </dependencies>
4.2 配置rabbitMQ、mail
在application.properties文件中,配置amqp和mail,其中,spring.mail.password前面获取的授权码,同时username和from要一致:
#rabbitmq spring.rabbitmq.host=192.168.0.103 spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest # 开启confirms回调 P -> Exchange spring.rabbitmq.publisher-confirms=true # 开启returnedMessage回调 Exchange -> Queue spring.rabbitmq.publisher-returns=true # 设置手动确认(ack) Queue -> C spring.rabbitmq.listener.simple.acknowledge-mode=manual spring.rabbitmq.listener.simple.prefetch=100 # mail spring.mail.default-encoding=UTF-8 spring.mail.host=smtp.qq.com spring.mail.username=xxxx@qq.com spring.mail.password=获取的邮箱授权码 spring.mail.from=xxxx@qq.com spring.mail.properties.mail.smtp.auth=true spring.mail.properties.mail.smtp.starttls.enable=true spring.mail.properties.mail.smtp.starttls.required=true
4.3 RabbitConfig配置类
@Configuration @Slf4j public class RabbitConfig { // 发送邮件 public static final String MAIL_QUEUE_NAME = "mail.queue"; public static final String MAIL_EXCHANGE_NAME = "mail.exchange"; public static final String MAIL_ROUTING_KEY_NAME = "mail.routing.key"; @Autowired private CachingConnectionFactory connectionFactory; @Bean public RabbitTemplate rabbitTemplate() { RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); rabbitTemplate.setMessageConverter(converter()); // 消息是否成功发送到Exchange rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息成功发送到Exchange"); } else { log.info("消息发送到Exchange失败, {}, cause: {}", correlationData, cause); } }); // 触发setReturnCallback回调必须设置mandatory=true, 否则Exchange没有找到Queue就会丢弃掉消息, 而不会触发回调 rabbitTemplate.setMandatory(true); // 消息是否从Exchange路由到Queue, 注意: 这是一个失败回调, 只有消息从Exchange路由到Queue失败才会回调这个方法 rabbitTemplate.setReturnCallback((message, replyCode, replyText, exchange, routingKey) -> { log.info("消息从Exchange路由到Queue失败: exchange: {}, route: {}, replyCode: {}, replyText: {}, message: {}", exchange, routingKey, replyCode, replyText, message); }); return rabbitTemplate; } @Bean public Jackson2JsonMessageConverter converter() { return new Jackson2JsonMessageConverter(); } @Bean public Queue mailQueue() { return new Queue(MAIL_QUEUE_NAME, true); } @Bean public DirectExchange mailExchange() { return new DirectExchange(MAIL_EXCHANGE_NAME, true, false); } @Bean public Binding mailBinding() { return BindingBuilder.bind(mailQueue()).to(mailExchange()).with(MAIL_ROUTING_KEY_NAME); } }
4.4 Mail 邮件实体类
@Getter @Setter @NoArgsConstructor @AllArgsConstructor public class Mail { @Pattern(regexp = "^([a-z0-9A-Z]+[-|\\.]?)+[a-z0-9A-Z]@([a-z0-9A-Z]+(-[a-z0-9A-Z]+)?\\.)+[a-zA-Z]{2,}$", message = "邮箱格式不正确") private String to; @NotBlank(message = "标题不能为空") private String title; @NotBlank(message = "正文不能为空") private String content; private String msgId;// 消息id }
4.5 Mail SendMailUtil邮件发送类
@Component @Slf4j public class SendMailUtil { @Value("${spring.mail.from}") private String from; @Autowired private JavaMailSender mailSender; /** * 发送简单邮件 * * @param mail */ public boolean send(Mail mail) { String to = mail.getTo();// 目标邮箱 String title = mail.getTitle();// 邮件标题 String content = mail.getContent();// 邮件正文 SimpleMailMessage message = new SimpleMailMessage(); message.setFrom(from); message.setTo(to); message.setSubject(title); message.setText(content); try { mailSender.send(message); log.info("邮件发送成功"); return true; } catch (MailException e) { log.error("邮件发送失败, to: {}, title: {}", to, title, e); return false; } } }
4.6 ProduceServiceImpl 生产者类
@Service public class ProduceServiceImpl implements ProduceService { @Autowired private RabbitTemplate rabbitTemplate; @Override public boolean send(Mail mail) { //创建uuid String msgId = UUID.randomUUID().toString().replaceAll("-", ""); mail.setMsgId(msgId); //发送消息到rabbitMQ CorrelationData correlationData = new CorrelationData(msgId); rabbitTemplate.convertAndSend(RabbitConfig.MAIL_EXCHANGE_NAME, RabbitConfig.MAIL_ROUTING_KEY_NAME, MessageHelper.objToMsg(mail), correlationData); return true; } }
4.7 ConsumerMailService 消费者类
@Component @Slf4j public class ConsumerMailService { @Autowired private SendMailUtil sendMailUtil; @RabbitListener(queues = RabbitConfig.MAIL_QUEUE_NAME) public void consume(Message message, Channel channel) throws IOException { //将消息转化为对象 String str = new String(message.getBody()); Mail mail = JsonUtil.strToObj(str, Mail.class); log.info("收到消息: {}", mail.toString()); MessageProperties properties = message.getMessageProperties(); long tag = properties.getDeliveryTag(); boolean success = sendMailUtil.send(mail); if (success) { channel.basicAck(tag, false);// 消费确认 } else { channel.basicNack(tag, false, true); } } }
4.8 TestController 控制层类
@RestController @RequestMapping("/test") @Slf4j public class TestController { @Autowired private ProduceService testService; @PostMapping("send") public boolean sendMail(Mail mail) { return testService.send(mail); } }
五、测试
启动 SpringBoot 服务之后,模拟请求接口。
六、消息发送失败处理
若 rabbitMQ 突然崩溃、邮件发送失败、重启 rabbitMQ 服务器出现消息重复消费,的怎处理。
6.1 创建消息投递日志表
CREATE TABLE `msg_log` ( `msg_id` varchar(255) NOT NULL DEFAULT '' COMMENT '消息唯一标识', `msg` text COMMENT '消息体, json格式化', `exchange` varchar(255) NOT NULL DEFAULT '' COMMENT '交换机', `routing_key` varchar(255) NOT NULL DEFAULT '' COMMENT '路由键', `status` int(11) NOT NULL DEFAULT '0' COMMENT '状态: 0投递中 1投递成功 2投递失败 3已消费', `try_count` int(11) NOT NULL DEFAULT '0' COMMENT '重试次数', `next_try_time` datetime DEFAULT NULL COMMENT '下一次重试时间', `create_time` datetime DEFAULT NULL COMMENT '创建时间', `update_time` datetime DEFAULT NULL COMMENT '更新时间', PRIMARY KEY (`msg_id`), UNIQUE KEY `unq_msg_id` (`msg_id`) USING BTREE ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='消息投递日志';
6.2 编写 MsgLog 相关服务类
public interface MsgLogService { /** * 插入消息日志 * @param msgLog */ void insert(MsgLog msgLog); /** * 更新消息状态 * @param msgId * @param status */ void updateStatus(String msgId, Integer status); /** * 查询消息 * @param msgId * @return */ MsgLog selectByMsgId(String msgId); }
6.3 改写服务逻辑
生产服务类中,新增数据写入:
在RabbitConfig服务配置,当消息发送成功之后,新增更新消息状态逻辑:
消费者ConsumerMailService,每次消费的时候,从数据库中查询,如果消息已经被消费,不用再重复发送数据:
6.4 使用定数任务对消息投递失败进行补偿
当 rabbitMQ 服务器突然挂掉之后,生成者就无法正常进行投递数据,此时因为消息已经被记录到数据库,因此我们可以利用定数任务查询出没有投递成功的消息,进行补偿投递:
利用定数任务,对投递失败的消息进行补偿投递,基本可以保证消息 100% 消费成功