接上一篇《RabbitMQ-安装篇(阿里云主机)-CSDN博客》
安装好RabbitMQ后,我们将开始RabbitMQ的使用,根据官网文档RabbitMQ Tutorials | RabbitMQ,我们一步一步的学习。
1. "Hello World!"
这里先说明几个概念:
生产者:指消息的发送方,用图例表示。
消费者:指消息的接收放,用图例表示。
队列(queue):生产者发送的消息将被传递到队列里,消费这从队列中消费消息
下面以 生产者 发送消息到队列,消费者从队列里消费消息为例,演示如何调用(Java代码)。
RabbitMQ支持多种协议。本教程使用AMQP0-9-1,它是一个开放的、通用的消息传递协议。
1.1 下载依赖项
下载客户端库客户端连接库及其依赖项(SLF4J API和SLF4J Simple)。将这些文件复制到您的工作目录中,连同教程Java文件。
1.2 生产者代码-Send.java
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import java.nio.charset.StandardCharsets; //生产者 public class Send { public static void main(String[] argv) throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("【服务器地址】"); factory.setPort(【端口:默认5672】); factory.setUsername("【账号】"); factory.setPassword("【密码】"); factory.setVirtualHost("【虚拟主机】"); // 创建信道,发送消息 String queueName = "rc.queue"; try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(queueName, true, false, false, null); String message = "Hello World!"; channel.basicPublish("", queueName, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
请将代码中相关配置项设置为您自己的配置。
运行代码,即可向RabbitMQ中的rc.queue队列发送一条Hello World消息。(rc.queue为我自己创建的,请根据实际情况调整)
在RabbitMQ后台可以查看到该队列里的消息。
1.3 消费者代码-Recv.java
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback; import java.nio.charset.StandardCharsets; //消费者 public class Recv { public static void main(String[] argv) throws Exception { // 创建连接 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("【服务器地址】"); factory.setPort(【端口:默认5672】); factory.setUsername("【账号】"); factory.setPassword("【密码】"); factory.setVirtualHost("【虚拟主机】"); // 创建连接通道 Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); String queueName = "rc.queue"; channel.queueDeclare(queueName, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }
运行结果
2.其他模式
调通了1种模式,其他模式类似的方式调试即可。RabbitMQ Tutorials | RabbitMQ
3.在Springboot中使用RabbitMQ
3.1 添加依赖
在pom.xml
中添加Spring Boot的RabbitMQ依赖。
<!--RabbitMQ--> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
3.2 配置RabbitMQ
在application.properties
或application.yml
中配置RabbitMQ连接信息。
# application.properties spring.rabbitmq.host=【服务器地址】 spring.rabbitmq.port=【端口:默认5672】 spring.rabbitmq.username=【用户名】 spring.rabbitmq.password=【密码】
spring: rabbitmq: host: 【服务器地址】 port: 【端口:默认5672】 username: 【用户名】 password: 【密码】
配置Queue、Exchange和Binding:
通过Java配置类定义消息队列、交换器和它们之间的绑定关系。
(如果在RabbitMQ控制台设置好了Queue、Exchange和Binding,无需下面的配置)
@Configuration public class RabbitMQConfig { @Bean Queue myQueue() { return new Queue("your-queue-name", true); } @Bean DirectExchange myExchange() { return new DirectExchange("your-exchange"); } @Bean Binding binding(Queue myQueue, DirectExchange myExchange) { return BindingBuilder.bind(myQueue).to(myExchange).with("your-routingKey"); } }
3.3 创建消息生产者
定义一个简单的消息生产者类,使用@RabbitTemplate
注解来发送消息到队列:
import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; @Component public class MessageProducer { private final RabbitTemplate rabbitTemplate; @Autowired public MessageProducer(RabbitTemplate rabbitTemplate) { this.rabbitTemplate = rabbitTemplate; } public void sendMessage(String message) { rabbitTemplate.convertAndSend("your-queue-name", message); } }
3.4.创建消息消费者
定义一个消息消费者类,使用@RabbitListener
注解来监听特定的队列:
import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @Component public class MessageConsumer { @RabbitListener(queues = "your-queue-name") public void receiveMessage(String message) { System.out.println("Received message: " + message); } }
3.5.启动和测试
确保你的应用启动类上有@EnableRabbit
注解启用RabbitMQ。确保你的RabbitMQ服务正在运行,并尝试从你的应用中发送和接收消息。你可以通过调用MessageProducer
中的sendMessage
方法来测试发送功能,而接收功能应该自动触发MessageConsumer
中的receiveMessage
方法。