Springboot整合RocketMq

avatar
作者
猴君
阅读量:0

RocketMq相较于rabbitMq的优点(https://www.jianshu.com/p/56686af7eedd):
RocketMQ的优点:

  • 性能优越:RocketMQ在处理大量消息时,性能优于RabbitMQ。当面临每秒数万到数十万条消息的处理需求时,RocketMQ能够提供更好的性能。
  • 灵活的路由配置:RocketMQ在生产者和队列之间增加了一个Exchange模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中,这提供了更大的灵活性。
  • 对在线业务的响应时延做了很多优化:RocketMQ对在线业务的响应时延做了很多优化,大多数情况下可以做到毫秒级响应。
  • 中文社区活跃:对于中文用户来说,RocketMQ的中文社区比较活跃,源代码易读,方便二次开发。

RocketMQ的缺点:

  • 大量消息堆积时,会导致性能急剧下降。
  • 和其它两种消息队列产品相比,性能是最差的。因此,如果业务对性能要求特别高,就不要选用RocketMQ。
  • Java开发,虽然学习成本相对较低,但仍然需要学习相关的开发技术。

RabbitMQ的优点:

  • 高并发、高吞吐量:由于使用了Erlang语言,RabbitMQ在消息处理性能和吞吐量方面表现优秀。它可以处理大量的并发消息,并保证高吞吐量。
  • 健壮、稳定、易用、跨平台、支持多种语言、文档齐全:RabbitMQ被认为是非常健壮、稳定和易用的消息队列产品。它支持多种编程语言,并且提供了丰富的文档和社区支持。
  • 开源提供的管理界面非常棒,用起来很好用:RabbitMQ提供了直观而易于使用的开源管理界面,使得管理和监控消息队列变得非常方便。
  • 社区活跃度高:RabbitMQ的社区非常活跃,有大量的用户和开发者在使用和贡献代码。这使得RabbitMQ具有很好的生态系统,便于获取支持和解决问题。

RabbitMQ的缺点:

  • Erlang开发,很难去看懂源码:对于一些开发者来说,Erlang语言可能比较陌生,学习成本较高,而且源代码可能难以理解。这可能会对二次开发和维护造成一定的困难。
  • 需要学习比较复杂的接口和协议:RabbitMQ使用了一些相对复杂的接口和协议,学习和维护成本较高。这可能会增加开发和维护的难度。
  • 相比其他消息队列产品,性能稍逊一筹:虽然RabbitMQ在性能方面表现不错,但相比一些其他消息队列产品(如RocketMQ),其性能可能稍逊一筹。如果业务对性能要求特别高,可能需要考虑其他选择。

综上所述,RocketMQ和RabbitMQ各有优缺点,需要根据具体业务需求进行选择。如果业务需要高性能、灵活的路由配置和对在线业务的响应时延有较高要求,可以考虑使用RocketMQ;如果业务需要高并发、高吞吐量、健壮稳定且易于使用和管理的消息队列产品,可以考虑使用RabbitMQ。

1.maven依赖

<dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-spring-boot-starter</artifactId>     <version>2.2.0</version>     <exclusions>         <exclusion>             <groupId>org.apache.rocketmq</groupId>             <artifactId>rocketmq-client</artifactId>         </exclusion>     </exclusions> </dependency> <dependency>     <groupId>org.apache.rocketmq</groupId>     <artifactId>rocketmq-client</artifactId>     <version>4.9.0</version> </dependency> 

2.配置文件

rocketmq:   name-server: xx.xx.xx.xx:9876    // 你的rocketmq服务器及端口   producer:     group: sms_producer_group 

3.配置类
这里只是对RocketMQTemplate中的一些方法进行了封装,更方便调用,其他方法可直接查看RocketMQTemplate类

package net.trueland.smart.marketing.config;  import org.springframework.context.annotation.Configuration;  /**  * RocketMq  *  * @author sxd  * @date 2023/9/8 10:30  */ @Configuration public class RocketMqConfig {     public final static String AI_CALL_CALLBACK_TOPIC = "tcloud_di_marketing_delay_passageway_topic";      public final static String AI_CALL_CALLBACK_GROUP = "tcloud_di_marketing_delay_passageway_group";  } 

4.工具类

package net.trueland.smart.marketing.util;  import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component;  /**  * RocketMqUtil  * @author sxd  * @date 2023/9/8 10:52  */ @Component @Slf4j public class RocketMqUtil {      @Autowired     private RocketMQTemplate rocketmqTemplate;      /**      * 同步发送      * @param topic      * @param content      * @return      */     public SendResult syncSend(String topic, String content) {         SendResult sendResult = rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(content).build());         log.info("RocketMqUtil-syncSend,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));         return sendResult;     }      /**      * 异步发送      * @param topic      * @param content      * @return      */     public void aSyncSend(String topic, String content) {         SendCallback callback = new SendCallback() {             @Override             public void onSuccess(SendResult sendResult) {                 log.info("RocketMqUtil-aSyncSend-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));             }              @Override             public void onException(Throwable throwable) {                 log.info("RocketMqUtil-aSyncSend-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage());             }         };         rocketmqTemplate.asyncSend(topic, content, callback);     }      /**      * 异步发送--延迟队列      * @param topic 主题      * @param content 内容      * @param delayTimeLevel 延迟等级      */     public void aSyncSendDelay(String topic, String content, Integer delayTimeLevel) {         SendCallback callback = new SendCallback() {             @Override             public void onSuccess(SendResult sendResult) {                 log.info("RocketMqUtil-aSyncSendDelay-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult));             }              @Override             public void onException(Throwable throwable) {                 log.info("RocketMqUtil-aSyncSendDelay-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage());             }         };         // 3000表示发送超时时间         rocketmqTemplate.asyncSend(topic, MessageBuilder.withPayload(content).build(), callback, 3000L, delayTimeLevel);     }  } 

5.生产者

// 这里以异步延迟消息为例 // 发送rocketmq。延迟等级使用14,即10分钟 rocketMqUtil.aSyncSendDelay(RocketMqConfig.AI_CALL_CALLBACK_TOPIC, JSON.toJSONString(req), 14); 

6.消费者

package net.trueland.smart.marketing.mq;  import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import net.trueland.smart.marketing.config.RocketMqConfig; import net.trueland.smart.marketing.model.aicall.CreateTaskRequest; import net.trueland.smart.marketing.util.AICallUtils; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component;  import javax.annotation.Resource;  /**  *  * @author sxd  * @date 2023/9/8 14:44  */ @Slf4j @Component @RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP) public class AICallCallbackConsumer implements RocketMQListener<String> {      @Resource     AICallUtils aiCallUtils;      @Override     public void onMessage(String data) {         log.info("AICallCallbackConsumer-onMessage-收到延迟队列消息:{}",  data);         try{             // 使用实体类解析消息(根据自己的消息来创建自己的实体类)             CreateTaskRequest createTaskRequest = JSON.parseObject(data, CreateTaskRequest.class);             // 业务逻辑。。。                      }catch (Exception e){             log.error("AICallCallbackConsumer-onMessage-异常", e);         }     } } 

解释一下消费者中的代码
@RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP)
@RocketMQMessageListener表示这是一个rocketmq的消费者,该注解只能用在类上面 topic:指定需要消费的队列
consumerGroup:定义消费组,消费组是指处理某一类消息的消费者的集合

注意事项
1.多个监听器使用同一个group,会报错Caused by: org.apache.rocketmq.client.exception.MQClientException: The consu
解决办法:https://blog.csdn.net/LLittleF/article/details/130689449
2.可视化平台:rocketmq服务器地址:18080

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!