3.3 持久化数据
Broker 会定时持久化消费偏移量、Topic 配置、定阅组配置等,默认 10s 一次(可以配置) 。代码如下:
//ScheduleMessageService.java this.deliverExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { if (started.get()) { ScheduleMessageService.this.persist(); } } catch (Throwable e) { log.error("scheduleAtFixedRate flush exception", e); } } }, 10000, this.defaultMessageStore.getMessageStoreConfig().getFlushDelayOffsetInterval(), TimeUnit.MILLISECONDS); 3.4 失效过期请求
Broker 会定时扫描缓存在本地的请求,如果请求开始时间加超时时间(再加 1s)小于当前时间,则这个请求过期 。通过定时任务(3s 一次)让过期请求失效,并且触发回调函数 。
//NettyRemotingServer.java this.timer.scheduleAtFixedRate(new TimerTask() { @Override public void run() { try { NettyRemotingServer.this.scanResponseTable(); } catch (Throwable e) { log.error("scanResponseTable exception", e); } } }, 1000 * 3, 1000); 3.5 过滤服务
消费者可能会向 Broker 注册 filterClass 用来过滤消息 。Broker 收到消费者注册的 filterClass 后会用定时任务来创建 FilterServer 。代码如下:
//FilterServerManager.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { FilterServerManager.this.createFilterServer(); } catch (Exception e) { log.error("", e); } } }, 1000 * 5, 1000 * 30, TimeUnit.MILLISECONDS);
这样消费者拉取消息时首先从 FilterServer 拉取消息,FilterServer 从 Broker 拉取消息后进行过滤,只把消费者感兴趣的消息返回给消费者 。一个 Broker 可以有多个 FilterServer 。如下图:

文章插图
3.6 记录消息总量
Broker 每天会记录前一天收发消息的总数量,定时任务如下(period 是 1 天):
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.getBrokerStats().record(); } catch (Throwable e) { log.error("schedule record error.", e); } } }, initialDelay, period, TimeUnit.MILLISECONDS); 3.7 持久化 OffsetBroker 默认每隔 5s(可以配置) 会持久化一次消息的 Offset,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerOffsetManager.persist(); } catch (Throwable e) { log.error("schedule persist consumerOffset error.", e); } } }, 1000 * 10, this.brokerConfig.getFlushConsumerOffsetInterval(), TimeUnit.MILLISECONDS); 3.8 持久化过滤参数上面提到过,消费者可能会向 Broker 注册 filterClass,Broker 解析消费者注册的 filterClass 后,会把解析后的 FilterData 持久化到文件,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.consumerFilterManager.persist(); } catch (Throwable e) { log.error("schedule persist consumer filter error.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); 3.9 Broker 自我保护当消费者读取消息缓慢时,Broker 为了保护自己,会把这个消费者设置为不允许读取的状态,这样这个消费组就不能再拉取消息了,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.protectBroker(); } catch (Throwable e) { log.error("protectBroker error.", e); } } }, 3, 3, TimeUnit.MINUTES); 3.10 Broker 打印水位Broker 会每隔 1s 打印一次水位,包括发送消息的延迟、接收消息的延迟、事务消息的延迟、查询消息的延迟,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { BrokerController.this.printWaterMark(); } catch (Throwable e) { log.error("printWaterMark error.", e); } } }, 10, 1, TimeUnit.SECONDS); 3.11 Broker 打印 Offset 差Broker 会定时打印最新的消息 Offset 和已经分发给 MessageQueue 和 Index 索引的 Offset 差距,代码如下:
//BrokerController.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { log.info("dispatch behind commit log {} bytes", BrokerController.this.getMessageStore().dispatchBehindBytes()); } catch (Throwable e) { log.error("schedule dispatchBehindBytes error.", e); } } }, 1000 * 10, 1000 * 60, TimeUnit.MILLISECONDS);
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- 古达克副本任务在哪里接?古达克副本入口在哪?
- 想要给电脑定时关机怎么设置? 怎样设置定时关机
- 紫色曲玉任务多少级?DNF里的紫色曲玉怎么获得?
- 定时发送微信的软件——如何做到微信定时自动发送消息?
- windows 7旗舰版电脑计划任务服务的设置方法 计划任务服务
- 百川任务平台中赚的钱什么时候能提现 百川任务平台是真的吗
- 祖达克任务大全,魔兽世界 祖达克任务?
- w10定时关机在哪里,详细教程分解图一看就会
- 问下WOW怎么冲星界财团的声望啊?从0到崇敬。纳格兰是那个NPC有个循环任务是交念珠和象牙的?具体位置说下 星界财团声望速刷攻略
- 流浪地球2|《狂飙》剧情解析:老默执行任务,高启强左右为难
