消息中间件

felix.shao2025-04-26消息中间件

消息中间件

概念

消息中间件是什么(什么是消息队列?)

 消息:应用间传送的数据。如文本字符串、JSON、对象等。
 消息中间件定义(Message Queue Middleware,简称为 MQ):利用高效可靠的消息传递机制进行与平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。
 消息中间件也可以理解为一个使用队列来通信的组件。它的本质,就是个转发器,包含发消息、存消息、消费消息的过程。最简单的消息队列模型如下。

message_queue.png

 消息中间件一般有两种传递模式。

  1. 点对点(P2P,Point-to-Point)模式:基于队列的,消息生成者发送消息到队列,消息消费者从队列中接收消息。
  2. 发布/订阅(Pub/Sub)模式:定义了如何向一个内容节点发布和订阅消息,这个内容节点称为主题(topic),主题可以认为是消息传递的中介,消息发布者将消息发布到某个主题,而消息订阅者则从主题中订阅消息。一般在消息的一对多广播时采用。

 我们通常说的消息队列,简称 MQ(Message Queue),它其实就指消息中间件,当前业界比较流行的开源消息中间件包括:RabbitMQ、RocketMQ、Kafka。

消息中间件的作用
  • 解耦
  • 冗余(存储)
  • 扩展性
  • 削峰
  • 可恢复性
  • 顺序保证
  • 缓冲
  • 异步通信
消息队列怎么选型?

 特性对比表格如下,数据来源于 DeepSeek 搜索。

参数ActiveMQRabbitMQRocketMQKafka
吞吐量中等(约 1k-10k msg/s)中等(约 10k-50k msg/s)高(约 50k-100k msg/s)极高(100k+ msg/s)
延迟毫秒级(10-100ms)微秒级(<1ms)- 毫秒级毫秒级(1-10ms)毫秒级(2-20ms)
持久化性能中等(依赖存储引擎)中等(需配置优化)高(基于文件顺序写)极高(顺序 I/O)
单机队列容量受限于内存/磁盘受限于内存/磁盘高(百亿级消息)极高(万亿级消息)
集群扩展性一般(主从模式)中等(镜像队列有瓶颈)高(天然分布式)极高(分区+副本)
协议支持多协议(OpenWire, STOMP等)AMQP自定义协议自定义协议
消息顺序保证单队列有序严格顺序(分区内)严格顺序(分区内)
事务支持支持支持(性能较低)支持支持(但推荐批量)
消息回溯不支持不支持支持(按时间/偏移量)支持(按偏移量)
消费者模型Push/PullPushPullPull
生态集成一般(Java 生态为主)广泛(多语言支持)阿里生态(国内为主)极广泛(大数据场景)

 关键说明。

  • 吞吐量:Kafka 和 RocketMQ 在批量消息场景下表现最佳,RabbitMQ 适合中小规模实时消息。
  • 延迟:RabbitMQ 在低延迟场景(如金融交易)表现最优,Kafka/RocketMQ 更适合高吞吐容忍稍高延迟的场景。
  • 扩展性:Kafka 和 RocketMQ 的分布式架构更适合水平扩展,ActiveMQ/RabbitMQ 垂直扩展有限。
  • 可靠性:四者均支持持久化,但 Kafka/RocketMQ 的副本机制和刷盘策略更适应高可靠需求。

 适用场景。

  • ActiveMQ:传统企业应用,多协议支持。官方社区现在对 ActiveMQ 5.x 维护越来越少,较少在大规模吞吐的场景中使用。
  • RabbitMQ:实时性要求高的业务(如支付通知)。结合 erlang 语言本身的并发优势,性能较好,社区活跃度也比较高,但是不利于做二次开发和维护,不过 RabbitMQ 的社区十分活跃,可以解决开发过程中遇到的 bug。如果你的数据量没有那么大,小公司优先选择功能比较完备的 RabbitMQ。
  • RocketMQ:电商/金融等高并发、顺序消息场景。天生为金融互联网领域而生,对于可靠性要求很高的场景 ,尤其是电商里面的订单扣款,以及业务削峰,在大量交易涌入时,后端可能无法及时处理的情况。RocketMQ 在稳定性上可能更值得信赖,这些业务场景在阿里双 11 已经经历了多次考验,如果你的业务有上述并发场景,建议可以选择 RocketMQ。
  • Kafka:追求高吞吐量,一开始的目的就是用于日志收集和传输,适合产生大量数据的互联网服务的数据收集业务 ,大型公司建议可以选用,如果有日志采集功能,肯定是首选 Kafka。
