目录
1.初识MQ
1.同步通讯
同步通讯是指通信双方在进行数据交流时,必须按照一定的顺序同步进行,数据的发送方必须等待接收方对前一条数据的接收和处理完成后,才能发送下一条数据,确保数据的
顺序和一致性
。
在同步通讯中,通信双方会相互等待对方的响应,直到收到对方的确认信号才会进行下一步操作。
同步通讯常见的应用有电话通话、视频聊天和实时游戏
等。
1.同步调用存在的问题
①
耦合度高
:
每次加入新的需求,都要修改原来的代码。
②性能下降
:
调用者需要等待服务提供者响应,如果调用链过长则响应时间等于每次调用的时间之和。
③资源浪费
:
调用链中的每个服务在等待响应过程中,不能释放请求占用的资源,高并发场景下会极度浪费系统资源。
④级联失败
:
如果服务提供者出现问题,所有调用方都会跟着出问题,如同多米诺骨牌一样,迅速导致整个微服务群故障。
2.优点
时效性较强
,可以立即得到结果
2.异步通讯
异步通讯是指通信双方在进行数据交流时,不需要即时等待对方的响应就能够继续进行下一步的操作。
发送方在发送数据后,不会立即等待接收方的确认信号,而是可以立即进行其他操作。
接收方在接收数据后,可以根据需要决定何时对数据进行处理,无需强制按照发送方的时序进行处理。
异步通讯能够提高通信的效率和并发性
,常见的应用有电子邮件、消息队列
等。
异步调用常见实现就是
事件驱动模式
1.事件驱动优势
①
服务解耦
②性能提升,吞吐量提高
③服务没有强依赖,不担心级联失败问题
④流量削峰
2.异步通信的缺点
①依赖于
Broker
的可靠性、安全性、吞吐能力
②架构复杂,业务没有明显的流程线,不好追踪管理
3.MQ常见框架
MQ(MessageQueue)
,中文是消息队列,字面来看就是存放消息的队列。
也就是事件驱动架构中的Broker
。
2.RabbitMQ快速入门
官方网址:
https://www.rabbitmq.com/
1.单机部署
在Centos7虚拟机中使用Docker来安装:
1.在线拉取:
docker pull rabbitmq:3-management
2.本地导入:
docker load -i mq.tar
3.执行下面的命令来运行MQ容器:
docker run \ -e RABBITMQ_DEFAULT_USER=root \ -e RABBITMQ_DEFAULT_PASS=123456 \ --name mq \ --hostname mq1 \ -p 15672:15672 \ -p 5672:5672 \ -d \ rabbitmq:3-management
RabbitMQ中的几个概念:
①channel
:操作MQ的工具.
②exchange
:路由消息到队列中
③queue
:缓存消息
④virtual host
:虚拟主机,是对queue、exchange等资源的逻辑分组
2.常见消息模型
1.
基本
消息队列(BasicQueue )
2.工作
消息队列( WorkQueue)
发布订阅( Publish、Subscribe),又根据交换机类型不同分为三种:
3.Fanout Exchange:广播
4.Direct Exchange:路由
5.Topic Exchange:主题
消息发送流程:
①建立connection
②创建channel
③利用channel声明队列
④利用channel向队列发送消息
消息接收流程:
①建立connection
②创建channel
③利用channel声明队列
④定义consumer的消费行为handleDelivery()
⑤利用channel将消费者与队列绑定
3.SpringAMQP
官方地址:https://spring.io/projects/spring-amqp
AMQP
:Advanced Message Queuing Protocol,是用于在应用程序或之间传递业务消息的开放标准。
该协议与语言和平台无关
,更符合微服务中独立性的要求。Spring AMQP
是基于AMQP协议定义的一套API规范,提供了模板来发送和接收消息。
包含两部分,其中spring-amqp是基础抽象,spring-rabbit是底层的默认实现。
1.Basic Queue简单队列模型
案例:利用SpringAMQP实现HelloWorld中的基础消息队列功能:
1.在父工程中引入spring-amqp的依赖
2.在publisher服务中利用RabbitTemplate发送消息到simple.queue这个队列
查看消息队列情况:
3.在consumer服务中编写消费逻辑,监听simple.queue这个队列
2.Work Queue工作队列模型
Work queue,工作队列,
①可以提高消息处理速度,避免队列消息堆积
②多个消费者绑定到一个队列,同一条消息只会被一个消费者处理
③通过设置prefetch
来控制消费者预取的消息数量
案例:模拟WorkQueue,实现一个队列绑定多个消费者
可以看到MQ默认的
消息预取机制
是相对平均的,并没有考虑到不同消费者的消费能力的问题
。
所以我们可以做一个消费预取限制:
修改application.yml文件,设置
preFetch
这个值,可以控制预取消息的上限:
重启消费者consumer类,并运行生产者测试类:
3.发布、订阅模型-Fanout
1.发布订阅模式
发布订阅模式与之前案例的区别就是
允许将同一消息发送给多个消费者
。实现方式是加入了exchange
(交换机)。
1.交换机的作用:
- 接收publisher发送的消息
- 将消息按照规则路由到与之绑定的队列
- 不能缓存消息,路由失败,消息丢失
2.常见exchange类型包括:
Fanout
:广播Direct
:路由Topic
:话题
注意:
exchange负责消息路由
,而不是存储,路由失败则消息丢失
2.Fanout Exchange
Fanout Exchange会将接收到的消息路由到每一个跟其
绑定的queue
案例:利用SpringAMQP演示FanoutExchange的使用
1.在consumer服务中,利用代码声明队列、交换机,并将两者绑定
2.在consumer服务中,编写两个消费者方法,分别监听fanout.queue1和fanout.queue2
3.在publisher中编写测试方法,向itcast.fanout发送消息
4.发布、订阅模型-Direct
Direct Exchange 会将接收到的消息根据
规则路
由到指定的Queue,因此称为路由模式(routes)。
- 每一个Queue都与Exchange设置一个
BindingKey
- 发布者发送消息时,指定消息的
RoutingKey
- Exchange将消息路由到
BindingKey
与消息RoutingKey
一致的队列
案例:利用SpringAMQP演示DirectExchange的使用
1.利用@RabbitListener声明Exchange.Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听direct.queue1和direct.queue2
启动消费者类,可以在MQ服务器上看到新增的队列和路由:
3.在publisher中编写测试方法,向itcast. direct发送消息
查看控制台信息:
队列1确实也是绑定的blue
描述下Direct交换机与Fanout交换机的差异?
①Fanout交换机将消息路由给每一个与之绑定的队列
②Direct交换机根据RoutingKey判断路由给哪个队列
③如果多个队列具有相同的RoutingKey,则与Fanout功能类似
5.发布、订阅模型-Topic
TopicExchange与DirectExchange类似,区别在于routingKey
必须是多个单词的列表,并且以.分割
。
Queue与Exchange指定BindingKey时可以使用通配符:
#
:代指0个或多个单词*
:代指一个单词
案例:利用SpringAMQP演示TopicExchange的使用
1.并利用@RabbitListener声明Exchange、Queue、RoutingKey
2.在consumer服务中,编写两个消费者方法,分别监听topic.queue1和topic.queue2
3.在publisher中编写测试方法,向itcast. topic发送消息
6.消息转换器
案例 : 测试发送Object类型消息:
说明:在SpringAMQP的发送方法中,接收消息的类型是Object,也就是说我们可以发送任意对象类型的消息,SpringAMQP会帮我们序列化为字节后发送。
消息转换器:
Spring的对消息对象的处理是由
org.springframework.amqp.support.converter.MessageConverter
来处理的。
而默认实现是SimpleMessageConverter
,基于JDK的ObjectOutputStream
完成序列化。
如果要修改只需要定义一个MessageConverter
类型的Bean即可。
推荐用JSON方式序列化,步骤如下:
consumer模块接收消息的形式:
SpringAMQP中消息的序列化和反序列化是怎么实现的?
- 利用
MessageConverter
实现的,默认是JDK的序列化 - 注意发送方与接收方必须使用相同的
MessageConverter