有源码 国庆在家,从0手撸一个依赖任务加载框架( 三 )


}
public DAGProject builder() {
return new DAGProject(this);
}
使用时,我们需要创建对应的IDAGTask,通过addDAGTask、addDAGEdge方法构建出对应有向无环图:
ATask a = new ATask();
BTask b = new BTask();
CTask c = new CTask();
DTask d = new DTask();
ETask e = new ETask();
DAGProject dagProject = new DAGProject.Builder()
.addDAGTask(a)
.addDAGTask(b)
.addDAGTask(c)
.addDAGTask(e)
.addDAGTask(d)
.addDAGEdge(b, a)
.addDAGEdge(c, a)
.addDAGEdge(d, b)
.addDAGEdge(d, c)
.addDAGEdge(e, b)
.builder();
表达任务依赖关系的DAGProject对象就通过建造者模式构建成功了 。
依赖任务加载的调度
当多个任务构建成有向无环图的DAGProject后,我们先不着急丢进线程池,执行对应逻辑前先检测是否有环,这样我们可以在任务加载前抛出相互依赖的错误,大可不必等到执行至有环那一步才抛出 。虽然有环可以靠输入者去保障,但是在一些小细节方面,我们要求输入者保证过于苛刻也过于差体验 。
 

  • 将度为0的任务储存在tempTaskQueue
     
  • while循环将任务取出,存在依赖关系则对应的任务度-1,如果度为0,添加到resultTaskQueue
     
  • 判断最后的resultTaskQueue与之前储存任务的set个数是否相等,false则有环抛出异常
     
public class DAGScheduler {
private void checkCircle(Set TaskSet, Map> TaskMap) {
LinkedList resultTaskQueue = new LinkedList<>();
LinkedList tempTaskQueue = new LinkedList<>();
for (IDAGTask DAGTask : tempTaskSet) {
if (tempTaskMap.get(DAGTask) == null) {
tempTaskQueue.add(DAGTask);
}
}
while (!tempTaskQueue.isEmpty()) {
IDAGTask tempDAGTask = tempTaskQueue.pop();
resultTaskQueue.add(tempDAGTask);
for (IDAGTask DAGTask : tempTaskMap.keySet()) {
Set tempDAGSet = tempTaskMap.get(DAGTask);
if (tempDAGSet != null && tempDAGSet.contains(tempDAGTask)) {
tempDAGSet.remove(tempDAGTask);
if (tempDAGSet.size() == 0) {
tempTaskQueue.add(DAGTask);
}
}
}
}
if (resultTaskQueue.size() != tempTaskSet.size()) {
throw new IllegalArgumentException("相互依赖,玩屁啊,我不跑了!");
}
}
}
检测完环后,开始调度这些依赖任务,将度为0的任务加入阻塞队列,通过newSingleThreadExecutor开启一个线程不断去阻塞队列拿任务 。
 
  • 判断拿出的任务属于主线程执行还是异步执行,主线程执行通过handler.post发送出去,异步执行通过线程池execute
     
  • 所有任务执行完毕,关闭线程池,结束遍历
     
  • 不断将度为0的任务加入阻塞队列
     
public class DAGScheduler {
private void loop() {
for (IDAGTask DAGTask : mTaskSet) {
if (DAGTask.getRely() == 0) {
mTaskBlockingDeque.add(DAGTask);
}
}
ExecutorService singleThreadExecutor = Executors.newSingleThreadExecutor();
singleThreadExecutor.execute(() -> {
for (; ; ) {
try {
while (!mTaskBlockingDeque.isEmpty()) {
IDAGTask executedDAGTsk = (IDAGTask) mTaskBlockingDeque.take();
if (executedDAGTsk.getIsAsync()) {
Handler handler = new Handler(getMainLooper());
handler.post(executedDAGTsk);
} else {
mTaskThreadPool.execute(executedDAGTsk);
}
mTaskSet.remove(executedDAGTsk);
}
if (mTaskSet.isEmpty()) {
singleThreadExecutor.shutdown();
mTaskThreadPool.shutdown();
return;
}
Iterator iterator = mTaskSet.iterator();
while (iterator.hasNext()) {
IDAGTask DAGTask = iterator.next();
if (DAGTask.getRely() == 0) {
mTaskBlockingDeque.put(DAGTask);
iterator.remove();
}
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
});
}
}
至此依赖任务的调度器搭建完毕,配合之前构建好的DAGProject,使用方法如下:
DAGScheduler dagScheduler = new DAGScheduler();dagScheduler.start(dagProject);
/ 使用方式 /
第一步,对应build.gradle配置远程依赖,已经发布到maven central,不用担心jcenter弃用 。
implementation 'work.lingling.dagtask:dagtsk:1.0.0'
第二步,继承IDAGTask类,在run方法中实现对应的初始化逻辑 。
public class ATask extends IDAGTask {
public ATask(String alias) {
super(alias);
}
@Override
public void run() {


推荐阅读