目标节点接收请求数据同步的请求地址为,task.url=
http://192.168.8.16:8848/nacos/v1/cs/communication/dataChange?dataId=log.yaml&group=DEFAULT_GROUP
@GetMapping("/dataChange")public Boolean notifyConfigInfo(HttpServletRequest request, @RequestParam("dataId") String dataId,@RequestParam("group") String group,@RequestParam(value = https://www.isolves.com/it/wl/js/2022-03-30/"tenant", required = false, defaultValue = StringUtils.EMPTY) String tenant,@RequestParam(value = "tag", required = false) String tag) {dataId = dataId.trim();group = group.trim();String lastModified = request.getHeader(NotifyService.NOTIFY_HEADER_LAST_MODIFIED);long lastModifiedTs = StringUtils.isEmpty(lastModified) ? -1 : Long.parseLong(lastModified);String handleIp = request.getHeader(NotifyService.NOTIFY_HEADER_OP_HANDLE_IP);String isBetaStr = request.getHeader("isBeta");if (StringUtils.isNotBlank(isBetaStr) && trueStr.equals(isBetaStr)) {dumpService.dump(dataId, group, tenant, lastModifiedTs, handleIp, true);} else {//dumpService.dump(dataId, group, tenant, tag, lastModifiedTs, handleIp);}return true;}dumpService.dump用来实现配置的更新,代码如下
当前任务会被添加到DumpTaskMgr中管理 。
public void dump(String dataId, String group, String tenant, String tag, long lastModified, String handleIp,boolean isBeta) {String groupKey = GroupKey2.getKey(dataId, group, tenant);String taskKey = String.join("+", dataId, group, tenant, String.valueOf(isBeta), tag);dumpTaskMgr.addTask(taskKey, new DumpTask(groupKey, tag, lastModified, handleIp, isBeta));DUMP_LOG.info("[dump-task] add task. groupKey={}, taskKey={}", groupKey, taskKey);}TaskManager.addTask, 先调用父类去完成任务添加 。
@Overridepublic void addTask(Object key, AbstractDelayTask newTask) {super.addTask(key, newTask);MetricsMonitor.getDumpTaskMonitor().set(tasks.size());}在这种场景设计中,一般都会采用生产者消费者模式来完成,因此这里不难猜测到,任务会被保存到一个队列中,然后有另外一个线程来执行 。
NacosDelayTaskExecuteEngineTaskManager的父类是
NacosDelayTaskExecuteEngine,
这个类中有一个成员属性 protected final ConcurrentHashMap<Object, AbstractDelayTask> tasks; ,专门来保存延期执行的任务类型AbstractDelayTask.
在这个类的构造方法中,初始化了一个延期执行的任务,其中具体的任务是ProcessRunnable.
public NacosDelayTaskExecuteEngine(String name, int initCapacity, Logger logger, long processInterval) {super(logger);tasks = new ConcurrentHashMap<Object, AbstractDelayTask>(initCapacity);processingExecutor = ExecutorFactory.newSingleScheduledExecutorService(new NameThreadFactory(name));processingExecutor.scheduleWithFixedDelay(new ProcessRunnable(), processInterval, processInterval, TimeUnit.MILLISECONDS);}ProcessRunnableprivate class ProcessRunnable implements Runnable {@Overridepublic void run() {try {processTasks();} catch (Throwable e) {getEngineLog().error(e.toString(), e);}}}processTasksprotected void processTasks() {//获取所有的任务Collection<Object> keys = getAllTaskKeys();for (Object taskKey : keys) {AbstractDelayTask task = removeTask(taskKey);if (null == task) {continue;}//获取任务处理器,这里返回的是DumpProcessorNacosTaskProcessor processor = getProcessor(taskKey);if (null == processor) {getEngineLog().error("processor not found for task, so discarded. " + task);continue;}try {// ReAdd task if process failed//执行具体任务if (!processor.process(task)) {retryFailedTask(taskKey, task);}} catch (Throwable e) {getEngineLog().error("Nacos task execute error : " + e.toString(), e);retryFailedTask(taskKey, task);}}}DumpProcessor.process读取数据库的最新数据,然后更新本地缓存和磁盘 。
原文
https://www.cnblogs.com/mic112/p/16071963.html
推荐阅读
- 【乐凯来国际商务中心】乐凯来国际商务中心房地产介绍
- 你能介绍一下环球中心地产吗
- AMD|1.2万元锐龙9装机不配独显 再疯狂的配置也不能没常识
- 云南省普洱市职业教育中心 普洱市中小学继续教育
- 思雅财富中心的物业好不好?思雅财富中心物业怎么样
- AMOLED|vivo X80 Pro配置曝光:6.78寸E5柔性曲屏、天玑9000加持
- 赠刘景文中心思想感情 赠刘景文表达了作者什么样的感情
- 【现代汽车维修】北京现代汽车维修中心介绍
- 显卡|摩尔线程发布首款全能国产数据中心GPU:4096核心、32GB显存
- 买笔记本电脑主要看什么配置
