下面看一下顺序消息的消费端处理逻辑 。
2.1 注册监听上面的代码定义了顺序消息监听器 MessageListenerOrderly,并且注册到 DefaultMQPushConsumer,这个注册同时也注册到了 DefaultMQPushConsumerImpl 。
2.2 PushConsumer 初始化在 DefaultMQPushConsumerImpl 类初始化的时候,会判断注册的 MessageListener 是不是 MessageListenerOrderly,如果是,就把 consumeOrderly 变量设置为 true,以此来标记是顺序消息拉取还是并发消息拉取 。然后把 ConsumeMessageService 初始化为 ConsumeMessageOrderlyService 。
2.3 锁定 mq要保证消息的顺序性,就需要保证同一个 MessageQueue 只能被同一个 Consumer 消费 。
ConsumeMessageOrderlyService 初始化的时候,会启动一个定时任务,周期性(默认 20s)地向 Broker 发送锁定消息(请求类型 LOCK_BATCH_MQ),Broker 收到后,就会把 MessageQueue、group 和 clientId 进行绑定,这样其他客户端就不能从这个 MessageQueue 拉取消息 。
注意:Broker 锁定是有过期时间的,默认 60s,可以配置,锁定过期后,有可能被其他 Consumer 进行消费 。
Broker 端锁结构如下图:

文章插图
2.4 拉取消息消费者启动时,启动消费拉取线程 PullMessageService,里面死循环不停地从 Broker 拉取消息 。这里调用了 DefaultMQPushConsumerImpl 类的 pullMessage 方法 。这里拉取消息的逻辑跟并发消息的逻辑是一样的 。
拉取到消息后,调用 PullCallback 的 onSuccess 方法处理结果,这里调用了 ConsumeMessageOrderlyService 的 submitConsumeRequest 方法,里面用线程池提交了 ConsumeRequest 线程 。
PullCallback pullCallback = new PullCallback() { @Override public void onSuccess(PullResult pullResult) {if (pullResult != null) {pullResult = DefaultMQPushConsumerImpl.this.pullAPIWrApper.processPullResult(pullRequest.getMessageQueue(), pullResult,subscriptionData);switch (pullResult.getPullStatus()) {case FOUND://省略if (pullResult.getMsgFoundList() == null || pullResult.getMsgFoundList().isEmpty()) {DefaultMQPushConsumerImpl.this.executePullRequestImmediately(pullRequest);} else {//省略boolean dispatchToConsume = processQueue.putMessage(pullResult.getMsgFoundList());DefaultMQPushConsumerImpl.this.consumeMessageService.submitConsumeRequest(pullResult.getMsgFoundList(),processQueue,pullRequest.getMessageQueue(),dispatchToConsume);//省略}//省略break;//省略}} }//省略};上面拉取到消息后,先把消息放到了 ProcessQueue,然后调用了 submitConsumeRequest 方法 。跟并发消息处理方式不同的是,submitConsumeRequest 方法并没有处理拉取到的消息,而真正处理的时候是从 ProcessQueue 获取 。2.5 处理消息处理消息的逻辑在 ConsumeMessageOrderlyService 的内部类 ConsumeRequest,这是一个线程类,run 方法如下:
public void run() { //省略部分逻辑 //1.获取到 MessageQueueLock 对应的锁 final Object objLock = messageQueueLock.fetchLockObject(this.messageQueue); synchronized (objLock) {if (MessageModel.BROADCASTING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())|| (this.processQueue.isLocked() && !this.processQueue.isLockExpired())) {final long beginTime = System.currentTimeMillis();for (boolean continueConsume = true; continueConsume; ) {//省略延后执行的逻辑final int consumeBatchSize =ConsumeMessageOrderlyService.this.defaultMQPushConsumer.getConsumeMessageBatchMaxSize();//2.从 processQueue 拉取消息List<MessageExt> msgs = this.processQueue.takeMessages(consumeBatchSize);if (!msgs.isEmpty()) {final ConsumeOrderlyContext context = new ConsumeOrderlyContext(this.messageQueue);ConsumeOrderlyStatus status = null;//省略部分逻辑boolean hasException = false;try {//3.获取处理锁this.processQueue.getConsumeLock().lock();//4.执行消费处理逻辑status = messageListener.consumeMessage(Collections.unmodifiableList(msgs), context);} catch (Throwable e) {log.warn(String.format("consumeMessage exception: %s Group: %s Msgs: %s MQ: %s",RemotingHelper.exceptionSimpleDesc(e),ConsumeMessageOrderlyService.this.consumerGroup,msgs,messageQueue), e);hasException = true;} finally {//5.释放处理锁this.processQueue.getConsumeLock().unlock();}//省略部分逻辑continueConsume = ConsumeMessageOrderlyService.this.processConsumeResult(msgs, status, context, this);} else {continueConsume = false;}}} else {//省略部分逻辑ConsumeMessageOrderlyService.this.tryLockLaterAndReconsume(this.messageQueue, this.processQueue, 100);} }}上面的代码总结一下,Consumer 消费消息的逻辑如下:- 对 MessageQueueLock 进行加锁,这样就保证只有一个线程在处理当前 MessageQueue;
- 从 ProcessQueue 拉取一批消息;
推荐阅读
- 新手怎么加入自媒体,学会这4个运营技巧,你也可以出爆文
- 如何重启MySQL服务
- 来自远程技术提供者的建议,如何阻止他人远程访问你的移动设备?
- 苹果|iPhone开售15周年:34款机型 你用过哪些?
- 大排怎么油炸,油炸大排怎么做法-
- 为什么想应聘文员,面试官问你为什么不做销售想做文员-
- 大爷|59岁离异大妈带简历相亲,要彩礼,还得工资上交:我优秀,你不亏
- 我的解放日志|看了《我的解放日志》,你是致郁了还是治愈了?
- 国产|国产桌面操作系统开发者平台发布:你知道哪些国产系统?
- 大学|高考填报志愿时,省外大学值得报考么?这些你值得了解
