9 RabbitMQ 高阶

felix.shao2025-02-18

9 RabbitMQ 高阶

TIP

 本小节主要介绍以下知识:

  • 相关原理。

存储机制

 RabbitMQ 的“持久层”是一个逻辑上的概念,其实有持久化的消息(磁盘上)和非持久化的消息(内存中)。它实际包含两个部分:

  • 队列索引(rabbit_queue_index):负责维护队列中落盘消息的信息,包括消息的存储地点、是否已被交付给消费者、是否已被消费者 ack 等。每个队列都有与之对应的一个 rabbitmq_queue_index。
  • 消息存储(rabbit_msg_store):以键值对的形式存储消息,它被所有队列共享,在每个节点中有且只有一个。  从技术层面上来说,rabbit_msg_store 具体还可以分为 msg_store_persistent 和 msg_store_transient,msg_store_persistent 负责持久化消息的持久化,重启后消息不会丢失;msg_store_transient 负责非持久化消息的持久化,重启后消息会丢失。通过情况下,习惯性地讲 msg_store_persistent 和 msg_store_transient 堪称 rabbit_msg_store 这样一个整体。  消息(包括消息体、属性和 headers)可以直接存储在 rabbit_queue_index 中,也可以被保存在 rabbit_msg_store 中。默认在 $RABBITMQ_HOME/var/lib/mnesia/rabbit@HOSTNAME/ 路径下包含 queues、msg_store_persistent、msg_store_transient 这 3 个文件夹。  最佳的配备是较小的消息存储在 rabbit_queue_index 中而较大的消息存储在 rabbit_msg_store 中。这个消息大小的界定可以通过 queue_index_embed_msgs_below 来配置,单位为 B。注意这里的消息大小是指消息体、属性及 headers 整体的大小。

 rabbit_queue_index 中以顺序(文件名从 0 开始累加)的段文件来进行存储,后缀为“.idx”,每个段文件中包含固定的 SEGMENT_ENTRY_COUNT 条记录,SEGMENT_ENTRY_COUNT 默认值为 16384。每个 rabbit_queue_index 从磁盘中读取消息的时候至少要在内存中维护一个段文件,所以设置 queue_index_embed_msgs_below 值的时候要格外谨慎,一点点增大也可能会引起内存爆炸式的增长。  经过 rabbit_msg_store 处理的所有消息都会以追加的方式写入到文件中,当一个文件的大小超过指定的限制(file_size_limit)后,关闭这个文件再创建一个新的文件以供新的消息写入。文件名(文件后缀是“.rdq”)从 0 开始进行累加,因此文件名最小的文件也是最老的文件。在进行消息的存储时,RabbitMQ 会在 ETS(Erlang Term Storage)表中记录消息在文件中的为止映射(Index)和文件的相关消息(FileSummary)。  在读取消息的时候,现根据消息的 ID(msg_id)找到对应存储的文件,如果文件存在并且未被锁住,则直接打开文件,从指定位置读取消息的内容。如果文件不存在或者被锁住了,则发送请求由 rabbit_msg_store 进行处理。  消息的删除只是从 ETS 表删除指定消息的相关信息,同时更新消息对应的存储文件的相关信息。执行消息删除操作时,并不立即对在文件中的消息进行删除,仅仅是标记为垃圾数据。当一个文件中都是垃圾数据时可以将这个文件删除。当检测到前后两个文件中的有效数据可以合并在一个文件中,并且所有的垃圾数据的大小和所有文件(至少有 3 个文件存在的情况下)的数据大小的比值超过设置的阈值 GARBAGE_FRACTION(默认值为 0.5)时才会触发垃圾回收将两个文件合并。  执行合并的两个文件一定是逻辑上相邻的两个文件。

