好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读( 三 )

存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同 。
整个消息重试的逻辑示意图如下所示:

好奇延时消息如何实现吗?来看RocketMQ是怎么做的-源码解读

文章插图
 
如图所示
  1. Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息 。
  2. 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker 。
  3. broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中 。
  4. ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程 。
可以对比之前的延时消息流程去看,其实重试消息就是将失败的消息当作延时消息进行处理,只不过最后投入的是专门的重试消息队列中 。
四、总结延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的 。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的 。




推荐阅读