SpringBoot整合阿里云RocketMQ对接,商业版

avatar
作者
筋斗云
阅读量:2

1.需要阿里云开通商业版RocketMQ

发送普通消息需要新建普通消息主题和普通消息 Group;发送延迟消息需要新建延迟消息主题和延迟消息 Group。

2.结构目录

在这里插入图片描述

3.引入依赖

<!-- Alibaba Cloud RocketMQ integration: ONS client SDK dependency -->         <dependency>             <groupId>com.aliyun.openservices</groupId>             <artifactId>ons-client</artifactId>             <version>1.8.8.5.Final</version>         </dependency> 

4.延迟消息配置

import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import com.aliyun.openservices.ons.api.bean.BatchConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  import java.util.HashMap; import java.util.Map; import java.util.Properties;  /**  * 延迟消息配置类  */ @Configuration public class BatchConsumerClient {      @Autowired     private MqConfig mqConfig;      @Autowired     private BatchDemoMessageListener messageListener;      @Bean(initMethod = "start", destroyMethod = "shutdown")     public BatchConsumerBean buildBatchConsumer() {         BatchConsumerBean batchConsumerBean = new BatchConsumerBean();         //配置文件         Properties properties = mqConfig.getMqPropertie();         properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getDelayGroupId());         //将消费者线程数固定为20个 20为默认值         properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");         batchConsumerBean.setProperties(properties);         //订阅关系         Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<Subscription, BatchMessageListener>();         Subscription subscription = new Subscription();         subscription.setTopic(mqConfig.getDelayTopic());         subscription.setExpression(mqConfig.getDelayTag());         subscriptionTable.put(subscription, messageListener);         //订阅多个topic如上面设置         batchConsumerBean.setSubscriptionTable(subscriptionTable);         return batchConsumerBean;     }  }  
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;  import java.text.SimpleDateFormat; import java.util.Date; import java.util.List;  /**  * 延迟消息消费者  */ @Slf4j @Component public class BatchDemoMessageListener implements BatchMessageListener {      @Override     public Action consume(final List<Message> messages, final ConsumeContext context) {         log.info("消费者收到消息大小:"+messages.size());         for (Message message : messages) {             byte[] body = message.getBody();             String s = new String(body);             Date date = new Date();             SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");             String formatTime = sdf.format(date);             System.out.println("接收到消息时间:"+formatTime);             log.info("接收到消息内容:"+s);         }         try {             //do something..             return Action.CommitMessage;         } catch (Exception e) {             //消费失败             return Action.ReconsumeLater;         }     } }  

5.MQ配置类

 import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;
import java.util.Properties;

/**
 * Holds the settings bound from the {@code rocketmq} configuration prefix and
 * builds the base connection properties shared by producers and consumers.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;
    private String topic;
    private String groupId;
    private String tag;
    private String orderTopic;
    private String orderGroupId;
    private String orderTag;
    private String delayTopic;
    private String delayGroupId;
    private String delayTag;

    /**
     * Creates a new {@link Properties} instance containing the access key, secret key
     * and name server address. A fresh object is returned on every call, so callers
     * may add their own entries (for example a group id) without affecting others.
     *
     * @return a new properties object with the three connection settings
     * @throws NullPointerException if one of the three settings is not configured
     */
    public Properties getMqPropertie() {
        Properties properties = new Properties();
        properties.setProperty(PropertyKeyConst.AccessKey, required(this.accessKey, "rocketmq.accessKey"));
        properties.setProperty(PropertyKeyConst.SecretKey, required(this.secretKey, "rocketmq.secretKey"));
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR, required(this.nameSrvAddr, "rocketmq.nameSrvAddr"));
        return properties;
    }

    /**
     * Properties.setProperty rejects null values with a message-less NPE; this keeps
     * the same exception type but names the missing configuration key.
     */
    private static String required(String value, String key) {
        return Objects.requireNonNull(value, "Missing required configuration: " + key);
    }

}

6.YML配置

# Alibaba Cloud RocketMQ settings (bound by the `rocketmq` configuration prefix)
rocketmq:
  accessKey: laskdfjlaksdjflaksjdflaksdjflakdjf
  secretKey: asdfasdlfkasjdlfkasjdlfkajsdlkfjkalksdfj
  nameSrvAddr: rmq..rmq.acs.com:8080
  topic: topic_lsdjf_test
  groupId: Glskdfjalsdkfjalksdjflaksdfj_push
  tag: "*"
  orderTopic: XXX
  orderGroupId: XXX
  orderTag: "*"
  delayTopic: topic_alskdjfalksdjflksdjfkla_delay
  delayGroupId: GIlaskdjflkasdjflkajsdkf_delay
  delayTag: "*"

7.普通消息配置

import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;  import java.util.HashMap; import java.util.Map; import java.util.Properties;   /**  * 普通消息配置类  */ @Configuration public class ConsumerClient {      @Autowired     private MqConfig mqConfig;      @Autowired     private DemoMessageListener messageListener;      @Bean(initMethod = "start", destroyMethod = "shutdown")     public ConsumerBean buildConsumer() {         ConsumerBean consumerBean = new ConsumerBean();         //配置文件         Properties properties = mqConfig.getMqPropertie();         properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId());         //将消费者线程数固定为20个 20为默认值         properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20");         consumerBean.setProperties(properties);         //订阅关系         Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>();         Subscription subscription = new Subscription();         subscription.setTopic(mqConfig.getTopic());         subscription.setExpression(mqConfig.getTag());         subscriptionTable.put(subscription, messageListener);         //订阅多个topic如上面设置          consumerBean.setSubscriptionTable(subscriptionTable);         return consumerBean;     }  }  
 import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;  /**  * 普通主题消费者  */ @Component @Slf4j public class DemoMessageListener implements MessageListener {      @Override     public Action consume(Message message, ConsumeContext context) {          log.info("接收到消息: " + message);         try {             byte[] body = message.getBody();             String s = new String(body);             log.info("接收到消息字符串:"+s);             //Action.CommitMessag 进行消息的确认             return Action.CommitMessage;         } catch (Exception e) {             //消费失败             return Action.ReconsumeLater;         }     } }  
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the producer for normal messages.
 */
@Configuration
public class ProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the producer bean from the shared connection properties. The bean
     * declares {@code start} as its init method and {@code shutdown} as its destroy
     * method.
     *
     * @return the configured producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        final ProducerBean normalProducer = new ProducerBean();
        normalProducer.setProperties(mqConfig.getMqPropertie());
        return normalProducer;
    }

}
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.atkj.devicewx.config.MqConfig; import org.springframework.stereotype.Component;  /**  * 普通消息生产者  *  **/ @Component public class RocketMessageProducer {       private static ProducerBean producer;     private static MqConfig mqConfig;       public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) {         this.producer = producer;         this.mqConfig = mqConfig;     }       /**      * @Description: <h2>生产 普通 消息</h2>      * @author: LiRen      */     public  static void producerMsg(String tag, String key, String body) {         Message msg = new Message(mqConfig.getTopic(), tag, key, body.getBytes());         long time = System.currentTimeMillis();         try {             SendResult sendResult = producer.send(msg);             assert sendResult != null;             System.out.println(time                     + " Send mq message success.Topic is:" + msg.getTopic()                     + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey()                     + " msgId is:" + sendResult.getMessageId());         } catch (ONSClientException e) {             e.printStackTrace();             System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic());         }     }   } 
import com.aliyun.openservices.ons.api.*; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean;  import java.util.Properties;  /**  * 普通消息消费者  */ //效果和 DemoMessageListener 一致 //@Component public class RocketMQConsumer {       @Autowired     private MqConfig rocketMQConfig;         /**      * 1、普通订阅      *      * @param      */     @Bean //不加@Bean Spring启动时没有注册该方法,就无法被调用     public void normalSubscribe( ) {           Properties properties = rocketMQConfig.getMqPropertie();           properties.put(PropertyKeyConst.GROUP_ID,rocketMQConfig.getGroupId());           Consumer consumer = ONSFactory.createConsumer(properties);         consumer.subscribe(rocketMQConfig.getTopic(), rocketMQConfig.getTag(), new MessageListener() {             @Override             public Action consume(Message message, ConsumeContext context) {                 System.out.println("Receive: " + new String(message.getBody()));                   //把消息转化为java对象                 //JSONObject jsonObject=JSONObject.parseObject(jsonString);                 //Book book= jsonObject.toJavaObject(Book.class);                  return Action.CommitMessage;             }         });           consumer.start();     } } 

7.顺序消息(order)配置,本项目未用到

 import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.OrderConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.aliyun.openservices.ons.api.order.MessageOrderListener; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean;  import java.util.HashMap; import java.util.Map; import java.util.Properties;  //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 public class OrderConsumerClient {      @Autowired     private MqConfig mqConfig;      @Autowired     private OrderDemoMessageListener messageListener;      @Bean(initMethod = "start", destroyMethod = "shutdown")     public OrderConsumerBean buildOrderConsumer() {         OrderConsumerBean orderConsumerBean = new OrderConsumerBean();         //配置文件         Properties properties = mqConfig.getMqPropertie();         properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getOrderGroupId());         orderConsumerBean.setProperties(properties);         //订阅关系         Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>();         Subscription subscription = new Subscription();         subscription.setTopic(mqConfig.getOrderTopic());         subscription.setExpression(mqConfig.getOrderTag());         subscriptionTable.put(subscription, messageListener);         //订阅多个topic如上面设置          orderConsumerBean.setSubscriptionTable(subscriptionTable);         return orderConsumerBean;     }  }  
 import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; import com.aliyun.openservices.ons.api.order.MessageOrderListener; import com.aliyun.openservices.ons.api.order.OrderAction; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;  @Slf4j @Component public class OrderDemoMessageListener implements MessageOrderListener {     @Override     public OrderAction consume(final Message message, final ConsumeOrderContext context) {         log.info("接收到消息: " + message);         try {             //do something..             return OrderAction.Success;         } catch (Exception e) {             //消费失败,挂起当前队列             return OrderAction.Suspend;         }     } }  
import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the ordered-message producer.
 */
@Configuration
public class OrderProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the ordered producer bean from the shared connection properties. The bean
     * declares {@code start} as its init method and {@code shutdown} as its destroy
     * method.
     *
     * @return the configured ordered producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderProducerBean buildOrderProducer() {
        final OrderProducerBean orderedProducer = new OrderProducerBean();
        orderedProducer.setProperties(mqConfig.getMqPropertie());
        return orderedProducer;
    }

}

8.事务消息配置,本项目未用到

import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker; import com.aliyun.openservices.ons.api.transaction.TransactionStatus; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component;  /**  * 事务消息  */ @Slf4j @Component public class DemoLocalTransactionChecker implements LocalTransactionChecker {     @Override     public TransactionStatus check(Message msg) {         log.info("开始回查本地事务状态");         return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus     } }  
import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that registers the transactional-message producer.
 */
@Configuration
public class TransactionProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoLocalTransactionChecker localTransactionChecker;

    /**
     * Builds the transactional producer bean with the shared connection properties and
     * the local transaction checker. The bean declares {@code start} as its init
     * method and {@code shutdown} as its destroy method.
     *
     * @return the configured transactional producer bean
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public TransactionProducerBean buildTransactionProducer() {
        final TransactionProducerBean txProducer = new TransactionProducerBean();
        txProducer.setProperties(mqConfig.getMqPropertie());
        txProducer.setLocalTransactionChecker(localTransactionChecker);
        return txProducer;
    }

}

9.测试类

 import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; import com.atkj.devicewx.config.MqConfig; import com.atkj.devicewx.normal.RocketMessageProducer; import com.atkj.devicewx.service.TestService; import com.atkj.devicewx.vo.MetabolicVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController;  import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties;  /**  * @Author: albc  * @Date: 2024/07/12/10:22  * @Description: good good study,day day up  */ @RequestMapping("/api/v1/mq/test") @RestController public class TestController {       @Autowired     private TestService testService;       @Autowired     private MqConfig mqConfig;      @RequestMapping("/one")     public String testOne(){         Integer count = testService.testOne();         return "发送成功:"+count;     }      /**      * 普通消息测试      * @return      */     @RequestMapping("/useRocketMQ")     public String useRocketMQ() {           MetabolicVo metabolicVo = new MetabolicVo();         metabolicVo.setAge(123);         metabolicVo.setName("测试名字");         metabolicVo.setWeight(75);         RocketMessageProducer.producerMsg("123","666", JSON.toJSONString(metabolicVo));         return "请求成功!";     }      /**      * 发送延迟消息测试      * @return      */     @RequestMapping("/delayMqMsg")     public String delayMqMsg() {         Properties producerProperties = new Properties();         producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey());         producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey());         producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, mqConfig.getNameSrvAddr());         
//注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败         Producer producer = ONSFactory.createProducer(producerProperties);         producer.start();         System.out.println("生产者启动..........");          Date date = new Date();         SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");         String formatTime = sdf.format(date);          String meg = formatTime + "发送延迟消息测试";         Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), meg.getBytes());         // 延时时间单位为毫秒(ms),指定一个时刻,在这个时刻之后才能被消费,这个例子表示 3秒 后才能被消费         long delayTime = 3000;         message.setStartDeliverTime(System.currentTimeMillis() + delayTime);         try {             SendResult sendResult = producer.send(message);             assert sendResult != null;             System.out.println(new Date() + "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId());         } catch (ONSClientException e) {             // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理             System.out.println(new Date() + "重试发送mq消息主题:" + mqConfig.getDelayTopic());             e.printStackTrace();         }         return "请求成功!";      }  }  

优化部分

每次发送消息都要创建生产者,效率低下
使用单例优化

import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component;  import java.util.Properties;  /**  * 生产者单例  * @Author: albc  * @Date: 2024/07/15/15:49  * @Description: good good study,day day up  */ @Component @Slf4j public class ProducerSingleton {      private volatile static Producer producer;      private static String accessKey;     private static String secretKey;     private static String nameSrvAddr;      private ProducerSingleton() {      }      @Value("${rocketmq.accessKey}")     private void setAccessKey(String accessKey) {         ProducerSingleton.accessKey = accessKey;     }      @Value("${rocketmq.secretKey}")     private void setSecretKey(String secretKey) {         ProducerSingleton.secretKey = secretKey;     }      @Value("${rocketmq.nameSrvAddr}")     private void setNameSrvAddr(String nameSrvAddr) {         ProducerSingleton.nameSrvAddr = nameSrvAddr;     }      /**      * 创建生产者      * @return      */     public static Producer getProducer(){         if (producer == null){             synchronized(ProducerSingleton.class){                 if (producer == null){                     Properties producerProperties = new Properties();                     producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey);                     producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey);                     producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr);                     //注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败                     producer = ONSFactory.createProducer(producerProperties);                     producer.start();                     log.info("生产者启动........");                 }             }         }         return 
producer;     }  }  
 import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.atkj.devicewx.level.config.MqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;   /**  * 延迟消息生产者  *  * @Author: albc  * @Date: 2024/07/15/14:11  * @Description: good good study,day day up  */ @Slf4j @Component public class BatchMessageProducer {       @Autowired     private MqConfig mqConfig;       /**      * 发送消息      * @param msg 发送消息内容      * @param delayTime 延迟时间,毫秒      */     public void sendDelayMeg(String msg,Long delayTime) {         Producer producer = ProducerSingleton.getProducer();         Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), msg.getBytes());         message.setStartDeliverTime(System.currentTimeMillis() + delayTime);         try {             SendResult sendResult = producer.send(message);             assert sendResult != null;             log.info( "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId());         } catch (ONSClientException e) {             // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理             log.error("重试发送mq消息主题:" + mqConfig.getDelayTopic());             e.printStackTrace();         }finally {             message = null;         }     }   }   

其他不变

广告一刻

为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!