Airflow任务集成部署
Airflow核心概念
-
DAGs:有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序;
-
Operators:Airflow内置了很多operators
-
BashOperator 执行一个bash 命令
-
PythonOperator 调用任意的 Python 函数
-
EmailOperator 用于发送邮件
-
HTTPOperator 用于发送HTTP请求
-
SqlOperator 用于执行SQL命令
-
自定义Operator
-
-
Tasks:Task 是 Operator的一个实例;
-
Task Instance:由于Task会被重复调度,每次task的运行就是不同的 Task instance。Task instance 有自己的状态,包括 success 、 running 、 failed 、 skipped 、 up_for_reschedule 、 up_for_retry 、 queued 、 no_status 等;
-
Task Relationships:DAGs中的不同Tasks之间可以有依赖关系;
-
执行器(Executor)。Airflow支持的执行器就有四种:
-
SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试
-
LocalExecutor:多进程本地执行任务
-
CeleryExecutor:分布式调度,生产常用。Celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,如RabbitMQ
-
DaskExecutor :动态任务调度,主要用于数据分析
-
执行器的修改。修改 $AIRFLOW_HOME/airflow.cfg 第 70行: executor = LocalExecutor 。修改后启动服务
-
入门案例
创建helloworld.py放置在 $AIRFLOW_HOME/dags 目录下,内容如下:
1 | from datetime import datetime, timedelta |
启动Python3虚拟环境,并在此环境中执行以下:
1 | # 执行命令检查脚本是否有错误。如果命令行没有报错,就表示没问题 |
核心交易调度任务集成
核心交易分析
1 | # 加载ODS数据(DataX迁移数据) |
创建coretrade.py放置在 $AIRFLOW_HOME/dags 下,内容如下:
1 | from datetime import timedelta |
备注: depends_on_past ,设置为True时,上一次调度成功了,才可以触发。
1 | (env) [root@Linux122 dags]# python $AIRFLOW_HOME/dags/coretrade.py |