RabbitMQ的快速入门

avatar
作者
筋斗云
阅读量:0

目录

前言

1. 安装RabbitMQ

2.基本结构

3. RabbitMQ消息模型

​​​​​​4. 入门案例  

4.1 publisher实现

4.2 consumer实现

4.3 总结


前言

RabbitMQ是一套开源(MPL)的消息队列服务软件,是由 LShift 提供的一个 Advanced Message Queuing Protocol (AMQP) 的开源实现,由以高性能、健壮以及可伸缩性出名的Erlang写成。

1. 安装RabbitMQ

可以参考我的Docker部署RabbitMQ指南来操作。

2.基本结构

MQ的基本结构:

RabbitMQ中的一些角色:

  • publisher:生产者,事件提供者
  • consumer:消费者,事件订阅者
  • exchange:交换机,负责消息路由
  • queue:队列,存储消息
  • virtualHost:虚拟主机,隔离不同租户的exchange、queue、消息的隔离

3. RabbitMQ消息模型

RabbitMQ官网​​​​​​​提供了5个跟消息发送相关的Demo示例,对应不同的消息模型:

  • 基本消息队列(BasicQueue):

  • 工作消息队列(WorkQueue): 

  • 发布订阅(Publish、Subscribe),由根据交换机类型不同分为三种:

         1.Fanout Exchange:广播 

         

         2.Direct Exchange:路由

        

        3.Topic Exchange:主题

        ​ 

​​​​​​4. 入门案例  

简单队列模式的模型图:

  

官方的HelloWorld是基于最基础的消息队列模型来实现的,只包含三个角色:

  •  publisher:消息发布者,将消息发送到队列queue
  • queue:消息队列,负责接受并缓存消息
  • consumer:订阅队列,处理队列中的消息 

我这里有一份示例代码,有需要的话,可以去最后部分下载:

  

包括三部分:

  • mq-demo:父工程,管理项目依赖
  • publisher:消息的发送者
  • consumer:消息的消费者

4.1 publisher实现

思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 发送消息
  • 关闭连接和channel

代码实现:

