5张图带你理解 RocketMQ 顺序消息实现机制( 三 )

  • 获取 ProcessQueue 锁,这样保证了只有当前线程可以进行消息处理,同时也可以防止 Rebalance 线程把当前处理的 MessageQueue 移除掉;
  • 执行消费处理逻辑;
  • 释放 ProcessQueue 处理锁;6.processConsumeResult 方法更新消息偏移量 。
  • 注意:ProcessQueue 中的锁是 ReentrantLock 。
    3 重试跟并发消息不一样的是,顺序消息消费失败后并不会把消息发送到 Broker,而是直接在 Consumer 端进行重试,如果重试次数超过了最大重试次数(16 次),则发送到 Broker,Broker 则将消息推入死信队列 。如下图:
    5张图带你理解 RocketMQ 顺序消息实现机制

    文章插图
     
    4 总结RocketMQ 顺序消息的原理是在 Producer 端把一批需要保证顺序的消息发送到同一个 MessageQueue,Consumer 端则通过加锁的机制来保证消息消费的顺序性,Broker 端通过对 MessageQueue 进行加锁,保证同一个 MessageQueue 只能被同一个 Consumer 进行消费 。
    根据实现原理可以看到,RocketMQ 的顺序消息可能存在两个坑:
    1. 有顺序性的消息需要发送到同一个 MessageQueue,可能导致单个 MessageQueue 消息量很大,而 Consumer 端消费的时候只能单线程消费,很可能导致当前 MessageQueue 消息积压;
    2. 如果顺序消息 MessageQueue 所在的 broker 挂了,这时 Producer 只能把消息发送到其他 Broker 的 MessageQueue 上,而如果新的 MessageQueue 被其他 Consumer 消费,这样两个 Consumer 消费的消息就不能保证顺序性了 。如下图:

    5张图带你理解 RocketMQ 顺序消息实现机制

    文章插图
     
    【5张图带你理解 RocketMQ 顺序消息实现机制】Broker1 发生故障,把订单出库的消息发送到了 Broker2,由 Consumer2 来进行消费,消息顺序很可能会错乱 。




    推荐阅读