队列的结构

 通过队列由 rabbit_amqqueue_process 和 backing_queue 这两部分组成,rabbit_amqqueue_process 负责协议相关的消息处理,即接收生产者发布的消息、向消费者交付消息、处理消息的确认(包括生产端的 confirm 和消费端的 ack)等。backing_queue 是消息存储的具体形式和引擎,并向 rabbit_amqqueue_process 提供相关的接口以供调用。  如果消息投递的目的队列是空的,并且有消费者订阅了这个队列,那么该消息会直接发送给消费者,不会经过队列这一步。而当消息无法直接投递给消费者时,需要暂时将消息存入队列,以便重新投递。消息存入队列后,不是固定不变的,它会随着系统的负载在队列中不断地流动,消息的状态会不断发生变化。RabbitMQ 中的队列消息可能会处于以下 4 种状态。

  • alpha:消息内容(包括消息体、属性和 headers)和消息索引都存储在内存中。
  • beta:消息内容保存至磁盘中,消息索引保存至内存中。
  • gamma:消息内容保存在磁盘中,消息索引在磁盘和内存中都有。
  • delta:消息内容和索引都在磁盘中。

 对于持久化的消息,消息内容和消息索引都必须先保存在磁盘上,才会处于上述状态中的一种,而 gamma 状态的消息是只有持续化的消息才会有的状态。  RabbitMQ 在运行时会根据统计的消息传送速度定期计算一个当前内存中能够保存的最多消息数量(target_ram_count),如果 alpha 状态的消息数量大于此值时,就会引起消息的状态转换,多余的消息可能会转换到 beta 状态、gamma 状态或者 delta 状态。却分这 4 种状态的主要作用是满足不同的内存和 CPU 需求。alpha 状态最耗内存,但很少消耗 CPU。delta 状态需要执行两次 I/O 操作才能读取到消息,一次是读消息索引(从 rabbit_queue_index 中),一次是读消息内容(从 rabbit_msg_store 中);beta 和 gamma 状态都只需要一次 I/O 操作就可以读取到消息(从 rabbit_msg_store 中)。  对于普通的没有设置优先级和镜像的队列来说,backing_queue 的默认实现是 rabbit_variable_queue,其内部通过 5 个子队列 Q1、Q2、Delta、Q3 和 Q4 来体现消息的各个状态。整个队列包括 rabbit_amqqueue_process 和 backing_queue 的各个子队列,队列的结构如下图所示: queue_structure.png

 其中Q1、Q4 只包含 alpha 状态的消息,Q2 和 Q3 包含 beta 和 gamma 状态的消息,Delta 只包含 delta 状态的消息。一般情况下,消息按照图示的方向进行流动,但并不是每一条消息都一定会经历所有的状态,这个取决于当前系统的负载状况。从 Q1 至 Q4 基本经历内存到磁盘,再由磁盘到内存这样的一个过程,如此可以再队列负载很高的情况下,能够通过将一部分消息由磁盘保存来节省内存空间,而在负载降低的时候,这部分消息又渐渐回到内存被消费者获取,使得整个队列具有很好的弹性。  消费者获取消息也会引起消息的状态转换,当消费者获取消息时,首先会从 Q4 中获取消息,如果获取成功则返回。如果 Q4 为空,则尝试从 Q3 中获取消息,系统首先会判断 Q3 是否为空,如果为空则返回队列为空,即此时队列中没有消息。如果 Q3 不为空,则取出 Q3 中的消息,进而再判断此时 Q3 和 Delta 中的长度,如果都为空,则可以认为 Q2、Delta、Q3、Q4 全部为空,此时将 Q1 中的消息直接转移至 Q4,下次直接从 Q4 中获取消息,在将消息从 Delta 转移到 Q3 的过程中,是按照索引分段读取的,首先读取某一段,然后判断读取消息的个数与 Delta 中的消息的个数是否相等,如果相等,则可以判断此时 Delta 中无消息,则直接将刚读取到的消息一并放入到 Q3 中;如果不相等,仅将此次读取到的消息转移到 Q3。

TIP

 两处疑问,答案略。

  1. 为什么 Q3 为空则可以认定整个队列为空?
  2. 为什么 Q3 和 Delta 都为空时,则可以认为 Q2、Delta、Q3、Q4 全部为空?

 在系统负载较高时,已接收到的消息如果不能很快被消费掉,这些消息就会进入到很深的队列中去,这样会增加处理每个消息的平均开销。因为要花更多的时间和资源处理“堆积”的消息,如此用来处理新流入的消息的能力就会降低,使得后流入的消息又被积压到很深的队列中继续增大每个消息的平均开销,继而情况变得越来越恶化,使得系统的处理能力大大降低。  应对这个问题有3种措施:

  1. 增加 prefetch_count 的值,即一次发送多条消息给消费者,加快消息被消费的速度。
  2. 采用multiple ack,降低处理ack带来的开销;
  3. 流量控制。