package cn.itcast.mq.helloworld;  import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import org.junit.Test;  import java.io.IOException; import java.util.concurrent.TimeoutException;  public class PublisherTest {     @Test     public void testSendMessage() throws IOException, TimeoutException {         // 1.建立连接         ConnectionFactory factory = new ConnectionFactory();         // 1.1.设置连接参数,分别是:主机IP、端口号、vhost、用户名、密码         factory.setHost("你的RabbitMQ服务器IP");         factory.setPort(5672);         factory.setVirtualHost("/");         factory.setUsername("lyf");         factory.setPassword("123456");         // 1.2.建立连接         Connection connection = factory.newConnection();          // 2.创建通道Channel         Channel channel = connection.createChannel();          // 3.创建队列         String queueName = "simple.queue";         channel.queueDeclare(queueName, false, false, false, null);          // 4.发送消息         String message = "hello, rabbitmq!";         channel.basicPublish("", queueName, null, message.getBytes());         System.out.println("发送消息成功:【" + message + "】");          // 5.关闭通道和连接         channel.close();         connection.close();      } } 

接下来,我们来debug一步步讲解一下:

首先要想发送消息,就要跟MQ建立连接,而建立连接首先就是要创建连接工厂(ConnectionFactory):

  

连接工厂(ConnectionFactory)创建之后,需要设置一些信息。首先是MQ主机的IP地址,因为我的RabbitMQ是安装在我的虚拟机上的,所以这里是我的虚拟机的IP地址,你根据自己的实际情况来填写,如果安装虚拟机上,就填写虚拟机的IP,安装在主机上,就填写本地的IP:

  

这里是MQ的通信端口5672,MQ还有一个常用的管理平台的端口是15672,这两个端口不要搞错了:

  

接下来,走到的就是设置虚拟主机,每一个用户都有自己的虚拟主机,所以他是跟下面的用户名是对应的:

  

如果,你不知道自己用户的虚拟主机是多少,可以登录自己的RabbitMQ管理平台看看,我这里lyf这个用户名对应的虚拟主机就是 “/”:

  

下面的用户和密码设置,我就直接过了。只要跟虚拟主机是对应的就可以了。接下来, 就是将上面的设置信息用上之后,创建一个跟MQ的连接:

  

上面这一步走完之后,我们可以在RabbitMQ的管理平台看到新创建的连接:

  

创建了连接之后,就是创建一个消息的通道:

    

这一行执行过后,我们就可以在RabbitMQ的管理界面看到新创建的通道(channel)了:

  

在通道创建之后,就是创建一个队列,声明了队列的名称,然后声明一个队列: 

    

走完上面两步,我们就可以在MQ的管理平台上看见创建出来的队列:

  

有了队列之后是不是就可以开始发消息了,将string的消息转成字节(byte)方式发送了:

  

后面的就是打印,关闭通道关闭连接,我就直接把代码放行了,我们可以看到这里已经发送消息成功。

  

然后去MQ管理平台,可以看到刚刚发送成功的消息:

  

这时候消息发送者已经完成了自己的事情。不会管,谁来接收消息,这就是解耦合。

4.2 consumer实现

思路:

  • 建立连接
  • 创建Channel
  • 声明队列
  • 订阅消息

代码实现:

package cn.itcast.mq.helloworld;  import com.rabbitmq.client.*;  import java.io.IOException; import java.util.concurrent.TimeoutException;  public class ConsumerTest {      public static void main(String[] args) throws IOException, TimeoutException {         // 1.建立连接         ConnectionFactory factory = new ConnectionFactory();         // 1.1.设置连接参数,分别是:主机IP、端口号、vhost、用户名、密码         factory.setHost("你的RabbitMQ服务器IP");         factory.setPort(5672);         factory.setVirtualHost("/");         factory.setUsername("lyf");         factory.setPassword("123456");         // 1.2.建立连接         Connection connection = factory.newConnection();          // 2.创建通道Channel         Channel channel = connection.createChannel();          // 3.创建队列         String queueName = "simple.queue";         channel.queueDeclare(queueName, false, false, false, null);          // 4.订阅消息         channel.basicConsume(queueName, true, new DefaultConsumer(channel){             @Override             public void handleDelivery(String consumerTag, Envelope envelope,                                        AMQP.BasicProperties properties, byte[] body) throws IOException {                 // 5.处理消息                 String message = new String(body);                 System.out.println("接收到消息:【" + message + "】");             }         });         System.out.println("等待接收消息。。。。");     } } 

注意:consumer是不需要关闭通道,关闭连接的,因为在实际的开发过程中,消费者要一直关注publisher是否有发送消息,如果通道和连接关闭了,就像一次性的,处理一次就不再处理了。

consumer创建连接,设置参数跟publisher都是一样的,就不细讲了。consumer创建队列是因为在实际开发中,如果是消费者先执行的话,就会出现报错的情况,所以确保有这个队列:

    

我们debug走过了,来看一下MQ的管理平台:

  

发现只要发布者已经创建了队列的话,消费者并不会重新创建一个新的队列。

  

这里就是我们订阅消息的具体逻辑部分,这是一个匿名的内部类。这里面就像是JS里的回调函数一样,把处理的逻辑挂在队列上,当这个队列一旦有了消息,就会执行这一部分。放行之后,在控制台我们可以看到:

  

说明匿名内部类是一个异步处理的逻辑,在前面都执行完了之后,MQ的队列里面只要有消息了之后,才会执行里面的逻辑。更加说明了MQ的异步机制。

然后我们去MQ的管理平台去看一下:

  

这时候可以看到,消息已经被处理完了。说明只要我这消息被消费者消费,就会被删除。就是所谓的阅后即焚。

4.3 总结

基本消息队列的消息发送流程:

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队列
  4. 利用channel向队列发送消息

基本消息队列的消息接收流程: 

  1. 建立connection
  2. 创建channel
  3. 利用channel声明队
  4. 定义consumer的消费行为handleDelivery()
  5. 利用channel将消费者与队列绑定

5. 代码分享

链接: https://pan.baidu.com/s/1TOVZIJXXiSsRF3clun75TQ?pwd=1234

提取码: 1234 

    广告一刻

    为您即时展示最新活动产品广告消息,让您随时掌握产品活动新动态!