一、概述
1.1介绍
RocketMQ是阿里巴巴2016年MQ中间件,使用Java语言开发,RocketMQ 是一款开源的分布式消息系统,基于高可用分布式集群技术,提供低延时的、高可靠的消息发布与订阅服务。同时,广泛应用于多个领域,包括异步通信解耦、企业解决方案、金融支付、电信、电子商务、快递物流、广告营销、社交、即时通信、移动应用、手游、视频、物联网、车联网等。
官网: http://rocketmq.apache.org/
1.2相关概念
Producer:消息的发送者,生产者;举例:发件人
Consumer:消息接收者,消费者;举例:收件人
Broker:临时保存生产者发送消息的服务器;举例:快递
NameServer:管理Broker;举例:各个快递公司的管理机构 相当于broker的注册中心,保留了broker的信息
Queue:队列,消息存放的位置,一个Broker中可以有多个队列
Topic:主题,消息的分类
ProducerGroup:生产者组
ConsumerGroup:消费者组,多个消费者组可以同时消费一个主题的消息
消息发送的流程是,Producer询问NameServer,NameServer分配一个broker 然后Consumer也要询问NameServer,得到一个具体的broker,然后消费消息
单机版
集群
概述
- 生产者:就是用于生产消息的应用程序。
- 消息:就是要传输的数据,可以是最简单的文本字符串,也可以是自定义的复杂格式(只要能按预定格式解析出来即可)。
- 队列:大家应该再熟悉不过了,是一种先进先出数据结构。它是存放消息的容器,消息从队尾入队,从队头出队,入队即发消息的过程,出队即收消息的过程。
- 消费者:就是用于读取队列中消息的应用程序。
1.3部署
https://blog.csdn.net/Acloasia/article/details/130548105
version: '3' services: namesrv: image: apacherocketmq/rocketmq:4.6.0 ports: - 9876:9876 volumes: - ./data/namesrv/logs:/home/rocketmq/logs command: sh mqnamesrv broker: image: apacherocketmq/rocketmq:4.6.0 ports: - 10909:10909 - 10911:10911 - 10912:10912 user: "${UID}:3000" volumes: - ./data/broker/logs:/home/rocketmq/logs - ./data/broker/store:/home/rocketmq/store - ./broker.conf:/home/rocketmq/rocketmq-4.6.0/conf/broker.conf command: sh mqbroker -n namesrv:9876 -c ../conf/broker.conf depends_on: - namesrv
1.4应用场景
使用消息中间件最主要的目的:
- [1] 应用解耦
- [2] 异步处理
- [3] 流量削峰
[1] 应用解耦
场景:双11是购物狂节,用户下单后,订单系统需要通知库存系统,传统的做法就是订单系统调用库存系统的接口. `
问题:`当库存系统出现故障时,订单就会失败。 订单系统和库存系统高耦合。如何解决这个问题? 引入消息队列之后
订单系统:
用户下单后,订单系统完成持久化处理,将消息写入消息队列,返回用户订单下单成功。库存系统:
订阅下单的消息,获取下单消息,进行减库操作。 就算库存系统出现故障,消息队列也能保证消息的可靠投递,不会导致消息丢失。
[2] 异步处理
场景说明:用户注册后,需要发注册邮件和注册短信,传统的做法有两种 1.串行的方式 2.并行的方式`
串行方式:
将注册信息写入数据库后,发送注册邮件,再发送注册短信,以上三个任务全部完成后才返回给客户端。 这有一个问题是,邮件,短信并不是必须的,它只是一个通知,而这种做法让客户端等待没有必要等待的东西.
- 并行方式: `将注册信息写入数据库后,发送邮件的同时,发送短信,以上三个任务完成后,返回给客户端,并行的方式能提高处理的时间。
- 消息队列:
假设三个业务节点分别使用50ms,串行方式使用时间150ms,并行使用时间100ms。虽然并行已经提高的处理时间,但是,前面说过,邮件和短信对我正常的使用网站没有任何影响,客户端没有必要等着其发送完成才显示注册成功,应该是写入数据库后就返回。
消息队列`: 引入消息队列后,把发送邮件、短信不是必须的业务逻辑异步处理。
结论: 由此可以看出,引入消息队列后,用户的响应时间就等于写入数据库的时间+写入消息队列的时间(可以忽略不计),引入消息队列后处理后,响应时间是串行的3倍,是并行的2倍。
[3] 流量削峰
场景:
秒杀活动,一般会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。
作用:`
可以控制活动人数,超过此一定阀值的订单直接丢弃(我为什么秒杀一次都没有成功过呢^^) 。
可以缓解短时间的高流量压垮应用(应用程序按自己的最大处理能力获取订单) 。
用户的请求,服务器收到之后,首先写入消息队列,加入消息队列长度超过最大值,则直接抛弃用户请求或跳转到错误页面;
秒杀业务根据消息队列中的请求信息,再做后续处理;
二、快速入门
2.1现有角色分析与流程
1 相关角色
消息生产者:
producer
,消息生产者,web-service
中web
是生产者。消息服务器:
broker
,经纪人。实现接收、提供、持久化、过滤消息。消息消费者:
consumer
。消费消息,web-service
中service
是消费者。上述三个角色都可以搭建集群,实现高可用;
监听器监听
broker
,消费者监听broker
,有消息就消费偏移量(
offset
):消费者需要从代理服务器中获取消息,消费使用;消费完之后并没有删除,而是打了一个已经消费完的标签;偏移量记录的就是所有已经消费过的数据的编码。命名服务器:NameServer [cluster],统筹管理前前三个角色
消息组成:消息体(body)、主题(Topic)、标签(tag子主题)
broker
组成:内含多个不同主题(Topic)
,每个topic
中包含多个队列(默认4个)
2 工作流程
常见概念
消息(Message)
消息
是 Apache RocketMQ 中的最小数据传输单元
。生产者将业务数据的负载和拓展属性包装成消息发送到服务端,服务端按照相关语义将消息投递到消费端进行消费。
通俗理解: 消息就是自己想要传递业务数据
主题(Topic)
主题 是Apache RocketMQ 中消息传输和存储的顶层容器
,用于标识同一类业务逻辑的消息。主题通过TopicName来做唯一标识和区分。
通俗理解: 就是用来给发送消息进行分类。一个消息发送者可以发送消息到一个或多个主题,一个消息消费者也可以消费一个或多个主题的消息。
消息类型(MessageType)
Apache RocketMQ 支持的消息类型有普通消息、顺序消息、事务消息和定时/延时消息。
注意:Apache RocketMQ 从5.0版本开始,支持强制校验消息类型,即每个主题Topic只允许发送一种消息类型的消息,这样可以更好的运维和管理生产系统,避免混乱。但同时保证向下兼容4.x版本行为,强制校验功能默认开启。
消息队列(MessageQueue)
队列
是 Apache RocketMQ 中消息存储和传输的实际容器
,也是消息的最小存储单元
。 Apache RocketMQ 的所有主题都是由多个队列组成
,以此实现队列数量的水平拆分和队列内部的流式存储。队列通过QueueId来做唯一标识和区分。
通俗理解: 就是 topic 的分区,用来更好实现队列数量的水平拆分和队列内部的流式存储。(水平拆分意味着可以通过增加更多的队列来提高系统的并行处理能力,而流式存储则是指队列可以持续接收和发送消息,适用于高吞吐量的场景。)
消费者分组(ConsumerGroup)
消费者分组是Apache RocketMQ 系统中承载多个消费行为一致的消费者的负载均衡分组
。和消费者不同,消费者分组并不是运行实体,而是一个逻辑资源。在 Apache RocketMQ 中,通过消费者分组内初始化多个消费者实现消费性能的水平扩展以及高可用容灾。
订阅关系(Subscription)
Apache RocketMQ 发布订阅模型中消息过滤、重试、消费进度的规则配置
。订阅关系以消费组粒度进行管理,消费组通过定义订阅关系控制指定消费组下的消费者如何实现消息过滤、消费重试及消费进度恢复等。
2.2消息发送和监听的流程
消息生产者
1.创建消息生产者producer,并制定生产者组名
2.指定Nameserver地址
3.启动producer
4.创建消息对象,指定主题Topic、Tag和消息体等
5.发送消息
6.关闭生产者producer
消息消费者
1.创建消费者consumer,制定消费者组名
2.指定Nameserver地址
3.创建监听订阅主题Topic和Tag等
4.处理消息
5.启动消费者consumer
2.3原生API的spring整合
<dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-client</artifactId> <version>4.9.2</version> </dependency>
生产者
@Test public void testProducer() throws Exception { // 创建默认的生产者,并设置生产者组名为"test-group" DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址为本地的9876端口 producer.setNamesrvAddr("host:9876"); // 启动生产者实例 producer.start(); // 循环发送10条消息 for (int i = 0; i < 10; i++) { // 创建消息对象,指定主题为"TopicTest",内容为"Hello RocketMQ "加上当前循环次数 Message msg = new Message("TopicTest", ("Hello RocketMQ " + i).getBytes()); // 发送消息,并获取发送结果 SendResult send = producer.send(msg); // 打印发送结果 System.out.println(send); } // 关闭生产者实例 producer.shutdown(); }
消费者
@Test public void testConsumer() throws Exception { // 创建默认消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); // 设置nameServer地址 consumer.setNamesrvAddr("host:9876"); // 订阅一个主题来消费,*表示没有过滤参数,表示这个主题的任何消息 consumer.subscribe("TopicTest", "*"); // 注册一个消费监听,MessageListenerConcurrently是多线程消费,默认20个线程,可以参看consumer.setConsumeThreadMax() consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 打印当前线程名和接收到的消息 System.out.println(Thread.currentThread().getName() + "----" + msgs); // 返回消费的状态,如果是CONSUME_SUCCESS则成功,若为RECONSUME_LATER则该条消息会被重回队列,重新被投递 // 重试的时间为messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h // 也就是第一次1s,第二次5s,第三次10s,.... 如果重试了18次,那么这个消息就会被终止发送给消费者 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); // 这个start一定要写在registerMessageListener下面 consumer.start(); // 阻塞等待用户输入,防止程序立即退出 System.in.read(); }
2.4消费模式
MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。
Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。
Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式
RocketMQ发送同步消息
上面的快速入门就是发送同步消息,发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方
RocketMQ发送异步消息
异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知
生产者
@Test public void testAsyncProducer() throws Exception { // 创建默认的生产者,指定生产者组名为"test-group" DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址为本地的9876端口 producer.setNamesrvAddr("localhost:9876"); // 启动生产者实例 producer.start(); // 创建一个消息对象,主题为"TopicTest",内容为"异步消息"的字节数组 Message msg = new Message("TopicTest", ("异步消息").getBytes()); // 发送消息,并提供一个回调函数来处理发送结果 producer.send(msg, new SendCallback() { @Override public void onSuccess(SendResult sendResult) { // 当消息发送成功时,打印"发送成功" System.out.println("发送成功"); } @Override public void onException(Throwable e) { // 当消息发送失败时,打印"发送失败" System.out.println("发送失败"); } }); // 打印一条信息,用于观察回调函数是否已经执行 System.out.println("看看谁先执行"); // 挂起jvm,等待回调函数执行完成,因为回调是异步的,如果不挂起jvm,测试可能无法观察到回调的效果 System.in.read(); // 关闭生产者实例 producer.shutdown(); }
消费者
@Test public void testAsyncConsumer() throws Exception { // 创建默认消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); // 设置nameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个主题来消费,*表示没有过滤参数,表示这个主题的任何消息 consumer.subscribe("TopicTest", "*"); // 注册一个消费监听,MessageListenerConcurrently是并发消费 // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax() consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 这里执行消费的代码,默认是多线程消费 System.out.println(Thread.currentThread().getName() + "----" + msgs); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); // 启动消费者 consumer.start(); // 等待用户输入,防止程序立即退出 System.in.read(); }
RocketMQ发送单向消息
这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,例如日志信息的发送
生产者
@Test public void testOnewayProducer() throws Exception { // 创建默认的生产者,设置生产者组名为"test-group" DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址为本地地址,端口为9876 producer.setNamesrvAddr("localhost:9876"); // 启动生产者实例 producer.start(); // 创建一个消息对象,主题为"TopicTest",内容为"单向消息"的字节数组 Message msg = new Message("TopicTest", ("单向消息").getBytes()); // 发送单向消息,不需要等待服务器响应 producer.sendOneway(msg); // 关闭生产者实例,释放资源 producer.shutdown(); }
消费者
消费者和上面一样
RocketMQ发送延迟消息
消息放入mq后,过一段时间,才会被监听到,然后消费
比如下订单业务,提交了一个订单就可以发送一个延时消息,30min后去检查这个订单的状态,如果还是未付款就取消订单释放库存。
@Test public void testDelayProducer() throws Exception { // 创建默认的生产者,并设置生产者组名为"test-group" DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址为本地的9876端口 producer.setNamesrvAddr("localhost:9876"); // 启动生产者实例 producer.start(); // 创建一个消息对象,主题为"TopicTest",内容为"延迟消息"的字节数组 Message msg = new Message("TopicTest", ("延迟消息").getBytes()); // 给这个消息设定一个延迟等级,这里设置为3,表示延迟5秒发送 // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h" msg.setDelayTimeLevel(3); // 使用生产者发送单向消息 producer.send(msg); // 打印当前时间,用于观察消息发送的时间 System.out.println(new Date()); // 关闭生产者实例 producer.shutdown(); }
RocketMQ批量消息
生产者
@Test public void testBatchProducer() throws Exception { // 创建默认的生产者 DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址 producer.setNamesrvAddr("localhost:9876"); // 启动实例 producer.start(); List<Message> msgs = Arrays.asList( new Message("TopicTest", "我是一组消息的A消息".getBytes()), new Message("TopicTest", "我是一组消息的B消息".getBytes()), new Message("TopicTest", "我是一组消息的C消息".getBytes()) ); SendResult send = producer.send(msgs); System.out.println(send); // 关闭实例 producer.shutdown(); }
消费者
@Test public void testBatchConsumer() throws Exception { // 创建默认消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); // 设置nameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个主题来消费 表达式,默认是* consumer.subscribe("TopicTest", "*"); // 注册一个消费监听 MessageListenerConcurrently是并发消费 // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax() consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 这里执行消费的代码 默认是多线程消费 System.out.println(Thread.currentThread().getName() + "----" + new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
RocketMQ发送顺序消息
消息有序指的是可以按照消息的发送顺序来消费(FIFO)。RocketMQ可以严格的保证消息有序,可以分为:分区有序或者全局有序。
可能大家会有疑问,mq不就是FIFO吗?
rocketMq的broker的机制,导致了rocketMq会有这个问题
因为一个broker中对应了四个queue
顺序消费的原理解析,在默认的情况下消息发送会采取Round Robin轮询方式把消息发送到不同的queue(分区队列);而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每个queue,消息都是有序的。
下面用订单进行分区有序的示例。一个订单的顺序流程是:下订单、发短信通知、物流、签收。订单顺序号相同的消息会被先后发送到同一个队列中,消费时,同一个顺序获取到的肯定是同一个队列。
生产者
@Test public void testOrderlyProducer() throws Exception { // 创建默认的生产者 DefaultMQProducer producer = new DefaultMQProducer("test-group"); // 设置nameServer地址 producer.setNamesrvAddr("localhost:9876"); // 启动实例 producer.start(); List<Order> orderList = Arrays.asList( new Order(1, 111, 59D, new Date(), "下订单"), new Order(2, 111, 59D, new Date(), "物流"), new Order(3, 111, 59D, new Date(), "签收"), new Order(4, 112, 89D, new Date(), "下订单"), new Order(5, 112, 89D, new Date(), "物流"), new Order(6, 112, 89D, new Date(), "拒收") ); // 循环集合开始发送 orderList.forEach(order -> { Message message = new Message("TopicTest", order.toString().getBytes()); try { // 发送的时候 相同的订单号选择同一个队列 producer.send(message, new MessageQueueSelector() { @Override public MessageQueue select(List<MessageQueue> mqs, Message msg, Object arg) { // 当前主题有多少个队列 int queueNumber = mqs.size(); // 这个arg就是后面传入的 order.getOrderNumber() Integer i = (Integer) arg; // 用这个值去%队列的个数得到一个队列 int index = i % queueNumber; // 返回选择的这个队列即可 ,那么相同的订单号 就会被放在相同的队列里 实现FIFO了 return mqs.get(index); } }, order.getOrderNumber()); } catch (Exception e) { System.out.println("发送异常"); } }); // 关闭实例 producer.shutdown(); }
消费者
@Test public void testOrderlyConsumer() throws Exception { // 创建默认消费者组 DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer-group"); // 设置nameServer地址 consumer.setNamesrvAddr("localhost:9876"); // 订阅一个主题来消费 *表示没有过滤参数 表示这个主题的任何消息 consumer.subscribe("TopicTest", "*"); // 注册一个消费监听 MessageListenerOrderly 是顺序消费 单线程消费 consumer.registerMessageListener(new MessageListenerOrderly() { @Override public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) { MessageExt messageExt = msgs.get(0); System.out.println(new String(messageExt.getBody())); return ConsumeOrderlyStatus.SUCCESS; } }); consumer.start(); System.in.read(); }
RocketMQ发送带标签的消息
Rocketmq提供消息过滤功能,通过tag或者key进行区分
我们往一个主题里面发送消息的时候,根据业务逻辑,可能需要区分,比如带有tagA标签的被A消费,带有tagB标签的被B消费,还有在事务监听的类里面,只要是事务消息都要走同一个监听,我们也需要通过过滤才区别对待
tag方法
标签消息生产者
@Test public void tagProducer() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("tag-producer-group"); producer.setNamesrvAddr(":9876"); producer.start(); Message message = new Message("tagTopic", "vip1", "我是vip1的文章".getBytes()); Message message2 = new Message("tagTopic", "vip2", "我是vip2的文章".getBytes()); producer.send(message); producer.send(message2); System.out.println("发送成功"); producer.shutdown(); }
标签消息消费者
/** * vip1 * * @throws Exception */ @Test public void tagConsumer1() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-a"); consumer.setNamesrvAddr("47.96.254.46:9876"); consumer.subscribe("tagTopic", "vip1"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("我是vip1的消费者,我正在消费消息" + new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); } /** * vip1 || vip2 * * @throws Exception */ @Test public void tagConsumer2() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tag-consumer-group-b"); consumer.setNamesrvAddr("47.96.254.46:9876"); consumer.subscribe("tagTopic", "vip1 || vip2"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { System.out.println("我是vip2的消费者,我正在消费消息" + new String(msgs.get(0).getBody())); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
总的来说,针对消息分类,您可以选择创建多个 Topic,或者在同一个 Topic 下创建多个 Tag。但通常情况下,不同的 Topic 之间的消息没有必然的联系,而 Tag 则用来区分同一个 Topic 下相互关联的消息,例如全集和子集的关系、流程先后的关系。
Key方法
在rocketmq中的消息,默认会有一个messageId当做消息的唯一标识,我们也可以给消息携带一个key,用作唯一标识或者业务标识,包括在控制面板查询的时候也可以使用messageId或者key来进行查询
生产者
@Test public void keyProducer() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("key-producer-group"); producer.setNamesrvAddr(""); producer.start(); String key = UUID.randomUUID().toString(); System.out.println(key); Message message = new Message("keyTopic", "vip1", key, "我是vip1的文章".getBytes()); producer.send(message); System.out.println("发送成功"); producer.shutdown(); }
消费者
@Test public void keyConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("key-consumer-group"); consumer.setNamesrvAddr(""); consumer.subscribe("keyTopic", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); System.out.println("我是vip1的消费者,我正在消费消息" + new String(messageExt.getBody())); System.out.println("我们业务的标识:" + messageExt.getKeys()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
2.5重复消费问题
在BROADCASTING(广播)模式下,所有注册的消费者都会收到消息,通常这些消费者是集群部署的微服务,导致多台机器重复消费,这取决于需求。在CLUSTERING(负载均衡)模式下,如果多个consumerGroup消费同一个topic,也会发生重复消费。对于同一个consumerGroup,虽然一个队列只分配给一个消费者看似避免重复消费,但在消费者上下线时需重新负载均衡,可能导致新消费者重复消费未提交offset的消息。此外,在发送批量消息时,若部分失败,则整个批量消息会被重新消费。
消息会重复
1.生产者多次投递了
2.消费者方因为扩容时会重试
生产者
@Test void repeatProducer() throws Exception { DefaultMQProducer producer = new DefaultMQProducer("repeat-producer-group"); producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR); producer.start(); String key = UUID.randomUUID().toString(); System.out.println(key); // 测试 发两个key一样的消息 Message m1 = new Message("repeatTopic", null, key, "扣减库存-1".getBytes()); Message m1Repeat = new Message("repeatTopic", null, key, "扣减库存-1".getBytes()); producer.send(m1); producer.send(m1Repeat); System.out.println("发送成功"); producer.shutdown(); }
消费组
@Test public void testRepeatConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("repeat-consumer-group"); consumer.setNamesrvAddr("47.96.254.46:9876"); consumer.subscribe("repeatTopic", "*"); // 注册一个消费监听 MessageListenerConcurrently是并发消费 // 默认是20个线程一起消费,可以参看 consumer.setConsumeThreadMax() consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { // 拿到消息的key MessageExt messageExt = msgs.get(0); String keys = messageExt.getKeys(); // 判断是否存在布隆过滤器中 if (bloomFilter.contains(keys)) { // 直接返回了 不往下处理业务 System.out.println("消息重复了"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } // 这个处理业务,然后放入过滤器中 // do sth... bloomFilter.add(keys); System.out.println("我是消费者,我正在消费消息" + new String(messageExt.getBody())); System.out.println("我们业务的标识:" + messageExt.getKeys()); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
2.6RocketMQ重试机制
生产者重试
// 失败的情况重发3次
producer.setRetryTimesWhenSendFailed(3);
// 消息在1S内没有发送成功,就会重试
producer.send(msg, 1000);
@Test public void retryProducer() throws Exception { // 创建一个名为"retry-producer-group"的DefaultMQProducer实例 DefaultMQProducer producer = new DefaultMQProducer("retry-producer-group"); // 设置NameServer地址,用于生产者与Broker通信 producer.setNamesrvAddr(":9876"); // 启动生产者 producer.start(); // 设置生产者发送消息失败时的重试次数为2次 producer.setRetryTimesWhenSendFailed(2); // 设置异步发送消息失败时的重试次数为2次 producer.setRetryTimesWhenSendAsyncFailed(2); // 生成一个随机的UUID作为消息的key String key = UUID.randomUUID().toString(); // 打印生成的key System.out.println(key); // 创建一个消息实例,包含主题、队列、key和内容 Message message = new Message("retryTopic", "vip1", key, "我是vip666的文章".getBytes()); // 发送消息 producer.send(message); // 打印发送成功的提示信息 System.out.println("发送成功"); // 关闭生产者 producer.shutdown(); }
在消费者放return ConsumeConcurrentlyStatus.RECONSUME_LATER;后就会执行重试
上图代码中说明了,我们再实际生产过程中,一般重试3-5次,如果还没有消费成功,则可以把消息签收了,通知人工等处理
/** * 重试的时间间隔 * 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h * 默认重试16次 * 1.能否自定义重试次数 * 2.如果重试了16次(并发模式) 顺序模式下(int最大值次)都是失败的? 是一个死信消息 则会放在一个死信主题中去 主题的名称:%DLQ%retry-consumer-group * 3.当消息处理失败的时候 该如何正确的处理? * -------------- * 重试的次数一般 5次 * @throws Exception */ @Test public void retryConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group"); consumer.setNamesrvAddr(":9876"); consumer.subscribe("retryTopic", "*"); // 设定重试次数 consumer.setMaxReconsumeTimes(2); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); System.out.println(new Date()); System.out.println(messageExt.getReconsumeTimes()); System.out.println(new String(messageExt.getBody())); // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试 return ConsumeConcurrentlyStatus.RECONSUME_LATER; } }); consumer.start(); System.in.read(); }
2.7RocketMQ死信消息
当消费重试到达阈值以后,消息不会被投递给消费者了,而是进入了死信队列
当一条消息初次消费失败,RocketMQ会自动进行消息重试,达到最大重试次数后,若消费依然失败,则表明消费者在正常情况下无法正确地消费该消息。此时,该消息不会立刻被丢弃,而是将其发送到该消费者对应的特殊队列中,这类消息称为死信消息(Dead-Letter Message),存储死信消息的特殊队列称为死信队列(Dead-Letter Queue),死信队列是死信Topic下分区数唯一的单独队列。如果产生了死信消息,那对应的ConsumerGroup的死信Topic名称为%DLQ%ConsumerGroupName,死信队列的消息将不会再被消费。可以利用RocketMQ Admin工具或者RocketMQ Dashboard上查询到对应死信消息的信息。我们也可以去监听死信队列,然后进行自己的业务上的逻辑
/// 直接监听死信主题的消息,记录下拉 通知人工接入处理 @Test public void retryDeadConsumer() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-dead-consumer-group"); consumer.setNamesrvAddr("9876"); consumer.subscribe("%DLQ%retry-consumer-group", "*"); consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); System.out.println(new Date()); System.out.println(new String(messageExt.getBody())); System.out.println("记录到特别的位置 文件 mysql 通知人工处理"); // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); }
第二种方法
第二种方案 用法比较多 @Test public void retryConsumer2() throws Exception { DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("retry-consumer-group"); consumer.setNamesrvAddr("47.96.254.46:9876"); consumer.subscribe("retryTopic", "*"); // 设定重试次数 consumer.registerMessageListener(new MessageListenerConcurrently() { @Override public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) { MessageExt messageExt = msgs.get(0); System.out.println(new Date()); // 业务处理 try { handleDb(); } catch (Exception e) { // 重试 int reconsumeTimes = messageExt.getReconsumeTimes(); if (reconsumeTimes >= 3) { // 不要重试了 System.out.println("记录到特别的位置 文件 mysql 通知人工处理"); return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } return ConsumeConcurrentlyStatus.RECONSUME_LATER; } // 业务报错了 返回null 返回 RECONSUME_LATER 都会重试 return ConsumeConcurrentlyStatus.CONSUME_SUCCESS; } }); consumer.start(); System.in.read(); } private void handleDb() { int i = 10 / 0; }
三、springboot整合
写依赖
rocketmq: name-server: 47.96.254.46:9876 producer: group: boot-producer-group enable-msg-trace: true # ?????????? access-key: rocketmq2 secret-key: 12345678
写配置
rocketmq: name-server: :9876 producer: group: boot-producer-group enable-msg-trace: true # ?????????? access-key: rocketmq2 secret-key: 12345678
报错No qualifying bean of type 'org.apache.rocketmq.spring.core.RocketMQTemplate' available: expected at least 1 bean which qualifies as autowire candidate.
解决
https://blog.csdn.net/zhenweiyi/article/details/130722046
3.1常见消息书写
@Autowired private RocketMQTemplate rocketMQTemplate; @Test void contextLoads() { // 同步 // rocketMQTemplate.syncSend("bootTestTopic", "我是boot的一个消息"); // // 异步 rocketMQTemplate.asyncSend("bootAsyncTestTopic", "我是boot的一个异步消息", new SendCallback() { @Override public void onSuccess(SendResult sendResult) { System.out.println("成功"); } @Override public void onException(Throwable throwable) { System.out.println("失败" + throwable.getMessage()); } }); // // 单向 rocketMQTemplate.sendOneWay("bootOnewayTopic", "单向消息"); // 延迟 Message<String> msg = MessageBuilder.withPayload("我是一个延迟消息").build(); rocketMQTemplate.syncSend("bootMsTopic", msg, 3000, 3); // 顺序消息 发送者放 需要将一组消息 都发在同一个队列中去 消费者 需要单线程消费 List<MsgModel> msgModels = Arrays.asList( new MsgModel("qwer", 1, "下单"), new MsgModel("qwer", 1, "短信"), new MsgModel("qwer", 1, "物流"), new MsgModel("zxcv", 2, "下单"), new MsgModel("zxcv", 2, "短信"), new MsgModel("zxcv", 2, "物流") ); msgModels.forEach(msgModel -> { // 发送 一般都是以json的方式进行处理 rocketMQTemplate.syncSendOrderly("bootOrderlyTopic", JSON.toJSONString(msgModel), msgModel.getOrderSn()); }); }
3.2标签消息
@Test void tagKeyTest() throws Exception { // topic:tag rocketMQTemplate.syncSend("bootTagTopic:tagA", "我是一个带tag的消息"); // key是写带在消息头的 Message<String> message = MessageBuilder.withPayload("我是一个带key的消息") .setHeader(RocketMQHeaders.KEYS, "qwertasdafg") .build(); rocketMQTemplate.syncSend("bootKeyTopic", message); }
3.3常见消费监听
@Component @RocketMQMessageListener(topic = "bootTestTopic", consumerGroup = "boot-test-consumer-group") public class ABootSimpleMsgListener implements RocketMQListener<MessageExt> { /** * 这个方法就是消费者的方法 * 如果泛型制定了固定的类型 那么消息体就是我们的参数 * MessageExt 类型是消息的所有内容 * ------------------------ * 没有报错 就签收了 * 如果报错了 就是拒收 就会重试 * * @param message */ @Override public void onMessage(MessageExt message) { System.out.println("消息的id:" + message.getMsgId()); System.out.println(new String(message.getBody())); } }
@Component @RocketMQMessageListener(topic = "bootOrderlyTopic", consumerGroup = "boot-orderly-consumer-group", consumeMode = ConsumeMode.ORDERLY, // 顺序消费模式 单线程 maxReconsumeTimes = 5 // 消费重试的次数 ) public class BOrderlyMsgListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { System.out.println(121212); MsgModel msgModel = JSON.parseObject(new String(message.getBody()), MsgModel.class); System.out.println(msgModel); } }
3.4标签消息监听
@Component @RocketMQMessageListener(topic = "bootTagTopic", consumerGroup = "boot-tag-consumer-group", selectorType = SelectorType.TAG,// tag过滤模式 selectorExpression = "tagA || tagB" // selectorType = SelectorType.SQL92,// sql92过滤模式 // selectorExpression = "a in (3,5,7)" // broker.conf中开启enbalePropertyFilter=true ) public class CTagMsgListener implements RocketMQListener<MessageExt> { @Override public void onMessage(MessageExt message) { System.out.println(new String(message.getBody())); } }
3.5RocketMQ集成SpringBoot发送不同消息模式
Rocketmq消息消费的模式分为两种:
负载均衡模式和广播模式负载均衡模式表示多个消费者交替消费同一个主题里面的消息
广播模式表示每个每个消费者都消费一遍订阅的主题的消息
/ 测试消息消费模式 集群模块 广播模式 @Test void modeTest() throws Exception { for (int i = 1; i <= 5; i++) { rocketMQTemplate.syncSend("modeTopic", "我是第" + i + "个消息"); } }
集群模式
/** * [CLUSTERING] 集群模式下 队列会被消费者分摊, 队列数量>=消费者数量 消息的消费位点 mq服务器会记录处理 * BROADCASTING 广播模式下 消息会被每一个消费者都处理一次, mq服务器不会记录消费点位,也不会重试 */ @Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-a", messageModel = MessageModel.CLUSTERING, // 集群模式 负载均衡 consumeThreadNumber = 40 ) public class DC1 implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-a组的第一个消费者:" + message); } }
广播模式
@Component @RocketMQMessageListener(topic = "modeTopic", consumerGroup = "mode-consumer-group-b", messageModel = MessageModel.BROADCASTING // 广播模式 ) public class DC4 implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("我是mode-consumer-group-b组的第一个消费者:" + message); } }
3.6解决消息堆积问题
一般认为单条队列消息差值>=10w时 算堆积问题
1.生产太快了
生产方可以做业务限流
增加消费者数量,但是消费者数量<=队列数量,适当的设置最大的消费线程数量(根据IO(2n)/CPU(n+1))
动态扩容队列数量,从而增加消费者数量
2.消费者消费出现问题
排查消费者程序的问题
// 积压问题 @Test void jyTest() throws Exception { for (int i = 1001; i <= 1200; i++) { rocketMQTemplate.syncSend("jyTopic", "我是第" + i + "个消息"); } }
@Component @RocketMQMessageListener(topic = "jyTopic", consumerGroup = "jy-consumer-group", consumeThreadNumber = 40, consumeMode = ConsumeMode.CONCURRENTLY ) public class EJyListener1 implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("我是第一个消费者:" + message); } }
3.7确保消息不丢失
- 生产者使用同步发送模式 ,收到mq的返回确认以后 顺便往自己的数据库里面写msgId status(0) time
- 消费者消费以后 修改数据这条消息的状态 = 1
- 写一个定时任务 间隔两天去查询数据 如果有status = 0 and time < day-2
- 将mq的刷盘机制设置为同步刷盘
- 使用集群模式 ,搞主备模式,将消息持久化在不同的硬件上
- 可以开启mq的trace机制,消息跟踪机制
1.在broker.conf中开启消息追踪
traceTopicEnable=true
2.重启broker即可
3.生产者配置文件开启消息轨迹
enable-msg-trace: true
4.消费者开启消息轨迹功能,可以给单独的某一个消费者开启
enableMsgTrace = true
在rocketmq的面板中可以查看消息轨迹
默认会将消息轨迹的数据存在 RMQ_SYS_TRACE_TOPIC 主题里面
@Test void jyTest() throws Exception { for (int i = 1001; i <= 1200; i++) { System.out.println("我是第" + i + "个消息"); rocketMQTemplate.syncSend("jumpTopic", "我是第" + i + "个消息"); } }
@Component @RocketMQMessageListener(topic = "jumpTopic", consumerGroup = "jump-consumer-group", consumeThreadNumber = 40, consumeMode = ConsumeMode.CONCURRENTLY ) public class FJumpListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("我是消费者:" + message); } }
// 轨迹消息 @Test void traceTest() throws Exception { rocketMQTemplate.syncSend("traceTopic", "我是第个消息"); }
@Component @RocketMQMessageListener(topic = "traceTopic", consumerGroup = "trace-consumer-group", consumeThreadNumber = 40, consumeMode = ConsumeMode.CONCURRENTLY, enableMsgTrace = true // 开启消费者方的轨迹 ) public class GTraceListener implements RocketMQListener<String> { @Override public void onMessage(String message) { System.out.println("我是消费者:" + message); } }
3.8安全
- 开启acl的控制 在broker.conf中开启aclEnable=true
- 配置账号密码 修改plain_acl.yml
- 修改控制面板的配置文件 放开52/53行 把49行改为true 上传到服务器的jar包平级目录下即可