消息队列使用场景有哪些?
  • 异步&解耦。
  • 削峰。
  • 消息总线。所谓总线,就是像主板里的数据总线一样, 具有数据的传递和交互能力,各方不直接通信,使用总线(MQ)作为标准通信接口。
  • 延时任务。
  • 广播消费。
  • 分布式事务。
  • 数据中转枢纽。典型的就是 ELK 的 Kafka 日志采集功能。
消息重复消费怎么解决?

 生产端为了保证消息发送成功,可能会重复推送(直到收到成功 ACK),会产生重复消息。但是一个成熟的 MQ Server 框架一般会想办法解决,避免存储重复消息(比如:空间换时间,存储已处理过的message_id),给生产端提供一个幂等性的发送消息接口。
 但是消费端却无法根本解决这个问题,在高并发标准要求下,拉取消息+业务处理+提交消费位移需要做事务处理,另外消费端服务可能宕机,很可能会拉取到重复消息。

消息丢失怎么解决的?

 使用一个消息队列,其实就分为三大块:生产者、中间件、消费者,所以要保证消息就是保证三个环节都不能丢失数据。
message_queue.png

  • 消息生产阶段:生产者会不会丢消息,取决于生产者对于异常情况的处理是否合理。从消息被生产出来,然后提交给 MQ 的过程中,只要能正常收到 ( MQ 中间件) 的 ack 确认响应,就表示发送成功,所以只要处理好返回值和异常,如果返回异常则进行消息重发,那么这个阶段是不会出现消息丢失的。
  • 消息存储阶段:Kafka 在使用时是部署一个集群,生产者在发布消息时,队列中间件通常会写「多个节点」,也就是有多个副本,这样一来,即便其中一个节点挂了,也能保证集群的数据不丢失。
  • 消息消费阶段:消费者接收消息+消息处理之后,才回复 ack 的话,那么消息阶段的消息不会丢失。不能收到消息就回 ack,否则可能消息处理中途挂掉了,消息就丢失了。
使用消息队列还应该注意哪些问题?

 需要考虑消息可靠性和顺序性方面的问题。

消息队列的可靠性、顺序性怎么保证?

 消息可靠性可以通过下面这些方式来保证。

  • 消息持久化:确保消息队列能够持久化消息是非常关键的。在系统崩溃、重启或者网络故障等情况下,未处理的消息不应丢失。例如,像 RabbitMQ 可以通过配置将消息持久化到磁盘,通过将队列和消息都设置为持久化的方式(设置 durable = true),这样在服务器重启后,消息依然可以被重新读取和处理。
  • 消息确认机制:消费者在成功处理消息后,应该向消息队列发送确认(acknowledgment)。消息队列只有收到确认后,才会将消息从队列中移除。如果没有收到确认,消息队列可能会在一定时间后重新发送消息给其他消费者或者再次发送给同一个消费者。以 Kafka 为例,消费者通过 commitSync 或者 commitAsync 方法来提交偏移量(offset),从而确认消息的消费。
  • 消息重试策略:当消费者处理消息失败时,需要有合理的重试策略。可以设置重试次数和重试间隔时间。例如,在第一次处理失败后,等待一段时间(如 5 秒)后进行第二次重试,如果重试多次(如 3 次)后仍然失败,可以将消息发送到死信队列,以便后续人工排查或者采取其他特殊处理。

 消息顺序性保证的方式如下。

  • 有序消息处理场景识别:首先需要明确业务场景中哪些消息是需要保证顺序的。例如,在金融交易系统中,对于同用户的转账操作顺序是不能打乱的。对于需要顺序处理的消息,要确保消息队列和消费者能够按照特定的顺序进行处理。
  • 消息队列对顺序性的支持:部分消息队列本身提供了顺序性保证的功能。比如 Kafka 可以通过将消息划分到同一个分区(Partition)来保证消息在分区内是有序的,消费者按照分区顺序读取消息就可以保证消息顺序。但这也可能会限制消息的并行处理程度,需要在顺序性和吞吐量之间进行权衡。
  • 消费者顺序处理策略:消费者在处理顺序消息时,应该避免并发处理可能导致顺序打乱的情况。例如,可以通过单线程或者使用线程池并对顺序消息进行串行化处理等方式,确保消息按照正确的顺序被消费。
