RocketMq相较于rabbitMq的优点(https://www.jianshu.com/p/56686af7eedd):
RocketMQ的优点:
- 性能优越:RocketMQ在处理大量消息时,性能优于RabbitMQ。当面临每秒数万到数十万条消息的处理需求时,RocketMQ能够提供更好的性能。
- 灵活的路由配置:RocketMQ在生产者和队列之间增加了一个Exchange模块,根据配置的路由规则将生产者发出的消息分发到不同的队列中,这提供了更大的灵活性。
- 对在线业务的响应时延做了很多优化:RocketMQ对在线业务的响应时延做了很多优化,大多数情况下可以做到毫秒级响应。
- 中文社区活跃:对于中文用户来说,RocketMQ的中文社区比较活跃,源代码易读,方便二次开发。
RocketMQ的缺点:
- 大量消息堆积时,会导致性能急剧下降。
- 和其它两种消息队列产品相比,性能是最差的。因此,如果业务对性能要求特别高,就不要选用RocketMQ。
- Java开发,虽然学习成本相对较低,但仍然需要学习相关的开发技术。
RabbitMQ的优点:
- 高并发、高吞吐量:由于使用了Erlang语言,RabbitMQ在消息处理性能和吞吐量方面表现优秀。它可以处理大量的并发消息,并保证高吞吐量。
- 健壮、稳定、易用、跨平台、支持多种语言、文档齐全:RabbitMQ被认为是非常健壮、稳定和易用的消息队列产品。它支持多种编程语言,并且提供了丰富的文档和社区支持。
- 开源提供的管理界面非常棒,用起来很好用:RabbitMQ提供了直观而易于使用的开源管理界面,使得管理和监控消息队列变得非常方便。
- 社区活跃度高:RabbitMQ的社区非常活跃,有大量的用户和开发者在使用和贡献代码。这使得RabbitMQ具有很好的生态系统,便于获取支持和解决问题。
RabbitMQ的缺点:
- Erlang开发,很难去看懂源码:对于一些开发者来说,Erlang语言可能比较陌生,学习成本较高,而且源代码可能难以理解。这可能会对二次开发和维护造成一定的困难。
- 需要学习比较复杂的接口和协议:RabbitMQ使用了一些相对复杂的接口和协议,学习和维护成本较高。这可能会增加开发和维护的难度。
- 相比其他消息队列产品,性能稍逊一筹:虽然RabbitMQ在性能方面表现不错,但相比一些其他消息队列产品(如RocketMQ),其性能可能稍逊一筹。如果业务对性能要求特别高,可能需要考虑其他选择。
综上所述,RocketMQ和RabbitMQ各有优缺点,需要根据具体业务需求进行选择。如果业务需要高性能、灵活的路由配置和对在线业务的响应时延有较高要求,可以考虑使用RocketMQ;如果业务需要高并发、高吞吐量、健壮稳定且易于使用和管理的消息队列产品,可以考虑使用RabbitMQ。
1.maven依赖
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2.0</version> <exclusions> <exclusion> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> </exclusion> </exclusions> </dependency> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.0</version> </dependency>
2.配置文件
rocketmq: name-server: xx.xx.xx.xx:9876 // 你的rocketmq服务器及端口 producer: group: sms_producer_group
3.配置类
这里只是对RocketMQTemplate中的一些方法进行了封装,更方便调用,其他方法可直接查看RocketMQTemplate类
package net.trueland.smart.marketing.config; import org.springframework.context.annotation.Configuration; /** * RocketMq * * @author sxd * @date 2023/9/8 10:30 */ @Configuration public class RocketMqConfig { public final static String AI_CALL_CALLBACK_TOPIC = "tcloud_di_marketing_delay_passageway_topic"; public final static String AI_CALL_CALLBACK_GROUP = "tcloud_di_marketing_delay_passageway_group"; }
4.工具类
package net.trueland.smart.marketing.util; import com.alibaba.fastjson.JSONObject; import lombok.extern.slf4j.Slf4j; import org.apache.rocketmq.client.producer.SendCallback; import org.apache.rocketmq.client.producer.SendResult; import org.apache.rocketmq.common.message.Message; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Component; /** * RocketMqUtil * @author sxd * @date 2023/9/8 10:52 */ @Component @Slf4j public class RocketMqUtil { @Autowired private RocketMQTemplate rocketmqTemplate; /** * 同步发送 * @param topic * @param content * @return */ public SendResult syncSend(String topic, String content) { SendResult sendResult = rocketmqTemplate.syncSend(topic, MessageBuilder.withPayload(content).build()); log.info("RocketMqUtil-syncSend,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult)); return sendResult; } /** * 异步发送 * @param topic * @param content * @return */ public void aSyncSend(String topic, String content) { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("RocketMqUtil-aSyncSend-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { log.info("RocketMqUtil-aSyncSend-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage()); } }; rocketmqTemplate.asyncSend(topic, content, callback); } /** * 异步发送--延迟队列 * @param topic 主题 * @param content 内容 * @param delayTimeLevel 延迟等级 */ public void aSyncSendDelay(String topic, String content, Integer delayTimeLevel) { SendCallback callback = new SendCallback() { @Override public void onSuccess(SendResult sendResult) { log.info("RocketMqUtil-aSyncSendDelay-success,topic:{},obj:{}, result:{}", topic, content, JSONObject.toJSONString(sendResult)); } @Override public void onException(Throwable throwable) { log.info("RocketMqUtil-aSyncSendDelay-failed,topic:{},obj:{}, result:{}", topic, content, throwable.getMessage()); } }; // 3000表示发送超时时间 rocketmqTemplate.asyncSend(topic, MessageBuilder.withPayload(content).build(), callback, 3000L, delayTimeLevel); } }
5.生产者
// 这里以异步延迟消息为例 // 发送rocketmq。延迟等级使用14,即10分钟 rocketMqUtil.aSyncSendDelay(RocketMqConfig.AI_CALL_CALLBACK_TOPIC, JSON.toJSONString(req), 14);
6.消费者
package net.trueland.smart.marketing.mq; import cn.hutool.http.HttpRequest; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONArray; import com.alibaba.fastjson.JSONObject; import com.google.common.collect.Sets; import lombok.extern.slf4j.Slf4j; import net.trueland.smart.marketing.config.RocketMqConfig; import net.trueland.smart.marketing.model.aicall.CreateTaskRequest; import net.trueland.smart.marketing.util.AICallUtils; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Component; import javax.annotation.Resource; /** * * @author sxd * @date 2023/9/8 14:44 */ @Slf4j @Component @RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP) public class AICallCallbackConsumer implements RocketMQListener<String> { @Resource AICallUtils aiCallUtils; @Override public void onMessage(String data) { log.info("AICallCallbackConsumer-onMessage-收到延迟队列消息:{}", data); try{ // 使用实体类解析消息(根据自己的消息来创建自己的实体类) CreateTaskRequest createTaskRequest = JSON.parseObject(data, CreateTaskRequest.class); // 业务逻辑。。。 }catch (Exception e){ log.error("AICallCallbackConsumer-onMessage-异常", e); } } }
解释一下消费者中的代码
@RocketMQMessageListener(topic = RocketMqConfig.AI_CALL_CALLBACK_TOPIC, consumerGroup = RocketMqConfig.AI_CALL_CALLBACK_GROUP)
@RocketMQMessageListener表示这是一个rocketmq的消费者,该注解只能用在类上面 topic:指定需要消费的队列
consumerGroup:定义消费组,消费组是指处理某一类消息的消费者的集合
注意事项
1.多个监听器使用同一个group,会报错Caused by: org.apache.rocketmq.client.exception.MQClientException: The consu
解决办法:https://blog.csdn.net/LLittleF/article/details/130689449
2.可视化平台:rocketmq服务器地址:18080