阅读量:2
1.需要阿里云开通商业版RocketMQ
普通消息需要新建普通消息主题(Topic)和对应的消费组(Group);延迟消息需要另外新建延迟消息主题和延迟消息消费组。
2.结构目录
3.引入依赖
<!--阿里云RocketMq整合--> <dependency> <groupId>com.aliyun.openservices</groupId> <artifactId>ons-client</artifactId> <version>1.8.8.5.Final</version> </dependency>
4.延迟消息配置
import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import com.aliyun.openservices.ons.api.bean.BatchConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 延迟消息配置类 */ @Configuration public class BatchConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private BatchDemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public BatchConsumerBean buildBatchConsumer() { BatchConsumerBean batchConsumerBean = new BatchConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getDelayGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); batchConsumerBean.setProperties(properties); //订阅关系 Map<Subscription, BatchMessageListener> subscriptionTable = new HashMap<Subscription, BatchMessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getDelayTopic()); subscription.setExpression(mqConfig.getDelayTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 batchConsumerBean.setSubscriptionTable(subscriptionTable); return batchConsumerBean; } }
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.batch.BatchMessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; import java.text.SimpleDateFormat; import java.util.Date; import java.util.List; /** * 延迟消息消费者 */ @Slf4j @Component public class BatchDemoMessageListener implements BatchMessageListener { @Override public Action consume(final List<Message> messages, final ConsumeContext context) { log.info("消费者收到消息大小:"+messages.size()); for (Message message : messages) { byte[] body = message.getBody(); String s = new String(body); Date date = new Date(); SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String formatTime = sdf.format(date); System.out.println("接收到消息时间:"+formatTime); log.info("接收到消息内容:"+s); } try { //do something.. return Action.CommitMessage; } catch (Exception e) { //消费失败 return Action.ReconsumeLater; } } }
5.MQ配置类
import com.aliyun.openservices.ons.api.PropertyKeyConst;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Configuration;

import java.util.Objects;
import java.util.Properties;

/**
 * RocketMQ settings bound from configuration keys under the {@code rocketmq} prefix.
 *
 * <p>Holds the credentials and name-server address, plus topic / group / tag
 * triples for normal, ordered and delay messages.
 */
@Data
@Configuration
@ConfigurationProperties(prefix = "rocketmq")
public class MqConfig {

    private String accessKey;
    private String secretKey;
    private String nameSrvAddr;

    private String topic;
    private String groupId;
    private String tag;

    private String orderTopic;
    private String orderGroupId;
    private String orderTag;

    private String delayTopic;
    private String delayGroupId;
    private String delayTag;

    /**
     * Builds a fresh {@link Properties} object holding the access key, secret
     * key and name-server address.
     *
     * <p>A new instance is returned on every call, so callers may add their own
     * entries (for example a group id) without affecting each other.
     *
     * @return connection properties for the ONS client factories
     * @throws NullPointerException if any of the three settings is missing; the
     *         message names the missing configuration key
     */
    public Properties getMqPropertie() {
        Properties properties = new Properties();
        // Properties rejects null values with a message-less NPE; fail with the key name instead.
        properties.setProperty(PropertyKeyConst.AccessKey,
                Objects.requireNonNull(this.accessKey, "rocketmq.accessKey is not configured"));
        properties.setProperty(PropertyKeyConst.SecretKey,
                Objects.requireNonNull(this.secretKey, "rocketmq.secretKey is not configured"));
        properties.setProperty(PropertyKeyConst.NAMESRV_ADDR,
                Objects.requireNonNull(this.nameSrvAddr, "rocketmq.nameSrvAddr is not configured"));
        return properties;
    }
}
6.YML配置
## 阿里云RocketMQ配置
rocketmq:
  accessKey: laskdfjlaksdjflaksjdflaksdjflakdjf
  secretKey: asdfasdlfkasjdlfkasjdlfkajsdlkfjkalksdfj
  nameSrvAddr: rmq..rmq.acs.com:8080
  topic: topic_lsdjf_test
  groupId: Glskdfjalsdkfjalksdjflaksdfj_push
  tag: "*"
  orderTopic: XXX
  orderGroupId: XXX
  orderTag: "*"
  delayTopic: topic_alskdjfalksdjflksdjfkla_delay
  delayGroupId: GIlaskdjflkasdjflkajsdkf_delay
  delayTag: "*"
7.普通消息配置
import com.aliyun.openservices.ons.api.MessageListener; import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.ConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import java.util.HashMap; import java.util.Map; import java.util.Properties; /** * 普通消息配置类 */ @Configuration public class ConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private DemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public ConsumerBean buildConsumer() { ConsumerBean consumerBean = new ConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getGroupId()); //将消费者线程数固定为20个 20为默认值 properties.setProperty(PropertyKeyConst.ConsumeThreadNums, "20"); consumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageListener> subscriptionTable = new HashMap<Subscription, MessageListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getTopic()); subscription.setExpression(mqConfig.getTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 consumerBean.setSubscriptionTable(subscriptionTable); return consumerBean; } }
import com.aliyun.openservices.ons.api.Action; import com.aliyun.openservices.ons.api.ConsumeContext; import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.MessageListener; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * 普通主题消费者 */ @Component @Slf4j public class DemoMessageListener implements MessageListener { @Override public Action consume(Message message, ConsumeContext context) { log.info("接收到消息: " + message); try { byte[] body = message.getBody(); String s = new String(body); log.info("接收到消息字符串:"+s); //Action.CommitMessag 进行消息的确认 return Action.CommitMessage; } catch (Exception e) { //消费失败 return Action.ReconsumeLater; } } }
import com.aliyun.openservices.ons.api.bean.ProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the normal-message producer.
 */
@Configuration
public class ProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the producer bean from the shared connection properties.
     *
     * <p>The bean is declared with {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured (not yet started) producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public ProducerBean buildProducer() {
        ProducerBean producerBean = new ProducerBean();
        producerBean.setProperties(mqConfig.getMqPropertie());
        return producerBean;
    }
}
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.SendResult; import com.aliyun.openservices.ons.api.bean.ProducerBean; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.atkj.devicewx.config.MqConfig; import org.springframework.stereotype.Component; /** * 普通消息生产者 * **/ @Component public class RocketMessageProducer { private static ProducerBean producer; private static MqConfig mqConfig; public RocketMessageProducer(ProducerBean producer, MqConfig mqConfig) { this.producer = producer; this.mqConfig = mqConfig; } /** * @Description: <h2>生产 普通 消息</h2> * @author: LiRen */ public static void producerMsg(String tag, String key, String body) { Message msg = new Message(mqConfig.getTopic(), tag, key, body.getBytes()); long time = System.currentTimeMillis(); try { SendResult sendResult = producer.send(msg); assert sendResult != null; System.out.println(time + " Send mq message success.Topic is:" + msg.getTopic() + " Tag is:" + msg.getTag() + " Key is:" + msg.getKey() + " msgId is:" + sendResult.getMessageId()); } catch (ONSClientException e) { e.printStackTrace(); System.out.println(time + " Send mq message failed. Topic is:" + msg.getTopic()); } } }
import com.aliyun.openservices.ons.api.*; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import java.util.Properties; /** * 普通消息消费者 */ //效果和 DemoMessageListener 一致 //@Component public class RocketMQConsumer { @Autowired private MqConfig rocketMQConfig; /** * 1、普通订阅 * * @param */ @Bean //不加@Bean Spring启动时没有注册该方法,就无法被调用 public void normalSubscribe( ) { Properties properties = rocketMQConfig.getMqPropertie(); properties.put(PropertyKeyConst.GROUP_ID,rocketMQConfig.getGroupId()); Consumer consumer = ONSFactory.createConsumer(properties); consumer.subscribe(rocketMQConfig.getTopic(), rocketMQConfig.getTag(), new MessageListener() { @Override public Action consume(Message message, ConsumeContext context) { System.out.println("Receive: " + new String(message.getBody())); //把消息转化为java对象 //JSONObject jsonObject=JSONObject.parseObject(jsonString); //Book book= jsonObject.toJavaObject(Book.class); return Action.CommitMessage; } }); consumer.start(); } }
7.顺序消息(order)配置(本项目没用到)
import com.aliyun.openservices.ons.api.PropertyKeyConst; import com.aliyun.openservices.ons.api.bean.OrderConsumerBean; import com.aliyun.openservices.ons.api.bean.Subscription; import com.aliyun.openservices.ons.api.order.MessageOrderListener; import com.atkj.devicewx.config.MqConfig; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.context.annotation.Bean; import java.util.HashMap; import java.util.Map; import java.util.Properties; //项目中加上 @Configuration 注解,这样服务启动时consumer也启动了 public class OrderConsumerClient { @Autowired private MqConfig mqConfig; @Autowired private OrderDemoMessageListener messageListener; @Bean(initMethod = "start", destroyMethod = "shutdown") public OrderConsumerBean buildOrderConsumer() { OrderConsumerBean orderConsumerBean = new OrderConsumerBean(); //配置文件 Properties properties = mqConfig.getMqPropertie(); properties.setProperty(PropertyKeyConst.GROUP_ID, mqConfig.getOrderGroupId()); orderConsumerBean.setProperties(properties); //订阅关系 Map<Subscription, MessageOrderListener> subscriptionTable = new HashMap<Subscription, MessageOrderListener>(); Subscription subscription = new Subscription(); subscription.setTopic(mqConfig.getOrderTopic()); subscription.setExpression(mqConfig.getOrderTag()); subscriptionTable.put(subscription, messageListener); //订阅多个topic如上面设置 orderConsumerBean.setSubscriptionTable(subscriptionTable); return orderConsumerBean; } }
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.order.ConsumeOrderContext; import com.aliyun.openservices.ons.api.order.MessageOrderListener; import com.aliyun.openservices.ons.api.order.OrderAction; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; @Slf4j @Component public class OrderDemoMessageListener implements MessageOrderListener { @Override public OrderAction consume(final Message message, final ConsumeOrderContext context) { log.info("接收到消息: " + message); try { //do something.. return OrderAction.Success; } catch (Exception e) { //消费失败,挂起当前队列 return OrderAction.Suspend; } } }
import com.aliyun.openservices.ons.api.bean.OrderProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the ordered-message producer.
 */
@Configuration
public class OrderProducerClient {

    @Autowired
    private MqConfig mqConfig;

    /**
     * Builds the ordered producer bean from the shared connection properties.
     *
     * <p>The bean is declared with {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured (not yet started) ordered producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public OrderProducerBean buildOrderProducer() {
        OrderProducerBean producerBean = new OrderProducerBean();
        producerBean.setProperties(mqConfig.getMqPropertie());
        return producerBean;
    }
}
8.事务消息配置(本项目没用到)
import com.aliyun.openservices.ons.api.Message; import com.aliyun.openservices.ons.api.transaction.LocalTransactionChecker; import com.aliyun.openservices.ons.api.transaction.TransactionStatus; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Component; /** * 事务消息 */ @Slf4j @Component public class DemoLocalTransactionChecker implements LocalTransactionChecker { @Override public TransactionStatus check(Message msg) { log.info("开始回查本地事务状态"); return TransactionStatus.CommitTransaction; //根据本地事务状态检查结果返回不同的TransactionStatus } }
import com.aliyun.openservices.ons.api.bean.TransactionProducerBean;
import com.atkj.devicewx.config.MqConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration for the transactional-message producer.
 */
@Configuration
public class TransactionProducerClient {

    @Autowired
    private MqConfig mqConfig;

    @Autowired
    private DemoLocalTransactionChecker localTransactionChecker;

    /**
     * Builds the transactional producer bean and attaches the local transaction checker.
     *
     * <p>The bean is declared with {@code start} as its init method and
     * {@code shutdown} as its destroy method.
     *
     * @return the configured (not yet started) transactional producer
     */
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public TransactionProducerBean buildTransactionProducer() {
        TransactionProducerBean txProducer = new TransactionProducerBean();
        txProducer.setProperties(mqConfig.getMqPropertie());
        txProducer.setLocalTransactionChecker(localTransactionChecker);
        return txProducer;
    }
}
9.测试类
import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON; import com.atkj.devicewx.config.MqConfig; import com.atkj.devicewx.normal.RocketMessageProducer; import com.atkj.devicewx.service.TestService; import com.atkj.devicewx.vo.MetabolicVo; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; import java.text.SimpleDateFormat; import java.util.Date; import java.util.Properties; /** * @Author: albc * @Date: 2024/07/12/10:22 * @Description: good good study,day day up */ @RequestMapping("/api/v1/mq/test") @RestController public class TestController { @Autowired private TestService testService; @Autowired private MqConfig mqConfig; @RequestMapping("/one") public String testOne(){ Integer count = testService.testOne(); return "发送成功:"+count; } /** * 普通消息测试 * @return */ @RequestMapping("/useRocketMQ") public String useRocketMQ() { MetabolicVo metabolicVo = new MetabolicVo(); metabolicVo.setAge(123); metabolicVo.setName("测试名字"); metabolicVo.setWeight(75); RocketMessageProducer.producerMsg("123","666", JSON.toJSONString(metabolicVo)); return "请求成功!"; } /** * 发送延迟消息测试 * @return */ @RequestMapping("/delayMqMsg") public String delayMqMsg() { Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey()); producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey()); producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, mqConfig.getNameSrvAddr()); //注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败 Producer producer = ONSFactory.createProducer(producerProperties); producer.start(); System.out.println("生产者启动.........."); Date date = new Date(); 
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); String formatTime = sdf.format(date); String meg = formatTime + "发送延迟消息测试"; Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), meg.getBytes()); // 延时时间单位为毫秒(ms),指定一个时刻,在这个时刻之后才能被消费,这个例子表示 3秒 后才能被消费 long delayTime = 3000; message.setStartDeliverTime(System.currentTimeMillis() + delayTime); try { SendResult sendResult = producer.send(message); assert sendResult != null; System.out.println(new Date() + "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId()); } catch (ONSClientException e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 System.out.println(new Date() + "重试发送mq消息主题:" + mqConfig.getDelayTopic()); e.printStackTrace(); } return "请求成功!"; } }
优化部分
上面的延迟消息测试接口每次发送消息都要新建并启动一个生产者,效率低下。
使用单例优化
import com.aliyun.openservices.ons.api.ONSFactory; import com.aliyun.openservices.ons.api.Producer; import com.aliyun.openservices.ons.api.PropertyKeyConst; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Component; import java.util.Properties; /** * 生产者单例 * @Author: albc * @Date: 2024/07/15/15:49 * @Description: good good study,day day up */ @Component @Slf4j public class ProducerSingleton { private volatile static Producer producer; private static String accessKey; private static String secretKey; private static String nameSrvAddr; private ProducerSingleton() { } @Value("${rocketmq.accessKey}") private void setAccessKey(String accessKey) { ProducerSingleton.accessKey = accessKey; } @Value("${rocketmq.secretKey}") private void setSecretKey(String secretKey) { ProducerSingleton.secretKey = secretKey; } @Value("${rocketmq.nameSrvAddr}") private void setNameSrvAddr(String nameSrvAddr) { ProducerSingleton.nameSrvAddr = nameSrvAddr; } /** * 创建生产者 * @return */ public static Producer getProducer(){ if (producer == null){ synchronized(ProducerSingleton.class){ if (producer == null){ Properties producerProperties = new Properties(); producerProperties.setProperty(PropertyKeyConst.AccessKey, accessKey); producerProperties.setProperty(PropertyKeyConst.SecretKey, secretKey); producerProperties.setProperty(PropertyKeyConst.NAMESRV_ADDR, nameSrvAddr); //注意!!!如果访问阿里云RocketMQ 5.0系列实例,不要设置PropertyKeyConst.INSTANCE_ID,否则会导致收发失败 producer = ONSFactory.createProducer(producerProperties); producer.start(); log.info("生产者启动........"); } } } return producer; } }
import com.aliyun.openservices.ons.api.*; import com.aliyun.openservices.ons.api.exception.ONSClientException; import com.atkj.devicewx.level.config.MqConfig; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; /** * 延迟消息生产者 * * @Author: albc * @Date: 2024/07/15/14:11 * @Description: good good study,day day up */ @Slf4j @Component public class BatchMessageProducer { @Autowired private MqConfig mqConfig; /** * 发送消息 * @param msg 发送消息内容 * @param delayTime 延迟时间,毫秒 */ public void sendDelayMeg(String msg,Long delayTime) { Producer producer = ProducerSingleton.getProducer(); Message message = new Message(mqConfig.getDelayTopic(), mqConfig.getDelayTag(), msg.getBytes()); message.setStartDeliverTime(System.currentTimeMillis() + delayTime); try { SendResult sendResult = producer.send(message); assert sendResult != null; log.info( "发送mq消息主题:" + mqConfig.getDelayTopic() + "消息id: " + sendResult.getMessageId()); } catch (ONSClientException e) { // 消息发送失败,需要进行重试处理,可重新发送这条消息或持久化这条数据进行补偿处理 log.error("重试发送mq消息主题:" + mqConfig.getDelayTopic()); e.printStackTrace(); }finally { message = null; } } }
其他不变