惰性队列

 RabbitMQ 从 3.6.0 版本开始引入了惰性队列(Lazy Queue)的概念。惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或者是由于维护而关闭等)而致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。
 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存之中,这样可以更加快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间,也会阻塞队列的操作,进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法,但是效果始终不太理想,尤其是在消息量特别大的时候。  惰性队列会将接收到的消息直接存入文件系统中,而不管是持久化的或者是非持久化的,这样可以减少了内存的消耗,但是会增加 I/O 的使用,如果消息是持久化的,那么这样的 I/O 操作不可避免,惰性队列和持久化消息可谓是“最佳拍档”。注意如果惰性队列中存储的是非持久化的消息,内存的使用率会一直很稳定,但是重启之后消息一样会丢失
 队列具备两种模式:default 和 lazy。默认的为 default 模式,在 3.6.0 之前的版本无需做任何变更。lazy 模式即为惰性队列的模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。如果要通过声明的方式改变已有队列的模式的话,那么只能先删除队列,然后再重新声明一个新的。
 设置队列为惰性队列的命令:
rabbitmqctl set_policy Lazy "^myqueue$" '{"queue-mode":"lazy"}' --apply-to queues

 惰性队列和普通队列相比,只有很小的内存开销。
 根据官网测试数据显示,对于普通队列,如果要发送 1 千万条消息,需要耗费 801 秒,平均发送速度约为 13000 条/秒。如果使用惰性队列,那么发送同样多的消息时,耗时是 421 秒,平均发送速度约为 24000 条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息换页至磁盘。如果有消费者消费时,惰性队列会耗费将近 40 MB 的空间来发送消息,对于一个消费者的情况,平均的消费速度约为 14000 条/秒。
 如果要将普通队列转变为惰性队列,那么我们需要忍受同样的性能损耗。当转变为惰性队列的时候,首先需要将缓存中的消息换页至磁盘中,然后才能接收新的消息。反之,当将一个惰性队列转变为普通队列的时候,和恢复一个队列执行同样的操作,会将磁盘中的消息批量的导入到内存中。

内存及磁盘告警

 当内存使用超过配置的阈值或者磁盘剩余空间地狱配置的阈值时,RabbitMQ 都会暂时阻塞(block)客户端的连接并停止接收客户端发来的消息,以此避免服务崩溃。于此同时,客户端与服务端的心跳检测也会失效,可以通过rabbitmqctl list_connections命令查看,web 也可。  被阻塞的 connection 的有两种状态:

  • blocking: 对应于并不试图发送消息的 connection, 比如消费者关联的 connection, 这种状态下, connection 可以正常运行。
  • blocked: 对应于一直有消息发送的 connection, 这种状态下的 connection 会被停止发送消息。

 注意:

  1. 集群中一个节点的内存或者磁盘受限,会引起整个集群不可用。
  2. 建议生产和消费逻辑分摊到独立的 connection 之上,不发生任何交集。
  3. 客户端可以通过添加 BlockedListener 来监听相应连接阻塞的信息。

内存告警

 可参考如上文献。

磁盘告警

 可参考如上文献。

流控

 RabbitMQ 可以对内存和磁盘使用量设置阈值,当达到阈值侯,生产者将被阻塞(block),直到对应项恢复正常。除了这两个阈值,从 2.8.0 版本开始,RabbitMQ 还引入了流控(Flow Control)机制来确保稳定性。流控机制是用来避免消息的发送速率过快而导致服务器难以支撑的情形。内存和磁盘告警相当于全局的流控(Global Flow Control),一旦触发会阻塞集群中所有的 Connection,本届的流控是针对单个 Connection 的,可以称之为 Per-Connection Flow Control 或者 Internal Flow Control。

流控的原理

 Erlang 进程之间并不共享内存(binary 类型的除外),而是通过消息传递来通信,每个进程都有自己的进程邮箱(mailbox)。默认情况下,Erlang 并没有对进程邮箱的大小进行限制,所以当又大量消息持续发往某个进程时,会导致该进程邮箱过大,最终内存溢出并崩溃。在 RabbitMQ 中,如果生产者持续高速发送,而消费者消费速度较低时,如果没有流控,很快就会使内部进程邮箱的大小达到内存阈值。  RabbitMQ 使用了一种基于信用证算法(credit-based algorithm)的流控机制来限制发送消息的速率以解决前面所提出的问题。它通过监控各个进程的进程邮箱,当某个进程负载过高而来不及处理消息时,这个进程的进程邮箱就会开始堆积消息。当堆积到一定量时,就会阻塞而不接收上有的新消息。从而慢慢地,上游进程的进程邮箱也会开始堆积消息。当堆积到一定量时也会阻塞而停止接收上游的消息,最后就会使负责网络数据包接收的进程阻塞而暂停接收新的数据。  以下图为例说明“信用证算法”:
