40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!( 三 )

 3.3 持久化数据
Broker 会定时持久化消费偏移量、Topic 配置、定阅组配置等,默认 10s 一次(可以配置) 。代码如下:
//ScheduleMessageService.java this.deliverExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { if (started.get()) { ScheduleMessageService.this.persist(); } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); 3.4 失效过期请求
Broker 会定时扫描缓存在本地的请求,如果请求开始时间加超时时间(再加 1s)小于当前时间,则这个请求过期 。通过定时任务(3s 一次)让过期请求失效,并且触发回调函数 。
//NettyRemotingServer.java this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); 3.5 过滤服务
消费者可能会向 Broker 注册 filterClass 用来过滤消息 。Broker 收到消费者注册的 filterClass 后会用定时任务来创建 FilterServer 。代码如下:
//FilterServerManager.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { FilterServerManager.this.createFilterServer(); } catch (Exception e) { log.error("", e); } } }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
这样消费者拉取消息时首先从 FilterServer 拉取消息,FilterServer 从 Broker 拉取消息后进行过滤,只把消费者感兴趣的消息返回给消费者 。一个 Broker 可以有多个 FilterServer 。如下图:

40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!

文章插图
 
3.6 记录消息总量
Broker 每天会记录前一天收发消息的总数量,定时任务如下(period 是 1 天):
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); 3.7 持久化 Offset
Broker 默认每隔 5s(可以配置) 会持久化一次消息的 Offset,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 3.8 持久化过滤参数
上面提到过,消费者可能会向 Broker 注册 filterClass,Broker 解析消费者注册的 filterClass 后,会把解析后的 FilterData 持久化到文件,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); 3.9 Broker 自我保护
当消费者读取消息缓慢时,Broker 为了保护自己,会把这个消费者设置为不允许读取的状态,这样这个消费组就不能再拉取消息了,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); 3.10 Broker 打印水位
Broker 会每隔 1s 打印一次水位,包括发送消息的延迟、接收消息的延迟、事务消息的延迟、查询消息的延迟,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); 3.11 Broker 打印 Offset 差
Broker 会定时打印最新的消息 Offset 和已经分发给 MessageQueue 和 Index 索引的 Offset 差距,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);


推荐阅读