存储消息的时候,CommitLog.putMessage方法内部判断如果设置了delayLevel,就会重设topic为SCHEDULE_TOPIC,然后将消息存储到延时队列中,后续就是和ScheduleMessageService的逻辑相同 。
整个消息重试的逻辑示意图如下所示:
文章插图
如图所示
- Consumer在消费的时候,都会订阅指定的TOPIC-NORMAL_TOPIC和该ConsumerGroup对应的重试TOPIC-RETRY_GROUP1_TOPIC,同时消费来自这两个topic中的消息 。
- 当发生消费失败后,Client端会调用sendMessageBack方法将失败消息发送回broker 。
- broker端的SendMessageProcessor会根据当前的重试次数确定延时级别,将消息存入延时队列-SCHEDULE_TOPIC中 。
- ScheduleMessageService会将到期的消息重新发送到重试TOPIC-RETRY_GROUP1_TOPIC中,这个时候消息被Consumer消费,就完成了整个重试过程 。
四、总结延时消息都是非常日常业务使用中很重要的功能,而RocketMQ通过时间片分级+多队列+定时任务,就实现了这样的功能,设计上是很巧妙的 。并且消费重试采用退避式的策略,重试时间的梯度刚好与延时消息策略一致,这样就可以直接利用延时队列去完成消息重试的功能,从策略上来说非常合理(消息消费重复失败,在短时间内重试成功的可能性比较低),并且复用了底层代码,这些是值得去学习和借鉴的 。
推荐阅读
- 如何搜索 WhatsApp 聊天消息
- 保时捷|坏消息:16-24岁年轻人,近两成找不到工作!该为自己找条出路了
- ?延时喷剂喷多了会怎样
- 高通|全新自研CPU架构 消息称高通将杀回服务器芯片市场
- 腾讯|领先微信!手机QQ iOS版8.9.5更新发布:视频消息能编辑了
- 考试|研究生迎来好消息,下半年这些编制岗位开始招人,有免笔试岗位
- 安卓|没悬念!iPhone 14真要涨价了:消息人士称安卓性价比竟输苹果
- 智能手环|华为手环7推送固件更新:iOS手机消息通知获优化
- 机器人300024最新消息 机器人板块股票
- 特斯拉|不老男神迎好消息!林志颖顺利完成颜面手术 有消息称特斯拉曾私下询问病情
