RabbitMQ
一.消息中间件简介
1.1 消息中间件
消息中间件也可以称消息队列,是指用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息队列模型,可以在分布式环境下扩展进程的通信。
消息中间件就是在通信的上下游之间截断:break it,Broker
然后利用中间件解耦、异步的特性,构建弹性、可靠、稳定的系统。
异步处理、流量削峰、限流、缓冲、排队、最终一致性、消息驱动等需求的场景都可以使用消息中间件。
消息队列的作用:
- 流量削峰:削去秒杀场景下的峰值写流量
- 异步处理:通过异步处理简化秒杀请求中的业务流程
- 解耦:实现秒杀系统模块之间松耦合
1.2 不同MQ介绍
当前业界比较流行的开源消息中间件包括:ActiveMQ、RabbitMQ、RocketMQ、Kafka、ZeroMQ等,其中应用最为广泛的要数RabbitMQ、RocketMQ、Kafka这三款。
作为消息队列,要具备以下几个特性:
- 消息传输的可靠性:保证消息不会丢失。
- 支持集群,包括横向扩展,单点故障都可以解决。
- 性能要好,要能够满足业务的性能需求。
不同mq优缺点:
RabbitMQ
- 优点
- 轻量级,快速,部署使用方便
- 支持灵活的路由配置。RabbitMQ中,在生产者和队列之间有一个交换器模块。根据配置的路由规则,生产者发送的消息可以发送到不同的队列中。路由规则很灵活,还可以自己实现。
- RabbitMQ的客户端支持大多数的编程语言。
- 缺点
- 如果有大量消息堆积在队列中,性能会急剧下降(因为会有大量的IO操作)
- RabbitMQ的性能在Kafka和RocketMQ中是最差的,每秒处理几万到几十万的消息。如果应用要求高的性能,不要选择RabbitMQ。
- RabbitMQ是Erlang开发的,功能扩展和二次开发代价很高
- 优点
RocketMQ
RocketMQ是一个开源的消息队列,使用java实现。借鉴了Kafka的设计并做了很多改进。
RocketMQ主要用于有序,事务,流计算,消息推送,日志流处理,binlog分发等场景。经过了历次的双11考验,性能,稳定性和可靠性没的说。
优点
java开发,阅读源代码、扩展、二次开发很方便。
对电商领域的响应延迟做了很多优化。在大多数情况下,响应在毫秒级。如果应用很关注响应时间,可以使用RocketMQ。
性能比RabbitMQ高一个数量级,每秒处理几十万的消息。
缺点
- 跟周边系统的整合和兼容不是很好。
Kafka
Kafka的可靠性,稳定性和功能特性基本满足大多数的应用场景。
跟周边系统的兼容性是数一数二的,尤其是大数据和流计算领域,几乎所有相关的开源软件都支持Kafka。
优点
Kafka高效,可伸缩,消息持久化。支持分区、副本和容错。
对批处理和异步处理做了大量的设计,因此Kafka可以得到非常高的性能。它的异步消息的发送和接收是三个中最好的,但是跟RocketMQ拉不开数量级,每秒处理几十万的消息
如果是异步消息,并且开启了压缩,Kafka最终可以达到每秒处理2000w消息的级别。
缺点
- 由于是异步的和批处理的,延迟也会高,不适合电商场景。
三种MQ对比
RabbitMQ | RocketMQ | Kafka | |
---|---|---|---|
单机吞吐量 | 1w量级 | 10w量级 | 10w量级 |
开发语言 | Erlang | Java | Java和Scala |
消息延迟 | 微秒 | 毫秒 | 毫秒 |
消息丢失 | 可能性很低 | 参数优化后可以0丢失 | 参数优化后可以0丢失 |
消费模式 | 推拉 | 推拉 | 拉取 |
主题数量对吞吐量的影响 | 几百上千个主题会对吞吐量有一个小的影响 | 几十上百个主题会极大影响吞吐量 | |
可用性 | 高(主从) | 很高(主从) | 很高(分布式) |
1.3 JMS规范和AMQP协议
1.3.1 JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM,Message oriented Middleware)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。与具体平台无关的API,绝大多数MOM提供商都支持。
1.3.1.1 JMS消息
消息是JMS中的一种类型对象,由两部分组成:报文头和消息主体。
报文头包括消息头字段和消息头属性。字段是JMS协议规定的字段,属性可以由用户按需添加。
JMS报文头全部字段
字段 | 含义 |
---|---|
JMSDestination | JMSDestination 字段包含了消息要发送到的目的地。 |
JMSDeliveryMode | JMSDeliveryMode 字段包含了消息在发送的时候指定的投递模式 |
JMSMessageID | 该字段包含了服务器发送的每个消息的唯一标识。 |
JMSTimestamp | 该字段包含了消息封装完成要发往服务器的时间。不是真正向服务器发送的时间,因为真正的发送时间,可能会由于事务或客户端消息排队而延后 |
JMSCorrelationID | 客户端使用该字段的值与另一条消息关联。一个典型的场景是使用该字段将响应消息与请求消息关联。JMSCorrelationID可以包含如下值: - 服务器规定的消息ID - 应用指定的字符串 - 服务器原生的byte[]值 |
JMSReplyTo | 该字段包含了在客户端发送消息的时候指定的Destination。即对该消息的响应应该发送到该字段指定的Destination。设置了该字段值的消息一般期望收到一个响应。 |
JMSRedelivered | 如果这个字段是true,则告知消费者应用这条消息已经发送过了,消费端应用应该小心别重复处理了。 |
JMSType | 消息发送的时候用于标识该消息的类型。具体有哪些类型,由JMS实现厂商决定。 |
JMSExpiration | 发送消息时,其到期时间将计算为send方法上指定的生存时间值与当前GMT值之和。 从send方法返回时,消息的JMSExpiration标头字段包含此值。 收到消息后,其JMSExpiration标头字段包含相同的值。 |
JMSPriority | JMS定义了一个十级优先级值,最低优先级为0,最高优先级为9。 此外,客户端应将优先级0-4视为正常优先级,将优先级5-9视为快速优先级。JMS不需要服务器严格执行消息的优先级排序; 但是,它应该尽力在普通消息之前传递加急消息。 |
消息主体则携带着应用程序的数据或有效负载。
根据有效负载的类型来划分,可以将消息分为几种类型:
- 简单文本(TextMessage)
- 可序列化的对象(ObjectMessage)
- 属性集合(MapMessage)
- 字节流(BytesMessage)
- 原始值流(StreamMessage)
- 无有效负载的消息(Message)。
1.3.1.2 JMS体系架构
JMS由以下元素组成:
JMS供应商产品
JMS接口的一个实现。该产品可以是Java的JMS实现,也可以是非Java的面向消息中间件的适配器。
JMS Client
生产或消费基于消息的Java的应用程序或对象。
JMS Producer
创建并发送消息的JMS客户。
JMS Consumer
接收消息的JMS客户。
JMS Message
包括可以在JMS客户之间传递的数据的对象
JMS Queue
缓存消息的容器。消息的接受顺序并不一定要与消息的发送顺序相同。消息被消费后将从队列中移除。
JMS Topic
Pub/Sub模式。
1.3.1.3 JMS对象模型
ConnectionFactory 接口(连接工厂)
JMS客户通过可移植的接口访问连接,这样当下层的实现改变时,代码不需要进行修改。管理员在JNDI名字空间中配置连接工厂,这样,JMS客户才能够查找到它们。根据消息类型的不同,用户将使用队列连接工厂,或者主题连接工厂。
Connection 接口(连接)
连接代表了应用程序和消息服务器之间的通信链路。在获得了连接工厂后,就可以创建一个与JMS提供者的连接。根据不同的连接类型,连接允许用户创建会话,以发送和接收队列和主题到目标。
Destination 接口(目标)
目标是一个包装了消息目标标识符的被管对象,消息目标是指消息发布和接收的地点,或者是队列,或者是主题。JMS管理员创建这些对象,然后用户通过JNDI发现它们。和连接工厂一样,管理员可以创建两种类型的目标,点对点模型的队列,以及发布者/订阅者模型的主题。
Session 接口(会话)
表示一个单线程的上下文,用于发送和接收消息。由于会话是单线程的,所以消息是连续的,就是说消息是按照发送的顺序一个一个接收的。会话的好处是它支持事务。如果用户选择了事务支持,会话上下文将保存一组消息,直到事务被提交才发送这些消息。在提交事务之前,用户可以使用回滚操作取消这些消息。一个会话允许用户创建消息,生产者来发送消息,消费者来接收消息。
MessageConsumer 接口(消息消费者)
由会话创建的对象,用于接收发送到目标的消息。消费者可以同步地(阻塞模式),或(非阻塞)接收队列和主题类型的消息。
MessageProducer 接口(消息生产者)
由会话创建的对象,用于发送消息到目标。用户可以创建某个目标的发送者,也可以创建一个通用的发送者,在发送消息时指定目标。
Message 接口(消息)
是在消费者和生产者之间传送的对象,也就是说从一个应用程序传送到另一个应用程序。一个消息有三个主要部分:
消息头(必须):包含用于识别和为消息寻找路由的操作设置。
一组消息属性(可选):包含额外的属性,支持其他提供者和用户的兼容。可以创建定制的字段和过滤器(消息选择器)。
一个消息体(可选):允许用户创建五种类型的消息(文本消息,映射消息,字节消息,流消息和对象消息)
1.3.1.4 JMS模式
Java消息服务应用程序结构支持两种模式:
点对点也叫队列模式
一个生产者向一个特定的队列发布消息,一个消费者从该队列中读取消息。这里,生产者知道消费者的队列,并直接将消息发送到消费者的队列,概括为:
一条消息只有一个消费者获得
生产者无需在接收者消费该消息期间处于运行状态,接收者也同样无需在消息发送时处于运行状态。
每一个成功处理的消息要么自动确认,要么由接收者手动确认
发布/订阅模式
- 支持向一个特定的主题发布消息。
- 0或多个订阅者可能对接收特定消息主题的消息感兴趣。
- 发布者和订阅者彼此不知道对方。
- 多个消费者可以获得消息
在发布者和订阅者之间存在时间依赖性。
发布者需要建立一个主题,以便客户能够订阅。
订阅者必须保持持续的活动状态以接收消息,否则会丢失未上线时的消息。
对于持久订阅,订阅者未连接时发布的消息将在订阅者重连时重发。
1.3.1.5 JMS传递方式
JMS有两种传递消息的方式。
标记为NON_PERSISTENT的消息最多投递一次,而标记为PERSISTENT的消息将使用暂存后再转送的机理投递。
如果一个JMS服务下线,持久性消息不会丢失,等该服务恢复时再传递。默认的消息传递方式是非持久性的。
1.3.1.6 JMS在集群中的问题
生产中应用基本上都是以集群部署的。在Queue模式下,消息的消费没有什么问题,因为不同节点的相同应用会抢占式地消费消息,这样还能分摊负载。
如果有两个项目A和B,都需要收到某个队列的消息那就要用Topic模式,如果A和B是单机部署的也没问题,当A和B是集群部署的就会有问题,A项目所有的节点都会收到消息,就会重复消费
解决方案一:选择Queue模式,创建多个一样的Queue,每个应用消费自己的Queue。
弊端:浪费空间,生产者还需要关注下游到底有几个消费者,违反了“解耦”的初衷。
解决方案二:选择Topic模式,在业务上做散列,或者通过分布式锁等方式来实现不同节点间的竞争。
弊端:对业务侵入较大,不是优雅的解决方法。
JMS是JEE平台的标准消息传递API。它可以在商业和开源实现中使用。每个实现都包括一个JMS服务器,一个JMS客户端库,以及用于管理消息传递系统的其他特定于实现的组件。JMS客户端API是标准化的,因此JMS应用程序可在供应商的实现之间移植。但是:
底层消息传递实现未指定,因此JMS实现之间没有互操作性。除非存在桥接技术,否则想要共享消息传递就必须使用相同的JMS实现。
如果没有供应商特定的JMS客户端库来启用互操作性,则非Java应用程序将无法访问JMS。
AMQP 0-9-1是一种消息传递协议,而不是像JMS这样的API。任何实现该协议的客户端都可以访问支持AMQP 0-9-1的代理。
协议级的互操作性允许以任何编程语言编写且在任何操作系统上运行的AMQP 0-9-1客户端都可以参与消息传递系统,而无需桥接不兼容的服务器实现。
1.3.2 AMQP
1.3.2.1 AMQP简介
AMQP全称高级消息队列协议(Advanced Message Queuing Protocol),是一种标准,类似于JMS,兼容JMS协议。目前RabbitMQ主流支持AMQP 0-9-1,3.8.4版本支持AMQP 1.0。
AMQP是一个二进制的协议,信息被组织成数据帧,有很多类型。数据帧携带协议方法和其他信息。所有数据帧都拥有基本相同的格式:帧头,负载,帧尾。数据帧负载的格式依赖于数据帧的类型。
我们假定有一个可靠的面向流的网络传输层(TCP/IP或等价的协议)。
在一个单一的socket连接中,可能有多个相互独立的控制线程,称为“channel”。每个数据帧使用通道号码编号。通过数据帧的交织,不同的通道共享一个连接。对于任意给定通道,数据帧严格按照序列传输。
我们使用小的数据类型来构造数据帧,如bit,integer,string以及字段表。数据帧的字段做了轻微的封装,不会让传输变慢或解析困难。根据协议规范机械地生成成数据帧层相对简单。
线级别的格式被设计为可伸缩和足够通用,以支持任意的高层协议(不仅是AMQP)。我们假定AMQP会扩展,改进以及随时间的其他变化,并要求wire-level格式支持这些变化
1.3.2.2 AMQP概念
Publisher
消息发送者,将消息发送到Exchange并指定RoutingKey,以便queue可以接收到指定的消息。
Consumer
消息消费者,从queue获取消息,一个Consumer可以订阅多个queue以从多个queue中接收消息。
Server
一个具体的MQ服务实例,也称为Broker。
Virtual host
虚拟主机,一个Server下可以有多个虚拟主机,用于隔离不同项目,一个Virtual host通常包含多个Exchange、Message Queue。
Exchange
交换器,接收Producer发送来的消息,把消息转发到对应的Message Queue中。
Routing key
路由键,用于指定消息路由规则(Exchange将消息路由到具体的queue中),通常需要和具体的Exchange类型、Binding的Routing key结合起来使用
Bindings
指定了Exchange和Queue之间的绑定关系。Exchange根据消息的Routing key和Binding配置(绑定关系、Binding、Routing key等)来决定把消息分派到哪些具体的queue中。这依赖于Exchange类型。
Message Queue
实际存储消息的容器,并把消息传递给最终的Consumer。
1.3.2.2 AMQP数据类型
Integers(数值范围1-8的十进制数字):用于表示大小,数量,限制等,整数类型无符号的,可以在帧内不对齐。
Bits(统一为8个字节):用于表示开/关值。
Short strings:用于保存简短的文本属性,字符串个数限制为255,8个字节
Long strings:用于保存二进制数据块。
Field tables:包含键值对,字段值一般为字符串,整数等
1.3.2.3 AMQP协议协商
AMQP客户端和服务端进行协议协商。意味着当客户端连接上之后,服务端会向客户端提出一些选项,客户端必须能接收或修改。如果双方都认同协商的结果,继续进行连接的建立过程。协议协商是一个很有用的技术手段,因为它可以让我们断言假设和前置条件。
在AMQP中,我们需要协商协议的一些特殊方面:
- 真实的协议和版本。服务器可能在同一个端口支持多个协议。
- 双方的加密参数和认证方式。这是功能层的一部分。
- 数据帧最大大小,通道数量以及其他操作限制。
对限制条件的认同可能会导致双方重新分配key的缓存,避免死锁。每个发来的数据帧要么遵守认同的限制,也就是安全的,要么超过了限制,此时另一方出错,必须断开连接。
协商双方认同限制到一个小的值,如下:
服务端必须告诉客户端它加上了什么限制。
客户端响应服务器,或许会要求对客户端的连接降低限制。
1.3.2.4 AMQP数据帧界定
TCP/IP是流协议,没有内置的机制用于界定数据帧。现有的协议从以下几个方面来解决:
每个连接发送单一数据帧。简单但是慢。
在流中添加帧的边界。简单,但是解析很慢。
计算数据帧的大小,在每个数据帧头部加上该数据帧大小。这简单,快速,AMQP的选择。
1.3.2.5 AMQP实现JMS
RabbitMQ的JMS客户端用RabbitMQ Java客户端实现,既与JMS API兼容,也与AMQP 0-9-1协议兼容。
RabbitMQ JMS客户端不支持某些JMS 1.1功能,有一定局限性:
JMS客户端不支持服务器会话。
XA事务支持接口未实现。
RabbitMQ JMS主题选择器插件支持主题选择器。队列选择器尚未实现。
支持RabbitMQ连接的SSL和套接字选项,但仅使用RabbitMQ客户端提供的(默认)SSL连接协议。
RabbitMQ不支持JMS NoLocal订阅功能,该功能禁止消费者接收通过消费者自己的连接发布的消息。
RabbitMQ使用amqp协议,JMS规范仅对于Java的使用作出的规定,跟其他语言无关,协议是语言无关的,只要语言实现了该协议,就可以做客户端。如此,则不同语言之间互操作性得以保证。
AMQP协议文档下载地址:https://www.amqp.org/sites/amqp.org/files/amqp0-9-1.zip
二.RabbitMQ
2.1 架构
2.1.1 Exchange类型
RabbitMQ常用的交换器类型有: fanout 、 direct 、 topic 、 headers 四种。
fanout
会把所有发送到该交换器的消息路由到所有与该交换器绑定的队列中,如图:
direct
direct类型的交换器路由规则很简单,它会把消息路由到那些BindingKey和RoutingKey完全匹配的队列中,如下图:
topic
topic类型的交换器在direct匹配规则上进行了扩展,也是将消息路由到BindingKey和RoutingKey
相匹配的队列中,这里的匹配规则稍微不同,它约定:
BindingKey和RoutingKey一样都是由"."分隔的字符串;BindingKey中可以存在两种特殊字符
*
和#
,用于模糊匹配,其中*
用于匹配一个单词,#
用于匹配多个单词(可以是0个)headers
headers类型的交换器不依赖于路由键的匹配规则来路由信息,而是根据发送的消息内容中的headers属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送的消息到交换器时,RabbitMQ会获取到该消息的headers,对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果匹配,消息就会路由到该队列。headers类型的交换器性能很差,不实用
2.1.2 数据存储
RabbitMQ消息有两种类型:
- 持久化消息和非持久化消息。
- 这两种消息都会被写入磁盘。
持久化消息在到达队列时写入磁盘,同时会内存中保存一份备份,当内存吃紧时,消息从内存中清除。这会提高一定的性能。
非持久化消息一般只存于内存中,当内存压力大时数据刷盘处理,以节省内存空间。
RabbitMQ存储层包含两个部分:队列索引和消息存储
队列索引:rabbit_queue_index ,索引维护队列的落盘消息的信息,如存储地点、是否已被给消费者接收、是否已被消费者ack等。每个队列都有相对应的索引。
索引使用顺序的段文件来存储,后缀为.idx,文件名从0开始累加,每个段文件中包含固定的segment_entry_count 条记录,默认值是16384。每个index从磁盘中读取消息的时候,至少要在内存中维护一个段文件**,所以设置 queue_index_embed_msgs_below 值得时候要**格外谨慎,一点点增大也可能会引起内存爆炸式增长。
消息存储:rabbit_msg_store,消息以键值对的形式存储到文件中,一个虚拟主机上的所有队列使用同一块存储,每个节点只有一个。存储分为持久化存储(msg_store_persistent)和短暂存储(msg_store_transient)。持久化存储的内容在broker重启后不会丢失,短暂存储的内容在broker重启后丢失。
store使用文件来存储,后缀为.rdq,经过处理的所有消息都会以追加的方式写入到该文件中,当该文件的大小超过指定的限制(file_size_limit)后,将会关闭该文件并创建一个新的文件以供新的消息写入。文件名从0开始进行累加。在进行消息的存储时,RabbitMQ会在ETS(Erlang Term Storage)表中记录消息在文件中的位置映射和文件的相关信息。
消息(包括消息头、消息体、属性)可以直接存储在index中,也可以存储在store中。最佳的方式是较小的消息存在index中,而较大的消息存在store中。这个消息大小的界定可以通过queue_index_embed_msgs_below 来配置,默认值为4096B。当一个消息小于设定的大小阈值时,就可以存储在index中,这样性能上可以得到优化,否则放到持久化消息文件中
读取消息时,先根据消息的ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息内容。如果文件不存在或者被锁住了,则发送请求由store进行处理。
删除消息时,只是从ETS表删除指定消息的相关信息,同时更新消息对应的存储文件和相关信息。在执行消息删除操作时,并不立即对文件中的消息进行删除,也就是说消息依然在文件中,仅仅是标记为垃圾数据而已。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并成一个文件,并且所有的垃圾数据的大小和所有文件(至少有3个文件存在的情况下)的数据大小的比值超过设置的阈值garbage_fraction(默认值0.5)时,才会触发垃圾回收,将这两个文件合并,执行合并的两个文件一定是逻辑上相邻的两个文件。合并逻辑:
- 锁定这两个文件
- 先整理前面的文件的有效数据,再整理后面的文件的有效数据
- 将后面文件的有效数据写入到前面的文件中
- 更新消息在ETS表中的记录
- 删除后面文件
队列结构
通常队列由rabbit_amqqueue_process和backing_queue这两部分组成,rabbit_amqqueue_process负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的confirm和消费端的ack)等。backing_queue是消息存储的具体形式和引擎,并向rabbit_amqqueue_process提供相关的接口以供调用。
如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。
rabbit_variable_queue.erl 源码中定义了RabbitMQ队列的4种状态:
alpha:消息索引和消息内容都存内存,最耗内存,很少消耗CPU
beta:消息索引存内存,消息内容存磁盘
gama:消息索引内存和磁盘都有,消息内容存磁盘
delta:消息索引和内容都存磁盘,基本不消耗内存,消耗更多CPU和I/O操作
消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断流动,消息的状态会不断变化。
持久化的消息,索引和内容都必须先保存在磁盘上,才会处于上述状态中的一种
gama状态只有持久化消息才会有的状态。
在运行时,RabbitMQ会根据消息传递的速度定期计算一个当前内存中能够保存的最大消息数量(target_ram_count),如果alpha状态的消息数量大于此值,则会引起消息的状态转换,多余的消息可能会转换到beta、gama或者delta状态。区分这4种状态的主要作用是满足不同的内存和CPU需求。
对于普通没有设置优先级和镜像的队列来说,backing_queue的默认实现是rabbit_variable_queue,其内部通过5个子队列Q1、Q2、delta、Q3、Q4来体现消息的各个状态。
当消费者获取消息时:
首先会从Q4中获取消息,如果获取成功则返回。
如果Q4为空,则尝试从Q3中获取消息,系统首先会判断Q3是否为空,如果为空则返回队列为空,即此时队列中无消息。
如果Q3不为空,则取出Q3中的消息;进而再判断此时Q3和Delta中的长度,如果都为空,则可以认为 Q2、Delta、 Q3、Q4 全部为空,此时将Q1中的消息直接转移至Q4,下次直接从Q4 中获取消息。
如果Q3为空,Delta不为空,则将Delta的消息转移至Q3中,下次可以直接从Q3中获取消息。
在将消息从Delta转移到Q3的过程中,是按照索引分段读取的,首先读取某一段,然后判断读取的消息的个数与Delta中消息的个数是否相等,如果相等,则可以判定此时Delta中己无消息,则直接将Q2和刚读取到的消息一并放入到Q3中,如果不相等,仅将此次读取到的消息转移到Q3。
通常在负载正常时,如果消费速度大于生产速度,对于不需要保证可靠不丢失的消息来说,极有可能只会处于alpha状态。
对于持久化消息,它一定会进入gamma状态,在开启publisher confirm机制时,只有到了gamma 状态时才会确认该消息己被接收,若消息消费速度足够快、内存也充足,这些消息也不会继续走到下一个状态。
在系统负载较高时,消息若不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中,继续增大处理每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。
应对这一问题一般有3种措施:
增加prefetch_count的值,即一次发送多条消息给消费者,加快消息被消费的速度。
采用multiple ack,降低处理 ack 带来的开销
流量控制
2.1.3 常用命令
# 前台启动Erlang VM和RabbitMQ rabbitmq-server # 后台启动 rabbitmq-server -detached # 停止RabbitMQ和Erlang VM rabbitmqctl stop # 查看所有队列 rabbitmqctl list_queues # 查看所有虚拟主机 rabbitmqctl list_vhosts # 在Erlang VM运行的情况下启动RabbitMQ应用 rabbitmqctl start_app rabbitmqctl stop_app # 查看节点状态 rabbitmqctl status # 查看所有可用的插件 rabbitmq-plugins list # 启用插件 rabbitmq-plugins enable <plugin-name> # 停用插件 rabbitmq-plugins disable <plugin-name> # 添加用户 rabbitmqctl add_user username password # 列出所有用户: rabbitmqctl list_users # 删除用户: rabbitmqctl delete_user username # 清除用户权限: rabbitmqctl clear_permissions -p vhostpath username # 列出用户权限: rabbitmqctl list_user_permissions username # 修改密码: rabbitmqctl change_password username newpassword # 设置用户权限: rabbitmqctl set_permissions -p vhostpath username ".*" ".*" ".*" # 创建虚拟主机: rabbitmqctl add_vhost vhostpath # 列出所以虚拟主机: rabbitmqctl list_vhosts # 列出虚拟主机上的所有权限: rabbitmqctl list_permissions -p vhostpath # 删除虚拟主机: rabbitmqctl delete_vhost vhost vhostpath # 移除所有数据,要在 rabbitmqctl stop_app 之后使用: rabbitmqctl reset
2.1.4 工作流程
2.1.4.1 生产者发送消息过程
生产者连接RabbitMQ,建立TCP连接( Connection),开启信道(Channel)
生产者声明一个Exchange(交换器),并设置相关属性,比如交换器类型、是否持久化等
生产者声明一个队列井设置相关属性,比如是否排他、是否持久化、是否自动删除等
生产者通过 bindingKey (绑定Key)将交换器和队列绑定( binding )起来
生产者发送消息至RabbitMQ Broker,其中包含 routingKey (路由键)、交换器等信息
相应的交换器根据接收到的 routingKey 查找相匹配的队列。
如果找到,则将从生产者发送过来的消息存入相应的队列中。
如果没有找到,则根据生产者配置的属性选择丢弃还是回退给生产者
关闭信道。
关闭连接。
2.1.4.2 消费者消费消息过程
消费者连接到RabbitMQ Broker ,建立一个连接(Connection ) ,开启一个信道(Channel) 。
消费者向RabbitMQ Broker 请求消费相应队列中的消息,可能会设置相应的回调函数, 以及做一些准备工作
等待RabbitMQ Broker 回应并投递相应队列中的消息, 消费者接收消息。
消费者确认( ack) 接收到的消息。
RabbitMQ 从队列中删除相应己经被确认的消息。
关闭信道。
关闭连接。
2.1.4.3 Connection 和Channel关系
生产者和消费者,需要与RabbitMQ Broker 建立TCP连接,也就是Connection 。一旦TCP 连接建立起来,客户端紧接着创建一个AMQP 信道(Channel),每个信道都会被指派一个唯一的ID。信道是建立在Connection 之上的虚拟连接, RabbitMQ 处理的每条AMQP 指令都是通过信道完成的。
RabbitMQ 采用类似NIO的做法,复用TCP 连接,减少性能开销,便于管理。当每个信道的流量不是很大时,复用单一的Connection 可以在产生性能瓶颈的情况下有效地节省TCP 连接资源。
当信道本身的流量很大时,一个Connection 就会产生性能瓶颈,流量被限制。需要建立多个Connection,分摊信道。具体的调优看业务需要。
信道在AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面进行的。
RabbitMQ 相关的API与AMQP紧密相连,比如channel.basicPublish 对应AMQP 的Basic.Publish命令。
2.1.5 工作模式
可以查看官网:https://www.rabbitmq.com/getstarted.html
如果无论何时连接RabbitMQ的时候,都需要一个新的,空的队列。我们可以使用随机的名字创建队列,也可以让服务器帮我们生成随机的消息队列名字。
其次,一旦我们断开到消费者的连接,该队列应该自动删除
String queueName = channel.queueDeclare().getQueue();
上述代码我们声明了一个非持久化的、排他的、自动删除的队列,并且名字是服务器随机生成的。queueName一般的格式类似: amq.gen-JzTY20BRgKO-HjmUJj0wLg 。
实现RabbitMQ的消费者有两种模式,推模式(Push)和拉模式(Pull)。 实现推模式推荐的方式是继承 DefaultConsumer 基类,也可以使用Spring AMQP的 SimpleMessageListenerContainer 。 推模式是最常用的,但是有些情况下推模式并不适用的,比如说: 由于某些限制,消费者在某个条件成立时才能消费消息 需要批量拉取消息进行处理 实现拉模式 RabbitMQ的Channel提供了 basicGet 方法用于拉取消息。
2.1.5.1 Work Queue
生产者发消息,启动多个消费者实例来消费消息,每个消费者仅消费部分信息,可达到负载均衡的效果。
如果在发送消息时没有指定交换器那会使用默认的交换器来发送消息,发送的目的队列就是指定的routingKey
工作队列模式就是利用这个性质来省掉了指定交换器,实际上会有一个默认的交换器来工作
2.1.5.2 Publish/Subscribe
使用fanout类型交换器,routingKey忽略。每个消费者定义生成一个队列并绑定到同一个Exchange,每个消费者都可以消费到完整的消息。
消息广播给所有订阅该消息的消费者。
在RabbitMQ中,生产者不是将消息直接发送给消息队列,实际上生产者根本不知道一个消息被发送到哪个队列。
生产者将消息发送给交换器。交换器非常简单,从生产者接收消息,将消息推送给消息队列。交换器必须清楚地知道要怎么处理接收到的消息。应该是追加到一个指定的队列,还是追加到多个队列,还是丢弃。规则就是交换器类型。
发布订阅使用fanout类型的交换器
2.1.5.3 Routing
使用 direct 类型的Exchange,发N条消费并使用不同的 routingKey ,消费者定义队列并将队列、 routingKey 、Exchange绑定。此时使用 direct 模式Exchagne必须要 routingKey 完全匹配的情况下消息才会转发到对应的队列中被消费。
2.1.5.4 Topics
使用 topic 类型的交换器,队列绑定到交换器、 bindingKey 时使用通配符,交换器将消息路由转发到具体队列时会根据消息 routingKey 模糊匹配,比较灵活。
routingKey 就不能随便写了,它必须得是点分单词。单词可以随便写,生产中一般使用消息的特征。如:“stock.usd.nyse”,“nyse.vmw”,“quick.orange.rabbit”等。该点分单词字符串最长255字节。
如果发送的消息 routingKey 是" lazy.orange.male.rabbit ",则会匹配最后一个绑定
2.1.6 springBoot整合RabbitMQ 案例
maven依赖
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
配置
spring.application.name=springboot_rabbitmq spring.rabbitmq.host=myHost spring.rabbitmq.virtual-host=/ spring.rabbitmq.username=myName spring.rabbitmq.password=myPassword spring.rabbitmq.port=5672
RabbitConfig
在springboot中会生成默认的ConnectionFactory、AmqpAdmin、RabbitTemplate、SimpleRabbitListenerContainerFactory
@Configuration public class RabbitConfig { @Bean public Queue queue() { return new Queue("queueTest"); } @Bean public Exchange exchange() { return new DirectExchange("exTest", false, false, null); } @Bean public Binding binding(Queue queue, Exchange exchange) { Binding binding = BindingBuilder.bind(queue) .to(exchange) .with("test").noargs(); return binding; } }
监听器
@Component public class HelloConsumer { @RabbitListener(queues = "queueTest") public void service(String message) { System.out.println("消息队列推送来的消息:" + message); } }
2.2 RabbitMQ高级特性解析
2.2.1 消息可靠性
以支付平台举例,支付平台如何保证一笔帐不出问题?
支付平台必须保证数据正确性,保证数据并发安全性,保证数据最终一致性。支付平台通过如下几种方式保证数据一致性:
分布式锁
在操作某条数据时先锁定,可以用redis或zookeeper等常用框架来实现。 比如在修改账单时,先锁定该账单,如果该账单有并发操作,后面的操作只能等待上一个操作的锁释放后再依次执行。
优点:能够保证数据强一致性。 缺点:高并发场景下可能有性能问题。
消息队列
消息队列是为了保证最终一致性,我们需要确保消息队列有ack机制 客户端收到消息并消费处理完成后,客户端发送ack消息给消息中间件 如果消息中间件超过指定时间还没收到ack消息,则定时去重发消息。
优点:异步、高并发
缺点:有一定延时、数据弱一致性,并且必须能够确保该业务操作肯定能够成功完成,不可能失败。
异常捕获
先执行业务操作,业务操作成功后执行消息发送,消息发送过程通过try catch 方式捕获异常,在异常处理的代码块中执行行回滚业务操作或者执行行重发操作等。这是一种最大努力确保的方式,并无法保证100%绝对可靠,因为这里没有异常并不代表消息就一定投递成功。
另外,可以通过spring.rabbitmq.template.retry.enabled=true 配置开启发送端的重试
AMQP/RabbitMQ的事务机制
没有捕获到异常并不能代表消息就一定投递成功了。可以使用channel事务模式,一直到事务提交后都没有异常,确实就说明消息是投递成功了。但是这种方式在性能方面的开销比较大,一般也不推荐使用。
生产者确认机制
RabbitMQ后来引入了一种轻量量级的方式,叫发送方确认(publisher confirm)机制。生产者将信道设置成confirm(确认)模式,一旦信道进入confirm 模式,所有在该信道上⾯面发布的消息都会被指派一个唯一的ID(从1 开始),一旦消息被投递到所有匹配的队列之后(如果消息和队列是持久化的,那么确认消息会在消息持久化后发出),RabbitMQ 就会发送一个确认(Basic.Ack)给生产者(包含消息的唯一ID),这样生产者就知道消息已经正确送达了
RabbitMQ 回传给生产者的确认消息中的deliveryTag 字段包含了确认消息的序号,另外,通过设置channel.basicAck方法中的multiple参数,表示到这个序号之前的所有消息是否都已经得到了处理了。生产者投递消息后并不需要一直阻塞着,可以继续投递下一条消息并通过回调方式处理理ACK响应。如果 RabbitMQ 因为自身内部错误导致消息丢失等异常情况发生,就会响应一条nack(Basic.Nack)命令,生产者应用程序同样可以在回调方法中处理理该 nack 命令。
waitForConfirmsOrDie方法在Broker端在返回nack(Basic.Nack)之后会抛出java.io.IOException。TimeoutException超时是属于第三状态(无法确定成功还是失败),而返回Basic.Nack抛出IOException这种是明确的失败。
// 获取通道 Channel channel = connection.createChannel(); channel.confirmSelect(); String exchangeName = "exTest"; String routeName = "test"; String queueName = "queueTest"; String message = "hello world 1"; // 声明消息队列 消息队列名称 // 是否是持久化的 // 是否是排他的 // 是否是自动删除的 // 消息队列的属性信息。使用默认值; channel.queueDeclare(queueName, false, false, true, null); // 声明交换器 // 交换器的名称 // 交换器的类型 // 交换器是否是持久化的 // 交换器是否是自动删除的 // 交换器的属性map集合 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, false, false, null); // 将交换器和消息队列绑定,并指定路由键 channel.queueBind(queueName, exchangeName, routeName); //即将发送消息的no // long nextPublishSeqNo = channel.getNextPublishSeqNo(); // 发送消息 // 交换器的名字 // 该消息的路由键 // 该消息的属性BasicProperties对象 // 消息的字节数组 channel.basicPublish(exchangeName, routeName, null, message.getBytes()); try { channel.waitForConfirmsOrDie(5000); System.out.println("消息被确认:message = " + message); } catch (IOException e) { e.printStackTrace(); System.err.println("消息被拒绝! message = " + message); } catch (InterruptedException e) { e.printStackTrace(); System.err.println("在不是Publisher Confirms的通道上使用该方法"); } catch (TimeoutException e) { e.printStackTrace(); System.err.println("等待消息确认超时! message = " + message); } // 关闭通道 channel.close(); // 关闭连接 connection.close();
实际上,我们也可以通过“批处理理”的方式来改善整体的性能(即批量量发送消息后仅调用一次waitForConfirms方法)。正常情况下这种批量处理的方式效率会高很多,但是如果发生了超时或者nack(失败)后那就需要批量量重发消息或者通知上游业务批量回滚(因为我们只知道这个批次中有消息没投递成功,而并不知道具体是那条消息投递失败了,所以很难针对性处理),批量重发消息肯定会造成部分消息重复。
可以通过异步回调的方式来处理Broker的响应。addConfirmListener 方法可以添加ConfirmListener 这个回调接口,这个 ConfirmListener 接口包含两个方法:handleAck 和handleNack,分别用来处理 RabbitMQ 回传的 Basic.Ack 和 Basic.Nack。
// 获取通道 Channel channel = connection.createChannel(); channel.confirmSelect(); String exchangeName = "exConfirmSelectTest"; String routeName = "confirmSelectTest"; String queueName = "queueConfirmSelectTest"; String message = "hello world "; // 声明消息队列 消息队列名称 // 是否是持久化的 // 是否是排他的 // 是否是自动删除的 // 消息队列的属性信息。使用默认值; channel.queueDeclare(queueName, false, false, true, null); // 声明交换器 // 交换器的名称 // 交换器的类型 // 交换器是否是持久化的 // 交换器是否是自动删除的 // 交换器的属性map集合 channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT, false, false, null); // 将交换器和消息队列绑定,并指定路由键 channel.queueBind(queueName, exchangeName, routeName); ConcurrentNavigableMap<Long, String> outstandingConfirms = new ConcurrentSkipListMap<>(); ConfirmCallback cleanOutstandingConfirms = (sequenceNumber, multiple) -> { if (multiple) { System.out.println("小于等于 " + sequenceNumber + " 的消息都被确认了"); // 获取map集合的子集 ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(sequenceNumber, true); // 清空子集 outstandingConfirms中的数据也会被清 headMap.clear(); } else { System.out.println(sequenceNumber + " 对应的消息被确认"); outstandingConfirms.remove(sequenceNumber); } }; channel.addConfirmListener(cleanOutstandingConfirms, (sequenceNumber, multiple) -> { if (multiple) { System.out.println("小于等于 " + sequenceNumber + " 的消息都不确认了"); ConcurrentNavigableMap<Long, String> headMap = outstandingConfirms.headMap(sequenceNumber, true); // 清空子集 outstandingConfirms中的数据也会被清 headMap.clear(); } else { System.out.println(sequenceNumber + " 对应的消息不确认"); outstandingConfirms.remove(sequenceNumber); } }); for (int i = 0; i < 1000; i++) { long nextPublishSeqNo = channel.getNextPublishSeqNo(); channel.basicPublish(exchangeName, routeName, null, (message + i).getBytes()); System.out.println("序列号为:" + nextPublishSeqNo + "的消息已经发送了:" + (message + i)); outstandingConfirms.put(nextPublishSeqNo, (message + i)); } Thread.sleep(60000); // 关闭通道 channel.close(); // 关闭连接 connection.close();
持久化存储机制
持久化是提高RabbitMQ可靠性的基础,否则当RabbitMQ遇到异常时(如:重启、断电、停机等)数据将会丢失。主要从以下几个方面来保障消息的持久性:
Exchange的持久化。通过定义时设置durable 参数为ture来保证Exchange相关的元数据不丢失。
Queue的持久化。也是通过定义时设置durable 参数为ture来保证Queue相关的元数据不不丢失。
消息的持久化。通过将消息的投递模式 (BasicProperties 中的 deliveryMode 属性)设置为 2即可实现消息的持久化,保证消息自身不丢失。
RabbitMQ中的持久化消息都需要写入磁盘(当系统内存不不足时,非持久化的消息也会被刷盘处理),这些处理理动作都是在“持久层”中完成的。持久层是一个逻辑上的概念,实际包含两个部分:
队列索引(rabbit_queue_index),rabbit_queue_index 负责维护Queue中消息的信息,包括消息的存储位置、是否已交给消费者、是否已被消费及Ack确认等,每个Queue都有与之对应的rabbit_queue_index。
消息存储(rabbit_msg_store),rabbit_msg_store 以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。
HOSTNAME/msg_stores/vhosts/$VHostId 这个路路径下包含 queues、msg_store_persistent、msg_store_transient 这 3 个目录,这是实际存储消息的位置。其中queues目录中保存着rabbit_queue_index相关的数据,而msg_store_persistent保存着持久化消息数据,msg_store_transient保存着⾮非持久化相关的数据。
另外,RabbitMQ通过配置queue_index_embed_msgs_below可以根据消息大小决定存储位置,默认queue_index_embed_msgs_below是4096字节(包含消息体、属性及headers),小于该值的消息存在rabbit_queue_index中。
消费者确认机制
前面我们讲了生产者发送确认机制和消息的持久化存储机制,然而这依然无法完全保证整个过程的可靠性,因为如果消息被消费过程中业务处理失败了但是消息却已经出列了(被标记为已消费了),我们又没有任何重试,那结果跟消息丢失没什么分别。
RabbitMQ在消费端会有Ack机制,即消费端消费消息后需要发送Ack确认报文给Broker端,告知自己是否已消费完成,否则可能会一直重发消息直到消息过期(AUTO模式)。这也是我们之前一直在讲的“最终一致性”、“可恢复性” 的基础。
一般而言,我们有如下处理手段:
采用NONE模式。只要消费者收到消息后就立即确认(消息出列,标记已消费),有丢失数据的风险。消费的过程中自行捕获异常,引发异常后直接记录日志并落到异常恢复表,再通过后台定时任务扫描异常恢复表尝试做重试动作。如果业务不自行处理则有丢失数据的风险
采用AUTO(自动Ack)模式。如果消费者抛出异常则消息会返回到队列中。不主动捕获异常,当消费过程中出现异常时会将消息放回Queue中,然后消息会被重新分配到其他消费者节点(如果没有则还是选择当前节点)重新被消费,默认会一直重发消息并直到消费完成返回Ack或者一直到过期
采用MANUAL(手动Ack)模式,消费者自行控制流程并手动调用channel相关的方法返回Ack
消费端限流
当消息投递速度远快于消费速度时,随着时间积累就会出现“消息积压”。消息中间件本身是具备一定的缓冲能力的,但这个能力是有容量限制的,如果长期运行并没有任何处理,最终会导致Broker崩溃,而分布式系统的故障往往会发生上下游传递
解决方案如下:
- 方案一:
RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值后,生产者将被阻塞(block),直到对应项指标恢复正常。全局上可以防止超大流量、消息积压等导致的Broker被压垮。当内存受限或磁盘可用空间受限的时候,服务器都会暂时阻止连接,服务器将暂停从发布消息的已连接客户端的套接字读取数据。连接心跳监视也将被禁用。所有网络连接将在rabbitmqctl和管理插件中显示为“已阻止”,这意味着它们已发布,现在已暂停。兼容的客户端被阻止时将收到通知。
在/etc/rabbitmq/rabbitmq.conf中配置磁盘可用空间大小:
#设置磁盘可用空间大小,单位是默认是字节 #当磁盘可用空间低于这个值的时候发出磁盘警告,触发限流 #如果设置了相对大小则忽略该值 disk_free_limit.absolute=50000 #相对于总内存的相对值,不要低于1.0 #如果值是2.0 当磁盘可用空间低于总可用内存的2.0倍的时候,触发限流 disk_free_limit.relative=2.0
方案二
RabbitMQ 还默认提供了一种基于credit flow 的流控机制,面向每一个连接进行流控。当单个队列达到最大流速时,或者多个队列达到总流速时,都会触发流控。触发单个连接的流控可能是因为connection、channel、queue的某一个过程处于flow状态,这些状态都可以从监控平台看到。
方案三
RabbitMQ中有一种QoS保证机制,可以限制Channel上接收到的未被ACK的消息量,如果超过这个数量限制RabbitMQ将不会再往消费端推送消息。这是一种流控手段,可以防止大量消息瞬时从Broker送达消费端造成消费端巨大压力(甚至压垮消费端)。比较值得注意的是QoS机制仅对于消费端推模式有效,对拉模式无效。而且不支持NONE Ack模式。执行channel.basicConsume 方法之前通过 channel.basicQoS 方法可以设置该数量。消息的发送是异步的,消息的确认也是异步的。在消费者消费慢的时候,可以设置Qos的prefetchCount,它表示broker在向消费者发送消息的时候,一旦发送了prefetchCount个消息而没有一个消息确认的时候,就停止发送。消费者确认一个,broker就发送一个,确认两个就发送两个。换句话说,消费者确认多少,broker就发送多少,消费者等待处理的个数永远限制在prefetchCount个。
如果对于每个消息都发送确认,增加了网络流量,此时可以批量确认消息。如果设置了multiple为true,消费者在确认的时候,比如说id是8的消息确认了,则在8之前的所有消息都确认了。
生产者往往是希望自己产生的消息能快速投递出去,而当消息投递太快且超过了下游的消费速度时就容易出现消息积压/堆积,所以,从上游来讲我们应该在生产端应用程序中也可以加入限流、应急开关等控制手段,避免超过Broker端的极限承载能力或者压垮下游消费者。
再看看下游,我们期望下游消费端能尽快消费完消息,而且还要防止瞬时大量消息压垮消费端(推模式),我们期望消费端处理速度是最快、最稳定而且还相对均匀(比较理想化)。
提升下游应用的吞吐量和缩短消费过程的耗时,优化主要以下几种方式:
- 优化应用程序的性能,缩短响应时间(需要时间)
- 增加消费者节点实例(成本增加,而且底层数据库操作这些也可能是瓶颈)
- 调整并发消费的线程数(线程数并非越大越好,需要大量压测调优至合理值)
消息可靠传输
消息可靠传输一般是业务系统接入消息中间件时首要考虑的问题,一般消息中间件的消息传输保障
分为三个层级:
At most once
最多一次。消息可能会丢失,但绝不会重复传输
生产者随意发送,消费者随意消费,不过这样很难确保消息不会丢失。
At least once
最少一次。消息绝不会丢失,但可能会重复传输
- 消息生产者需要开启事务机制或者publisher confirm 机制,以确保消息可以可靠地传输到RabbitMQ 中。
消息生产者需要配合使用 mandatory 参数或者备份交换器来确保消息能够从交换器路由到队列中,进而能够保存下来而不会被丢弃。
消息和队列都需要进行持久化处理,以确保RabbitMQ 服务器在遇到异常情况时不会造成消息丢失。
消费者在消费消息的同时需要将autoAck 设置为false,然后通过手动确认的方式去确认已经正确消费的消息,以避免在消费端引起不必要的消息丢失。
Exactly once
恰好一次。每条消息肯定会被传输一次且仅传输一次
是RabbitMQ 目前无法保障的。
考虑这样一种情况,消费者在消费完一条消息之后向RabbitMQ 发送确认Basic.Ack 命令,此时由于网络断开或者其他原因造成RabbitMQ 并没有收到这个确认命令,那么RabbitMQ 不会将此条消息标记删除。在重新建立连接之后,消费者还是会消费到这一条消息,这就造成了重复消费。
再考虑一种情况,生产者在使用publisher confirm机制的时候,发送完一条消息等待RabbitMQ返回确认通知,此时网络断开,生产者捕获到异常情况,为了确保消息可靠性选择重新发送,这样RabbitMQ 中就有两条同样的消息,在消费的时候消费者就会重复消费。
消息幂等处理
追求高性能就无法保证消息的顺序,而追求可靠性那么就可能产生重复消息,从而导致重复消费,做架构就是权衡取舍。
RabbitMQ层面没有实现“去重机制”来保证“恰好一次”
一般解决重复消息的办法是,在消费端让我们消费消息的操作具备幂等性。
幂等性问题并不是消息系统独有,而是(分布式)系统中普遍存在的问题。例如:RPC框架调用超后会重试,HTTP请求会重复发起(用户手抖多点了几下按钮)
一个幂等操作的特点是,其任意多次执行所产生的影响均与一次执行的影响相同。一个幂等的方法,使用同样的参数,对它进行多次调用和一次调用,对系统产生的影响是一样的。对于幂等的方法,不用担心重复执行会对系统造成任何改变。
业界对于幂等性的一些常见做法:
借助数据库唯一索引,重复插入直接报错,事务回滚。还是举经典的转账的例子,为了保证不重复扣款或者重复加钱,我们这边维护一张“资金变动流水表”,里面至少需要交易单号、变动账户、变动金额等3个字段。我们选择交易单号和变动账户做联合唯一索引(单号是上游生成的可保证唯一性),这样如果同一笔交易发生重复请求时就会直接报索引冲突,事务直接回滚。现实中,数据库唯一索引的方式通常做为兜底保证;
前置检查机制。执行更改账户余额这个动作之前,我得先检查下资金变动流水表(或者Tair中)中是否已经存在这笔交易相关的记录了, select * from xxx where accountNumber=xxx and orderId=yyy ,如果已经存在,那么直接返回,否则执行正常的更新余额的动作。为了防止并发问题,我们通常需要借助**“排他锁**”来完成。在支付宝有一条铁律叫:一锁、二判、三操作。当然,我们也可以使用乐观锁或CAS机制,乐观锁一般会使用扩展一个版本号字段做判断条件
唯一Id机制,比较通用的方式。对于每条消息我们都可以生成唯一Id,消费前判断Tair中是否存在(MsgId做Tair排他锁的key),消费成功后将状态写入Tair中,这样就可以防止重复消费了。
对于接口请求类的幂等性保证要相对更复杂,我们通常要求上游请求时传递一个类GUID的请求号(或TOKEN),如果我们发现已经存在了并且上一次请求处理结果是成功状态的(有时候上游的重试请求是正常诉求,我们不能将上一次异常/失败的处理结果返回或者直接提示“请求异常”,如果这样重试就变得没意义了)则不继续往下执行,直接返回“重复请求”的提示和上次的处理结果(上游通常是由于请求超时等未知情况才发起重试的,所以直接返回上次请求的处理结果就好了)。如果请求ID都不存在或者上次处理结果是失败/异常的,那就继续处理流程,并最终记录最终的处理结果。这个请求序号由上游自己生成,上游通用需要根据请求参数、时间间隔等因子来生成请求ID。同样也需要利用这个请求ID做分布式锁的KEY实现排他。
2.2.2 消息可靠性分析
在使用任何消息中间件的过程中,难免会出现消息丢失等异常情况,这个时候就需要有一个良好的机制来跟踪记录消息的过程(轨迹溯源),帮助我们排查问题。
在RabbitMQ 中可以使用Firehose 功能来实现消息追踪,Firehose 可以记录每一次发送或者消费消息的记录,方便RabbitMQ 的使用者进行调试、排错等。
Firehose 的原理是将生产者投递给RabbitMQ 的消息,或者RabbitMQ 投递给消费者的消息按照指定的格式发送到默认的交换器上。这个默认的交换器的名称为 amq.rabbitmq.trace ,它是一个topic 类型的交换器。发送到这个交换器上的消息的路由键为 publish.{exchangename} 和 deliver.{queuename} 。其中 exchangename 和 queuename 为交换器和队列的名称,分别对应生产者投递到交换器的消息和消费者从队列中获取的消息。
开启Firehose命令:rabbitmqctl trace_on [-p vhost]
其中[-p vhost]是可选参数,用来指定虚拟主机vhost。对应的关闭命令为rabbitmqctl trace_off [-p vhost]
Firehose 默认情况下处于关闭状态,并且Firehose 的状态是非持久化的,会在RabbitMQ服务重启的时候还原成默认的状态。Firehose 开启之后多少会影响RabbitMQ整体服务性能,因为它会引起额外的消息生成、路由和存储。
rabbitmq_tracing 插件相当于Firehose 的GUI 版本,它同样能跟踪RabbitMQ 中消息的流入流出情况。rabbitmq_tracing 插件同样会对流入流出的消息进行封装,然后将封装后的消息日志存入相应的trace文件中。可以使用rabbitmq-plugins enable rabbitmq_tracing
命令来启动rabbitmq_ tracing 插件,使用rabbitmq-plugins disable rabbitmq_tracing
命令关闭该插件
2.2.3 TTL
RabbitMQ 可以对消息和队列两个维度来设置TTL。
任何消息中间件的容量和堆积能力都是有限的,如果有一些消息总是不被消费掉,那么需要有一种过期的机制来做兜底。
目前有两种方法可以设置消息的TTL。
通过Queue属性设置,队列中所有消息都有相同的过期时间。
对消息自身进行单独设置,每条消息的TTL 可以不同。
如果两种方法一起使用,则消息的TTL 以两者之间较小数值为准。通常来讲,消息在队列中的生存时间一旦超过设置的TTL 值时,就会变成“死信”(Dead Message),消费者默认就无法再收到该消息。当然,“死信”通过死信队列也是可以被取出来消费的。
如果不设置TTL,则表示此消息不会过期;如果TTL设置为0,则表示除非此时可以直接将消息投递到消费者,否则该消息会被立即丢弃;
2.2.4 死信队列
在定义业务队列时可以考虑指定一个 死信交换机,并绑定一个死信队列。当消息变成死信时,该消息就会被发送到该死信队列上,这样方便我们查看消息失败的原因。
DLX,全称为Dead-Letter-Exchange,死信交换器。消息在一个队列中变成死信(Dead Letter)之后,被重新发送到一个特殊的交换器(DLX)中,同时,绑定DLX的队列就称为“死信队列”。
以下几种情况导致消息变为死信:
消息被拒绝(Basic.Reject/Basic.Nack),并且设置requeue参数为false;
消息过期;
队列达到最大长度。
对于RabbitMQ 来说,DLX 是一个非常有用的特性。它可以处理异常情况下,消息不能够被消费者正确消费(消费者调用了Basic.Nack 或者Basic.Reject)而被置入死信队列中的情况,后续分析程序可以通过消费这个死信队列中的内容来分析当时所遇到的异常情况,进而可以改善和优化系统。
2.2.5 延迟队列
延迟消息是指的消息发送出去后并不想立即就被消费,而是需要等(指定的)一段时间后才触发消费。
在AMQP协议和RabbitMQ中都没有相关的规定和实现。可以借助“死信队列”来变相的实现也可以使用rabbitmq_delayed_message_exchange插件实现。
插件和TTL方式有个很大的不同就是TTL存放消息在死信队列(delayqueue)里,基于插件存放消息在延时交换机里(x-delayed-message exchange)。
插件实现的流程
- 生产者将消息(msg)和路由键(routekey)发送指定的延时交换机(exchange)上
- 延时交换机(exchange)存储消息等待消息到期根据路由键(routekey)找到绑定自己的队列(queue)并把消息给它
- 队列(queue)再把消息发送给监听它的消费者(customer)
2.3 RabbitMQ部署方式简介
对于无状态应用(如普通的微服务)很容易实现负载均衡、高可用集群。而对于有状态的系统(如数据库等)就比较复杂。
部署方案
主备模式
单活,容量对等,可以实现故障转移。使用独立存储时需要借助复制、镜像同步等技术,数据会有延迟、不一致等问题(CAP定律),使用共享存储时就不会有状态同步这个问题。
主从模式
一定程度的双活,容量对等,最常见的是读写分离。通常也需要借助复制技术,或者要求上游实现双写来保证节点数据一致。
主主模式
两边都可以读写,互为主备。如果两边同时写入很容易冲突,所以通常实现的都是“伪主主模式”,或者说就是主从模式的升级版,只是新增了主从节点的选举和切换。
分片集群
不同节点保存不同的数据,上游应用或者代理节点做路由,突破存储容量限制,分摊读写负载;典型的如MongoDB的分片、MySQL的分库分表、Redis集群。
异地多活
“两地三中心”是金融行业经典的容灾模式(有资源闲置的问题),“异地多活”才是王道。
常用负载均衡算法
- 随机
- 轮询
- 加权轮询
- 最少活跃连接
- 原地址/目标地址hash(一致性hash)
集群中的经典问题
脑裂(可以通过协调器选举算法、仲裁节点等方式来解决)
网络分区、一致性、可用性(CAP)
RabbitMQ分布式架构模式
主备模式
也叫Warren(兔子窝)模式,同一时刻只有一个节点在工作(备份节点不能读写),当主节点发生故障后会将请求切换到备份节点上(主恢复后成为备份节点)。需要借助HAProxy之类的(VIP模式)负载均衡器来做健康检查和主备切换,底层需要借助共享存储(如SAN设备)。
这不是RabbitMQ官方或者开源社区推荐方案,适用于访问压力不是特别大但是又有高可用架构需求(故障切换)的中小规模的系统来使用。首先有一个节点闲置,本身就是资源浪费,其次共享存储往往需要借助硬件存储,或者分布式文件系统
Shovel铲子模式
Shovel是一个插件,用于实现跨机房数据复制,或者数据迁移,故障转移与恢复等。
用户下单的消费先是投递在Goleta Broker实例中,当Goleta实例达到触发条件后(例如:消息堆积数达到阈值)会将消息放到Goleta实例的backup_orders备份队列中,并通过Shovel插件从Goleta的backup_orders队列中将消息拉取到Carpinteria实例存储。
使用Shovel插件后,模型变成了近端同步确认,远端异步确认的方式。此模式支持WAN传输,并且broker实例的RabbitMQ、Erlang版本不要求完全一致。Shovel的配置分静态模式(修改RabbitMQ配置)和动态模式(在控制台直接部署,重启后失效)
集群模式
RabbitMQ集群允许消费者和生产者在RabbitMQ单个节点崩溃的情况下继续运行,并可以通过添加更多的节点来线性扩展消息通信的吞吐量。当失去一个RabbitMQ节点时,客户端能够重新连接到集群中的任何其他节点并继续生产和消费。
RabbitMQ集群中的所有节点都会备份所有的元数据信息,包括:
队列元数据:队列的名称及属性;
交换器:交换器的名称及属性;
绑定关系元数据:交换器与队列或者交换器与交换器之间的绑定关系;
vhost元数据:为vhost内的队列、交换器和绑定提供命名空间及安全属性。
基于存储空间和性能的考虑,RabbitMQ集群中的各节点存储的消息是不同的(有点儿类似分片集群,各节点数据并不是全量对等的),各节点之间同步备份的仅仅是上述元数据以及Queue Owner(队列所有者,就是实际创建Queue并保存消息数据的节点)的指针。当集群中某个节点崩溃后,该节点的队列进程和关联的绑定都会消失,关联的消费者也会丢失订阅信息,节点恢复后(前提是消息有持久化)消息可以重新被消费。虽然消息本身也会持久化,但如果节点磁盘存储设备发生故障那同样会导致消息丢失。
总的来说,集群模式只能保证集群中的某个Node挂掉后应用程序还可以切换到其他Node上继续地发送和消费消息,但并无法保证原有的消息不丢失,所以并不是一个真正意义的高可用集群。
RabbitMQ内置的集群模式,Erlang语言天生具备分布式特性,所以不需要借助类似Zookeeper之类的组件来实现集群(集群节点间使用cookie来进行通信验证,所有节点都必须使用相同的 .erlang.cookie 文件内容),不同节点的Erlang、RabbitMQ版本必须一致。
镜像队列模式
RabbitMQ内置的集群模式有丢失消息的风险,“镜像队列“可以看成是对内置默认集群模式的一种高可用架构的补充。可以将队列镜像(同步)到集群中的其他broker上,相当于是多副本冗余。如果集群中的一个节点失效,队列能自动地切换到集群中的另一个镜像节点上以保证服务的可用性,而且消息不丢失。
在RabbitMQ镜像队列中所谓的master和slave都仅仅是针对某个queue而言的,而不是node。一个queue第一次创建所在的节点是它的master节点,其他节点为slave节点。如果master由于某种原因失效,最先加入的slave会被提升为新的master。
无论客户端请求到达master还是slave,最终数据都是从master节点获取。
当请求到达master节点时,master节点直接将消息返回给client,同时master节点会通过GM(Guaranteed Multicast)议将queue的最新状态广播到slave节点。GM保证了广播消息的原子性,即要么都更新要么都不更新。
当请求到达slave节点时,slave节点需要将请求先重定向到master节点,master节点将消息返回给client,同时master节点会通过GM协议将queue的最新状态广播到slave节点。
RabbitMQ镜像队列中的master、slave是Queue维度而并非Node维度,所以可以交叉减少资源限制,如下图所示:
Federation联邦模式
Federation和Shovel类似,也是一个实现跨集群、节点消息同步的插件。支持联邦交换器、联邦队列(作用在不同级别)。
Federation插件允许你配置一个exchanges federation或者queues federation。
一个exchange/queues federation允许你从一个或者多个upstream接收信息,就是远程的exchange/queues。
无论是Federation还是Shovel都只是解决消息数据传输的问题(当然插件自身可能会一些应用层的优化),跨机房跨城市的这种网络延迟问题是客观存在的,不是简单的通过什么插件可以解决的,一般需要借助昂贵的专线。
很多书籍和文章中存在误导大家的,可能会说Federation/Shovel可以解决延迟的问题,可以实现异地多活等等,其实这都是错误的。
例如:使用Shovel构建集群,RabbitMQ和应用程序都选择双机房部署时,当杭州机房发生了消息积压后超出阈值部分的消息就会被转发到上海机房中,此时上海机房的应用程序直接消费掉上海机房RabbitMQ的消息,这样看起来上海机房是可以分摊负载,而且一定程度上实现“双机房多活”的。但是数据库呢?选择两边都部署还是仅部署在某个机房呢?两边同时写入是很容易造成冲突的,如果数据库仅仅部署在杭州机房,那么数据库也可能成为瓶颈导致消费速度依然上不去,只不过是多了上海机房中的消费者实例节点而已。
而使用Federation模式呢?如果要真正要实现“双机房多活”那么应用程序也是多机房的,那某些Exchange/Queue中的消息会在两边机房都有,两边机房的应用程序都会同时消费消息,那必然会造成重复消息!
异地多活架构
方案 容量 容灾 成本 异地多活 [优]基于逻辑机房,容量可伸缩的云微架构,容量可异地伸缩 [优]日常运行,容灾时可用性高。
[劣]受城际网络故障影响,影响度取决于横向依赖程度[优]IDC、应用等成本在日常得到有效利用 两地三中心 [劣]仅可部署在一个城市,容量伸缩有城市级瓶颈 [劣]灾备设施冷备等待,容灾时可用性低。 [劣]容灾设施等成本仅在容灾时才使用,且受限于可用性