如何保证幂等写?

 幂等性是指同一操作的多次执行对系统状态的影响与一次执行结果一致。例如,支付接口若因网络重试被多次调用,最终应确保仅扣款一次。实现幂等写的核心方案。

  • 唯一标识(幂等键):客户端为每个请求生成全局唯一 ID(如 UUID、业务主键),服务端校验该 ID 是否已处理,适用场景接口调用、消息消费等。
  • 数据库事务 + 乐观锁:通过版本号或状态字段控制并发更新,确保多次更新等同于单次操作,适用场景数据库记录更新(如余额扣减、订单状态变更)。
  • 数据库唯一约束:利用数据库唯一索引防止重复数据写入,适用场景数据插入场景(如订单创建)。
  • 分布式锁:通过锁机制保证同一时刻仅有一个请求执行关键操作,适用场景高并发下的资源抢夺(如秒杀)。
  • 消息去重:消息队列生产者为每条消息生成唯一的消息 ID,消费者在处理消息前,先检查该消息 ID 是否已经处理过,如果已经处理过则丢弃该消息。
如何处理消息队列的消息积压问题?

 消息积压是因为生产者的生产速度,大于消费者的消费速度。遇到消息积压问题时,我们需要先排查,是不是有 bug 产生了。
 如果不是 bug,我们可以优化一下消费的逻辑,比如之前是一条一条消息消费处理的话,我们可以确认是不是可以优为批量处理消息。如果还是慢,我们可以考虑水平扩容,增加Topic的队列数,和消费组机器的数量,提升整体消费能力。
 如果是 bug 导致几百万消息持续积压几小时。有如何处理呢?需要解决 bug,临时紧急扩容,大概思路如下。

  1. 先修复 consumer 消费者的问题,以确保其恢复消费速度,然后将现有 consumer 都停掉。
  2. 新建一个 topic,partition 是原来的 10 倍,临时建立好原先 10 倍的 queue 数量。
  3. 然后写一个临时的分发数据的 consumer 程序,这个程序部署上去消费积压的数据,消费之后不做耗时的处理,直接均匀轮询写入临时建立好的 10 倍数量的 queue。
  4. 接着临时征用 10 倍的机器来部署 consumer,每一批 consumer 消费一个临时 queue 的数据。这种做法相当于是临时将 queue 资源和 consumer 资源扩大 10 倍,以正常的 10 倍速度来消费数据。
  5. 等快速消费完积压数据之后,得恢复原先部署的架构,重新用原先的 consumer 机器来消费消息。
如何保证数据一致性,事务消息如何实现?

 一条普通的 MQ 消息,从产生到被消费,大概流程如下。
message_consumer_process.png

  1. 生产者产生消息,发送给 MQ 服务器。
  2. MQ 收到消息后,将消息持久化到存储系统。
  3. MQ 服务器返回 Ack 到生产者。
  4. MQ 服务器把消息 push 给消费者。
  5. 消费者消费完消息,响应 ACK。
  6. MQ 服务器收到 ACK,认为消息消费成功,即在存储中删除消息。

 我们举个下订单的例子吧。订单系统创建完订单后,再发送消息给下游系统。如果订单创建成功,然后消息没有成功发送出去,下游系统就无法感知这个事情,出导致数据不一致。
 如何保证数据一致性呢?可以使用事务消息。一起来看下事务消息是如何实现的吧。