credit_based_algorithm.png
 进程 A 接收消息并转发至进程 B,进程 B 接收消息并转发至进程 C。每个进程中都有一堆关于收发消息的 credit 值。以进程B为例,{{credit_from,C},value}表示能发送多少条消息给 C,每发送一条消息该值减 1,当为 0 时,进程 B 不再往进程 C 发送消息也不在接收进程 A 的消息。{{credit_to,A},value}表示再接收多少条消息就向进程 A 发送增加 credit 值的通知,进程 A 接收到该通知后就增加{{credit_from,B},value}所对应的值,这样进程 A 就能持续发送消息。当上游发送速率高于下游接收速率时,credit 值就会被逐渐耗光,这时进程就会被阻塞,阻塞的情况一直会传递到最上游。当上游进程收到来自下游进程的增加 credit 值的通知时,若此时上游进程处于阻塞状态则解除阻塞,开始接收更上游进程的消息,一个一个传到最终能够解除最上游的阻塞状态。
 一个连接触发流控时会处于 flow 的状态,也就意味着这个 Connection 的状态每秒在 blocked 和 unblocked 之间来回切换数次,这样可以将消息发送的速率控制在服务器能够支撑的范围之内。可以通过rabbitmqctl list_connections命令或者 Web 管理界面来查看 Connection 的状态。  处于 flow 状态的 Connection 和处于 running 状态的 Connection 并没有什么不同,这个状态只是告诉系统管理员相应的发送速率受限了。而对于客户端而言,它看到的知识服务器的带宽要比正常情况下要小一些。
 流控机制不只是作用于 Connection,同样作用于信道和队列。从 Connection 到 Channel,再到队列,最后是消息持久化存储形成一个完整的流控链,对于处于整个流控链中的任意进程,只要该进程阻塞,上游的进程必定全部被阻塞。也就是说,如果某个进程达到性能瓶颈,必然会导致上游所有的进程被阻塞。处理消息的几个关键进程及其对应的顺序关系如下图:
flow_control_chain.png

  • rabbit_reader:Connection 的处理进程,负责接收、解析 AMQP 协议数据包等。
  • rabbit_channel:Channel 的处理进程,负责处理 AMQP 协议的各种方法、进行路由解析等。
  • rabbit_amqqueue_process:队列的处理进程,负责实现队列的所有逻辑。
  • rabbit_msg_store:负责实现消息的持久化。

案例:打破队列的瓶颈

 略,直接看书吧。  一些杂七杂八记录的笔记要点如下:

  • 如何提升队列的性能?  第一种是开启 Erlang 语言的 HiPE 功能。  第二种是寻求打破 rabbit_amqqueue_process 的性能瓶颈(其实就是封装了多个队列来处理)。

镜像队列

 单节点集群和多节点集群遇到节点故障时,会导致其服务不可用或对应的队列不可用。引入镜像队列(Mirror Queue)的机制,可以将队列镜像到集群中的其他 Broker 节点之上,如果集群中的一个节点失效了,队列能自动地切换到镜像中的另一个节点上以保证服务的可用性。在通常的用法中,针对每一个配置镜像的队列都包含一个主节点和若干个从节点(类似主副数据冗余存储这一套逻辑)。
 slave 会准确地按照 master 执行命令的顺序进行动作,故 slave 与 master 上维护的状态应该是相同的。如果 master 由于某种原因失效,那么“资历最老”的 slave 会被提升为新的 master。根据 slave 加入的时间排序,时间最长的 slave 即为“资历最老”。发送到镜像队列的所有消息会被同时发往 master 和 所有的 slave 上,如果此时 master 挂掉了,消息还会在 slave 上,这样 slave 提升为 master 的时候消息也不会丢失。除发送消息(Basic.Publish)外的所有动作都只会向 master 发送,然后再由 master 将命令执行的结果广播给各个 slave。
 如果消费者与 slave 建立连接并进行订阅消费,其实质上都是从 master 上获取消息,只不过看似是从 slave 上消费而已。另外这里的负载均衡是通过队列均匀地散落在集群的各个 Broker 节点中实现的。

TIP

 RabbitMQ 的镜像队列同时支持 publisher confirm 和事务两种机制。在事务机制中,只有当前事务在全部镜像中执行之后,客户端才会收到 Tx.Commit-Ok 的消息。同样的,在 publisher confirm 机制中,生产者进行当前消息确认的前提是该消息被全部进行接收了。

 镜像队列的结构和原理见小节开始的参考文献。

参考文献

Last Updated 2/18/2025, 5:05:12 PM