Python 实现定时任务的八种方案( 六 )


Airflow使用Python开发,它通过DAGs(Directed Acyclic Graph, 有向无环图)来表达一个工作流中所要执行的任务,以及任务之间的关系和依赖 。比如,如下的工作流中,任务T1执行完成,T2和T3才能开始执行,T2和T3都执行完成,T4才能开始执行 。

Python 实现定时任务的八种方案

文章插图
 
Airflow提供了各种Operator实现,可以完成各种任务实现:
  • BashOperator – 执行 bash 命令或脚本 。
  • SSHOperator – 执行远程 bash 命令或脚本(原理同 paramiko 模块) 。
  • PythonOperator – 执行 Python 函数 。
  • EmailOperator – 发送 Email 。
  • HTTPOperator – 发送一个 HTTP 请求 。
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, 等,执行 SQL 任务 。
  • DockerOperator, HiveOperator, S3FileTransferOperator, PrestoToMysqlOperator, SlackOperator…
除了以上这些 Operators 还可以方便的自定义 Operators 满足个性化的任务需求 。
一些情况下,我们需要根据执行结果执行不同的任务,这样工作流会产生分支 。如:
Python 实现定时任务的八种方案

文章插图
 
这种需求可以使用BranchPythonOperator来实现 。
Airflow 产生的背景通常,在一个运维系统,数据分析系统,或测试系统等大型系统中,我们会有各种各样的依赖需求 。包括但不限于:
  • 时间依赖:任务需要等待某一个时间点触发 。
  • 外部系统依赖:任务依赖外部系统需要调用接口去访问 。
  • 任务间依赖:任务 A 需要在任务 B 完成后启动,两个任务互相间会产生影响 。
  • 资源环境依赖:任务消耗资源非常多,或者只能在特定的机器上执行 。
crontab 可以很好地处理定时执行任务的需求,但仅能管理时间上的依赖 。Airflow 的核心概念 DAG(有向无环图)—— 来表现工作流 。
  • Airflow 是一种 WMS,即:它将任务以及它们的依赖看作代码,按照那些计划规范任务执行,并在实际工作进程之间分发需执行的任务 。
  • Airflow 提供了一个用于显示当前活动任务和过去任务状态的优秀 UI,并允许用户手动管理任务的执行和状态 。
  • Airflow 中的工作流是具有方向性依赖的任务集合 。
  • DAG 中的每个节点都是一个任务,DAG 中的边表示的是任务之间的依赖(强制为有向无环,因此不会出现循环依赖,从而导致无限执行循环) 。
Airflow 核心概念
  • DAGs:即有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行顺序 。
  • Operators:可以简单理解为一个class,描述了DAG中某个的task具体要做的事 。其中,airflow内置了很多operators,如BashOperator 执行一个bash 命令,PythonOperator 调用任意的Python 函数,EmailOperator 用于发送邮件,HTTPOperator 用于发送HTTP请求,SqlOperator 用于执行SQL命令等等,同时,用户可以自定义Operator,这给用户提供了极大的便利性 。
  • Tasks:Task 是 Operator的一个实例,也就是DAGs中的一个node 。
  • Task Instance:task的一次运行 。Web 界面中可以看到task instance 有自己的状态,包括”running”, “success”, “failed”, “skipped”, “up for retry”等 。
  • Task Relationships:DAGs中的不同Tasks之间可以有依赖关系,如 Task1 >> Task2,表明Task2依赖于Task2了 。通过将DAGs和Operators结合起来,用户就可以创建各种复杂的 工作流(workflow) 。
Airflow 的架构在一个可扩展的生产环境中,Airflow 含有以下组件:
  • 元数据库:这个数据库存储有关任务状态的信息 。
  • 调度器:Scheduler 是一种使用 DAG 定义结合元数据中的任务状态来决定哪些任务需要被执行以及任务执行优先级的过程 。调度器通常作为服务运行 。
  • 执行器:Executor 是一个消息队列进程,它被绑定到调度器中,用于确定实际执行每个任务计划的工作进程 。有不同类型的执行器,每个执行器都使用一个指定工作进程的类来执行任务 。例如,LocalExecutor 使用与调度器进程在同一台机器上运行的并行进程执行任务 。其他像 CeleryExecutor 的执行器使用存在于独立的工作机器集群中的工作进程执行任务 。
  • Workers:这些是实际执行任务逻辑的进程,由正在使用的执行器确定 。

Python 实现定时任务的八种方案

文章插图
 


推荐阅读