40个定时任务!这次带你彻底理解 RocketMQ 设计精髓!( 五 )

 3.21 关闭异常连接
Broker 会定时扫描所有的长连接,主要包括生产者、消费者和 FilterServer,如果连接不活跃,则关闭该连接,并从本地连接列表中移除 。代码如下:
//ClientHousekeepingService.java this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() { @Override public void run() { try { ClientHousekeepingService.this.scanExceptionChannel(); } catch (Throwable e) { log.error("Error occurred when scan not active client channels.", e); } } }, 1000 * 10, 1000 * 10, TimeUnit.MILLISECONDS); 3.22 清理过期消息
如果 Broker 配置了允许快速失败(brokerFastFailureEnable),则会每隔 10ms 定时清理过期请求,包括要发送的消息、接收的消息、心跳消息、要结束的事务消息 。代码如下:
scheduledExecutorService.scheduleAtFixedRate( new Runnable() { @Override public void run() { try { fetchTopicMessageQueuesAndCompare(); } catch (Exception e) { log.error("ScheduledTask fetchMessageQueuesAndCompare exception", e); } } }, 1000 * 10, this.getDefaultLitePullConsumer().getTopicMetadataCheckIntervalMillis(), TimeUnit.MILLISECONDS);
注意:清理消息前会判断是否系统繁忙,如果系统繁忙,会给发送队列中的消息直接返回系统繁忙,暂时不做过期消息清理 。
4 NameServer 4.1 检查过期 Broker
在 3.14 节中讲过,Broker 会跟 NameServer 建立长连接,定时向 NameServer 发送注册消息 。NameServer 会在本地维护一个 Broker 列表,定时任务会轮询本地保存的 Broker 列表,检查注册消息是否过期(超过 120s),如果注册消息过期,则关闭长连接,从本地缓存删除这个 Broker 。代码如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.routeInfoManager::scanNotActiveBroker, 5, 10, TimeUnit.SECONDS); 4.2 打印配置
NameServer 启动时,会加载 KV 格式的配置文件到 configTable 这个变量,NameServer 客户端也可以发送一个 KV 配置请求给 NameServer,NameServer 收到请求后也会保存到 configTable 。
NameServer 会定时打印 configTable 中的配置,代码如下:
//NamesrvController.java this.scheduledExecutorService.scheduleAtFixedRate(NamesrvController.this.kvConfigManager::printAllPeriodically, 1, 10, TimeUnit.MINUTES); 5 总结
RocketMQ 的定时任务很多,这些定时任务的加入让 RocketMQ 的设计更加完备,包括业务处理、监控日志、心跳、清理任务、关闭连接、持久化数据等 。通过对定时任务的理解,能够更深入地理解 RocketMQ 的设计理念 。
 

原文:https://mp.weixin.qq.com/s/DcBTYFDj2wMq9BYbjyLvmQ
 
如果感觉本文对你有帮助,点赞关注支持一下




推荐阅读