Airflow核心概念

  • DAGs:有向无环图(Directed Acyclic Graph),将所有需要运行的tasks按照依赖关系组织起来,描述的是所有tasks执行的顺序;

    • Operators:Airflow内置了很多operators

    • BashOperator 执行一个bash 命令

    • PythonOperator 调用任意的 Python 函数

    • EmailOperator 用于发送邮件

    • HTTPOperator 用于发送HTTP请求

    • SqlOperator 用于执行SQL命令

    • 自定义Operator

  • Tasks:Task 是 Operator的一个实例;

  • Task Instance:由于Task会被重复调度,每次task的运行就是不同的 Task instance。Task instance 有自己的状态,包括 success 、 running 、 failed 、 skipped 、 up_for_reschedule 、 up_for_retry 、 queued 、 no_status 等;

  • Task Relationships:DAGs中的不同Tasks之间可以有依赖关系;

  • 执行器(Executor)。Airflow支持的执行器就有四种:

    • SequentialExecutor:单进程顺序执行任务,默认执行器,通常只用于测试

    • LocalExecutor:多进程本地执行任务

    • CeleryExecutor:分布式调度,生产常用。Celery是一个分布式调度框架,其本身无队列功能,需要使用第三方组件,如RabbitMQ

    • DaskExecutor :动态任务调度,主要用于数据分析

    • 执行器的修改。修改 $AIRFLOW_HOME/airflow.cfg 第 70行: executor = LocalExecutor 。修改后启动服务

入门案例

创建helloworld.py放置在 $AIRFLOW_HOME/dags 目录下,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
from datetime import datetime, timedelta

from airflow import DAG
from airflow.utils import dates
from airflow.utils.helpers import chain
from airflow.operators.bash_operator import BashOperator
from airflow.operators.python_operator import PythonOperator

def default_options():
default_args = {
'owner':'airflow', # 拥有者名称
'start_date': dates.days_ago(1), # 第一次开始执行的时间
'retries': 1, # 失败重试次数
'retry_delay': timedelta(seconds=5) # 失败重试间隔
}
return default_args

# 定义DAG
def task1(dag):
t = "pwd"
# operator支持多种类型,这里使用 BashOperator
task = BashOperator(
task_id='MyTask1', # task_id
bash_command=t, # 指定要执行的命令
dag=dag # 指定归属的dag
)
return task

def hello_world():
current_time = str(datetime.today())
print('hello world at {}'.format(current_time))

def task2(dag):
# Python Operator
task = PythonOperator(
task_id='MyTask2',
python_callable=hello_world, # 指定要执行的函数
dag=dag
)
return task

def task3(dag):
t = "date"
task = BashOperator(
task_id='MyTask3',
bash_command=t,
dag=dag
)
return task

with DAG(
'HelloWorldDag', # dag_id
default_args=default_options(), # 指定默认参数
schedule_interval="*/2 * * * *" # 执行周期,每分钟2次
) as d:
task1 = task1(d)
task2 = task2(d)
task3 = task3(d)
chain(task1, task2, task3) # 指定执行顺序

启动Python3虚拟环境,并在此环境中执行以下:

1
2
3
4
5
6
7
8
9
10
11
# 执行命令检查脚本是否有错误。如果命令行没有报错,就表示没问题
(env) [root@Linux122 dags]# python $AIRFLOW_HOME/dags/helloworld.py

# 查看生效的 dags
(env) [root@Linux122 dags]# airflow list_dags -sd $AIRFLOW_HOME/dags

# 查看指定dag中的task
(env) [root@Linux122 dags]# airflow list_tasks HelloWorldDag

# 测试dag中的task
(env) [root@Linux122 dags]# airflow test HelloWorldDag MyTask2 20200801

核心交易调度任务集成

核心交易分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
# 加载ODS数据(DataX迁移数据)
/data/lagoudw/script/trade/ods_load_trade.sh

# 加载DIM层数据
/data/lagoudw/script/trade/dim_load_product_cat.sh
/data/lagoudw/script/trade/dim_load_shop_org.sh
/data/lagoudw/script/trade/dim_load_payment.sh
/data/lagoudw/script/trade/dim_load_product_info.sh

# 加载DWD层数据
/data/lagoudw/script/trade/dwd_load_trade_orders.sh

# 加载DWS层数据
/data/lagoudw/script/trade/dws_load_trade_orders.sh

# 加载ADS层数据
/data/lagoudw/script/trade/ads_load_trade_order_analysis.sh

创建coretrade.py放置在 $AIRFLOW_HOME/dags 下,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
from datetime import timedelta
import datetime
from airflow import DAG

from airflow.operators.bash_operator import BashOperator
from airflow.utils.dates import days_ago

# 定义dag的缺省参数
default_args = {
'owner': 'airflow',
'depends_on_past': False,
'start_date': '2020-06-20',
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

# 定义DAG
coretradedag = DAG(
'coretrade',
default_args=default_args,
description='core trade analyze',
schedule_interval='30 0 * * *',
)

today=datetime.date.today()
oneday=timedelta(days=1)
yesterday=(today-oneday).strftime("%Y-%m-%d")

odstask = BashOperator(
task_id='ods_load_data',
depends_on_past=False,
bash_command='sh /data/script/trade/ods_load_trade.sh ' + yesterday,
dag=coretradedag
)

dimtask1 = BashOperator(
task_id='dimtask_product_cat',
depends_on_past=False,
bash_command='sh /data/script/trade/dim_load_product_cat.sh ' + yesterday,
dag=coretradedag
)

dimtask2 = BashOperator(
task_id='dimtask_shop_org',
depends_on_past=False,
bash_command='sh /data/script/trade/dim_load_shop_org.sh ' + yesterday,
dag=coretradedag
)

dimtask3 = BashOperator(
task_id='dimtask_payment',
depends_on_past=False,
bash_command='sh /data/script/trade/dim_load_payment.sh ' + yesterday,
dag=coretradedag
)

dimtask4 = BashOperator(
task_id='dimtask_product_info',
depends_on_past=False,
bash_command='sh /data/script/trade/dim_load_product_info.sh ' + yesterday,
dag=coretradedag
)

dwdtask = BashOperator(
task_id='dwd_load_data',
depends_on_past=False,
bash_command='sh /data/script/trade/dwd_load_trade_orders.sh '+ yesterday,
dag=coretradedag
)

dwstask = BashOperator(
task_id='dws_load_data',
depends_on_past=False,
bash_command='sh /data/script/trade/dws_load_trade_orders.sh ' + yesterday,
dag=coretradedag
)

adstask = BashOperator(
task_id='ads_load_data',
depends_on_past=False,
bash_command='sh /data/script/trade/ads_load_trade_order_analysis.sh ' + yesterday,
dag=coretradedag
)

odstask >> dimtask1
odstask >> dimtask2
odstask >> dimtask3
odstask >> dimtask4

odstask >> dwdtask

dimtask1 >> dwstask
dimtask2 >> dwstask
dimtask3 >> dwstask
dimtask4 >> dwstask

dwdtask >> dwstask

dwstask >> adstask

备注: depends_on_past ,设置为True时,上一次调度成功了,才可以触发。

1
2
3
4
5
6
7
(env) [root@Linux122 dags]# python $AIRFLOW_HOME/dags/coretrade.py

# 查看生效的 dags
(env) [root@Linux122 dags]# airflow list_dags -sd $AIRFLOW_HOME/dags

# 查看指定dag中的task
(env) [root@Linux122 dags]# airflow list_tasks coretrade