40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!( 二 )

 2.8 消费者 2.8.1 MessageQueue 加锁
对于顺序消息,要保证同一个 MessageQueue 只能被同一个 Consumer 消费 。消费者初始化的时候,会启动一个定时任务,定时(默认 20s,可以配置)地向 Broker 发送锁定消息,Broker 收到请求后,就会把 MessageQueue、group 和 clientId 进行绑定,这样其他客户端就不能从这个 MessageQueue 拉取消息 。
代码如下:
//ConsumeMessageOrderlyService.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ConsumeMessageOrderlyService.this.lockMQPeriodically(); } catch (Throwable e) { log.error("scheduleAtFixedRate lockMQPeriodically exception", e); } } }, 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
注意:Broker 的加锁是有时效的(默认 60s,可以配置),过期后,有可能被其他 Consumer 进行消费 。
2.8.2 性能快照
Consumer 每秒会记录一次性能快照,比如消息从创建到消费花费的时间,消息从保存到消费花费的时间,接收到消息的总数量,失败总数量 。代码如下:
//Consumer.java executorService.scheduleAtFixedRate(new TimerTask() { @Override public void run() { snapshotList.addLast(statsBenchmarkConsumer.createSnapshot()); if (snapshotList.size() > 10) { snapshotList.removeFirst(); } } }, 1000, 1000, TimeUnit.MILLISECONDS);
上面记录了性能快照后,Consumer 会每隔 10s 进行性能参数计算和打印 。代码如下:
//Consumer.java executorService.scheduleAtFixedRate(new TimerTask() { private void printStats() { if (snapshotList.size() >= 10) { Long[] begin = snapshotList.getFirst(); Long[] end = snapshotList.getLast(); final long consumeTps = (long) (((end[1] - begin[1]) / (double) (end[0] - begin[0])) * 1000L); final double averageB2CRT = (end[2] - begin[2]) / (double) (end[1] - begin[1]); final double averageS2CRT = (end[3] - begin[3]) / (double) (end[1] - begin[1]); final long failCount = end[4] - begin[4]; final long b2cMax = statsBenchmarkConsumer.getBorn2ConsumerMaxRT().get(); final long s2cMax = statsBenchmarkConsumer.getStore2ConsumerMaxRT().get(); statsBenchmarkConsumer.getBorn2ConsumerMaxRT().set(0); statsBenchmarkConsumer.getStore2ConsumerMaxRT().set(0); System.out.printf("Current Time: %s TPS: %d FAIL: %d AVG(B2C) RT(ms): %7.3f AVG(S2C) RT(ms): %7.3f MAX(B2C) RT(ms): %d MAX(S2C) RT(ms): %d%n", System.currentTimeMillis(), consumeTps, failCount, averageB2CRT, averageS2CRT, b2cMax, s2cMax ); } }
通过性能参数的日志输出,可以很方便的对 RocketMQ 的消费者进行监控 。
2.8.3 清除过期消息
消费者会定期检查本地拉取的消息列表,如果列表中的消息已经过期(默认 15 分钟过期,可以配置),则把过期消息再次发送给 Broker,然后从本地消息列表删除 。代码如下:
//ConsumeMessageConcurrentlyService.java this.cleanExpireMsgExecutors.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { cleanExpireMsg(); } catch (Throwable e) { log.error("scheduleAtFixedRate cleanExpireMsg exception", e); } } }, this.defaultMQPushConsumer.getConsumeTimeout(), this.defaultMQPushConsumer.getConsumeTimeout(), TimeUnit.MINUTES); 2.8.4 清除过期消息
消费者会每隔 30s 向 NameServer 拉取 MessageQueue 信息,然后跟本地保存的进行比较,如果不一致,则更新本地缓存信息 。代码如下:
//DefaultLitePullConsumerImpl.java scheduledExecutorService.scheduleAtFixedRate( new Runnable() { @Override public void run() { try { fetchTopicMessageQueuesAndCompare(); } catch (Exception e) { log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); } } }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS); 3 Broker 3.1 状态采样
Broker 端会对状态进行采用,比如一个 Topic、MessageQueue、Group 总共发送了多少条消息,Topic 总共发送的消息大小 。Broker 会对这些状态按照秒、分钟、小时为单位进行采样并且定时打印,这里一共有 6 个定时任务 。比如下面是按照秒进行采样的定时任务:
//StatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { samplingInSeconds(); } catch (Throwable ignored) { } } }, 0, 10, TimeUnit.SECONDS); 3.2 记录消息延时
Broker 读取消息时会记录消息从保存磁盘到被读取的时间差并定时打印 。定时任务代码如下:
//MomentStatsItemSet.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { printAtMinutes(); } catch (Throwable ignored) { } } }, Math.abs(UtilAll.computeNextMinutesTimeMillis() - System.currentTimeMillis()), 1000 * 60 * 5, TimeUnit.MILLISECONDS);


推荐阅读