40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!( 四 )

 3.12 获取 NameServer 地址
Broker 会定期获取 NameServer 的地址,并更新本地缓存,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.brokerOuterAPI.fetchNameServerAddr(); } catch (Throwable e) { log.error("ScheduledTask fetchNameServerAddr exception", e); } } }, 1000 * 10, 1000 * 60 * 2, TimeUnit.MILLISECONDS); 3.13 打印主从偏移量差距
Broker 会定时打印 master 节点和 slave 节点消息 Offset 的差距,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printMasterAndSlaveDiff(); } catch (Throwable e) { log.error("schedule printMasterAndSlaveDiff error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS); 3.14 向 NameServer 注册
Broker 会定时向(默认 30s,可配置,最高不超过 60s)所有 NameServer 发送注册消息,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.registerBrokerAll(true, false, brokerConfig.isForceRegister()); } catch (Throwable e) { log.error("registerBrokerAll Exception", e); } } }, 1000 * 10, Math.max(10000, Math.min(brokerConfig.getRegisterNameServerPeriod(), 60000)), TimeUnit.MILLISECONDS); 3.15 同步 Slave
Broker 的 Master 节点会每间隔 10s 向 Slave 节点同步数据,包括 Topic 配置、消费偏移量、延迟偏移量、消费组配置,代码如下:
//BrokerController.java slaveSyncFuture = this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.slaveSynchronize.syncAll(); } catch (Throwable e) { log.error("ScheduledTask SlaveSynchronize syncAll error.", e); } } }, 1000 * 3, 1000 * 10, TimeUnit.MILLISECONDS); 3.16 删除过期文件
Broker 会周期性(默认 10s,可以配置)地执行删除任务,删除过期的 CommitLog 文件和 ConsumeQueue 文件,代码如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.cleanFilesPeriodically(); } }, 1000 * 60, this.messageStoreConfig.getCleanResourceInterval(), TimeUnit.MILLISECONDS); 3.17 文件大小检查
Broker 会每隔 10 分钟检查 CommitLog 文件和 ConsumeQueue 文件,用当前文件的最小(起始) Offset 减去上一个文件最小(起始) Offset,如果不等于一个文件的大小,就说明文件存在问题 。代码如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { DefaultMessageStore.this.checkSelf(); } }, 1, 10, TimeUnit.MINUTES); 3.18 保存堆栈映射
Broker 会每隔 1s 记录所有存活线程的堆栈映射信息,前提是 debugLockEnable 开关配置是打开的 。代码如下:
//DefaultMessageStore.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { if (DefaultMessageStore.this.getMessageStoreConfig().isDebugLockEnable()) { try { if (DefaultMessageStore.this.commitLog.getBeginTimeInLock() != 0) { long lockTime = System.currentTimeMillis() - DefaultMessageStore.this.commitLog.getBeginTimeInLock(); if (lockTime > 1000 && lockTime < 10000000) { String stack = UtilAll.jstack(); final String fileName = System.getProperty("user.home") + File.separator + "debug/lock/stack-" + DefaultMessageStore.this.commitLog.getBeginTimeInLock() + "-" + lockTime; MixAll.string2FileNotSafe(stack, fileName); } } } catch (Exception e) { } } } }, 1, 1, TimeUnit.SECONDS); 3.19 检查物理磁盘
Broker 会每隔 10s 检查保存 CommitLog 的磁盘空间是否达到阈值,如果达到,会打印 error 级别的日志 。代码如下:
//DefaultMessageStore.java this.diskCheckScheduledExecutorService.scheduleAtFixedRate(new Runnable() { public void run() { DefaultMessageStore.this.cleanCommitLogService.isSpaceFull(); } }, 1000L, 10000L, TimeUnit.MILLISECONDS); 3.20 持久化延时消息偏移量
RocketMQ 的延时消费分为 18 个级别,定义如下:
//ScheduleMessageService.java private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
RocketMQ 会为每个延时级别定义要给 ConsumeQueue,每隔 ConsumeQueue 都会有一个 Offset,通过 offsetTable(ConcurrentMap) 来记录不同延时级别对应的 Offset 。
RocketMQ 会周期性地(默认 10s,可以配置)把 offsetTable 中保存的 Offset 持久化到文件 。代码如下:
//DefaultMessageStore.java this.deliverExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { if (started.get()) { ScheduleMessageService.this.persist(); } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS);


推荐阅读