阅读量:0
<!--匹配服务器的RocketMQ5.3.0--> <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.3.1</version> </dependency>
application.properties
spring.application.name=springboot-rocketmq server.port=8999 rocketmq.name-server=xxx.xxx.xxx:9876 rocketmq.producer.group=mq_producer_group_test
控制器
package com.example.springbootrocketmq.controller; import com.example.springbootrocketmq.pojo.User; import com.example.springbootrocketmq.producer.RocketMQProducerService; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.GetMapping; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; /** * @author hrui * @date 2024/8/2 11:40 */ @RestController @RequestMapping("/api/test") public class TestController { @Autowired private RocketMQProducerService producerService; @GetMapping("/send") public String sendMessage() { User user = new User("Hrui", 18, "China"); producerService.sendSimpleMessage("mq_test-topic", user); return "消息发送成功"; } }
生产者
package com.example.springbootrocketmq.producer; import com.example.springbootrocketmq.pojo.User; import jakarta.annotation.Resource; import org.apache.rocketmq.spring.core.RocketMQTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Service; /** * @author hrui * @date 2024/8/2 11:36 */ @Service public class RocketMQProducerService { @Autowired private RocketMQTemplate rocketMQTemplate; /** * 发送普通消息 * @param topic * @param message */ public void sendSimpleMessage(String topic, User message) { rocketMQTemplate.convertAndSend(topic, message); } }
消费者
package com.example.springbootrocketmq.consumer; import com.example.springbootrocketmq.pojo.User; import org.apache.rocketmq.spring.annotation.RocketMQMessageListener; import org.apache.rocketmq.spring.core.RocketMQListener; import org.springframework.stereotype.Service; /** * @author hrui * @date 2024/8/2 11:44 */ @Service @RocketMQMessageListener(topic = "mq_test-topic", consumerGroup = "mq_consumer_group_test") public class RocketMQConsumerService implements RocketMQListener<User> { @Override public void onMessage(User user) { System.out.println("消费者接收到消息: " + user); } }