transaction_message.png
  1. 生产者产生消息,发送一条半事务消息到 MQ 服务器。
  2. MQ 收到消息后,将消息持久化到存储系统,这条消息的状态是待发送状态。
  3. MQ 服务器返回 ACK 确认到生产者,此时 MQ 不会触发消息推送事件。
  4. 生产者执行本地事务。
  5. 如果本地事务执行成功,即 commit 执行结果到 MQ 服务器;如果执行失败,发送 rollback。
  6. 如果是正常的 commit,MQ 服务器更新消息状态为可发送;如果是 rollback,即删除消息。
  7. 如果消息状态更新为可发送,则 MQ 服务器会 push 消息给消费者。消费者消费完就回 ACK。
  8. 如果 MQ 服务器长时间没有收到生产者的 commit 或者 rollback,它会反查生产者,然后根据查询到的结果执行最终状态。
消息队列是参考哪种设计模式?

 是参考了观察者模式和发布订阅模式,两种设计模式思路是一样的。

  • 观察者模式。
    • 观察者模式实际上就是一个一对多的关系,在观察者模式中存在一个主题和多个观察者,主题也是被观察者,当我们主题发布消息时,会通知各个观察者,观察者将会收到最新消息。
  • 发布订阅模式。
    • 发布订阅模式和观察者模式的区别就是发布者和订阅者完全解耦,通过中间的发布订阅中心进行消息通知,发布者并不知道自己发布的消息会通知给谁,因此发布订阅模式有三个重要角色,发布者->发布订阅中心->订阅者。
让你写一个消息队列,该如何进行架构设计?

 这个问题面试官主要考察三个方面的知识点。

  • 你有没有对消息队列的架构原理比较了解。
  • 考察你的个人设计能力。
  • 考察编程思想,如什么高可用、可扩展性、幂等等等。

 遇到这种设计题,大部分人会很蒙圈,因为平时没有思考过类似的问题。大多数人平时埋头增删改啥,不去思考框架背后的一些原理。有很多类似的问题,比如让你来设计一个 Dubbo 框架,或者让你来设计一个MyBatis 框架,你会怎么思考呢?
 回答这类问题,并不要求你研究过那技术的源码,你知道那个技术框架的基本结构、工作原理即可。设计一个消息队列,我们可以从这几个角度去思考。
design_message_queue.png

  1. 首先是消息队列的整体流程,producer 发送消息给 broker,broker 存储好,broker 再发送给 consumer 消费,consumer 回复消费确认等。
  2. producer 发送消息给 broker,broker 发消息给 consumer 消费,那就需要两次 RPC了,RPC 如何设计呢?可以参考开源框架 Dubbo,你可以说说服务发现、序列化协议等等。
  3. broker 考虑如何持久化呢,是放文件系统还是数据库呢,会不会消息堆积呢,消息堆积如何处理呢。
  4. 消费关系如何保存呢?点对点还是广播方式呢?广播关系又是如何维护呢?zk 还是 config server。
  5. 消息可靠性如何保证呢?如果消息重复了,如何幂等处理呢?
  6. 消息队列的高可用如何设计呢?可以参考 Kafka 的高可用保障机制。多副本 -> leader & follower -> broker 挂了重新选举 leader 即可对外服务。
  7. 消息事务特性,与本地业务同个事务,本地消息落库;消息投递到服务端,本地才删除;定时任务扫描本地消息库,补偿发送。
  8. MQ 得伸缩性和可扩展性,如果消息积压或者资源不够时,如何支持快速扩容,提高吞吐?可以参照一下 Kafka 的设计理念,broker -> topic -> partition,每个 partition 放一个机器,就存一部分数据。如果现在资源不够了,简单啊,给 topic 增加 partition,然后做数据迁移,增加机器,不就可以存放更多数据,提供更高的吞吐量了。
Last Updated 4/27/2025, 11:08:56 AM
ON THIS PAGE