Airflow简介与安装部署
Airflow简介
Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015年春季开源,2016 年加入 Apache 软件基金会的孵化计划。
Airflow将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。Airflow 有以下优势:
-
灵活易用。Airflow 是 Python 编写的,工作流的定义也使用 Python 编写;
-
功能强大。支持多种不同类型的作业,可自定义不同类型的作业。如 Shell、Python、Mysql、Oracle、Hive等;
-
简洁优雅。作业的定义简单明了;
-
易扩展。提供各种基类供扩展,有多种执行器可供选择;
体系架构
-
Webserver 守护进程
接受 HTTP 请求,通过 Python Flask Web 应用程序与 airflow 进行交互。
Webserver 提供功能的功能包括:中止、恢复、触发任务;监控正在运行的任务,断点续跑任务;查询任务的状态,日志等详细信息。
-
Scheduler 守护进程
周期性地轮询任务的调度计划,以确定是否触发任务执行。
-
Worker 守护进程
Worker负责启动机器上的executor来执行任务。使用celeryExecutor后可以在多个机器上部署worker服务。
重要概念
DAG(Directed Acyclic Graph)有向无环图
-
在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。
-
参数:
-
dag_id:唯一识别DAG
-
default_args:默认参数,如果当前DAG实例的作业没有配置相应参数,则采用DAG实例的default_args中的相应参数
-
schedule_interval:配置DAG的执行周期,可采用crontab语法
-
Task
-
Task为DAG中具体的作业任务,依赖于DAG,必须存在于某个DAG中。Task在DAG中可以配置依赖关系
-
参数:
-
dag:当前作业属于相应DAG
-
task_id:任务标识符
-
owner:任务的拥有者
-
start_date:任务的开始时间
-
Airflow安装部署
环境要求
-
CentOS 7.X
-
Python 3.5或以上版本(推荐)
-
MySQL 5.7.x
-
Apache-Airflow 1.10.11
-
虚拟机可上网,需在线安装包
正式安装软件之前给虚拟机做一个快照
按照讲义中指定的软件安装
按照讲义的步骤执行对应的命令,命令的遗漏会对后面的安装造成影响
Python环境准备
备注:提前下载 Python-3.6.6.tgz
备注:使用Linux122安装
1 | # 卸载 mariadb |
安装Airflow
1 | # 设置目录(配置文件) |
备注:
-
apache-airflow==1.10.11,需要指定安装的版本,重要!!!
-
软件安装路径在$AIRFLOW_HOME(缺省为~/airflow),此时目录不存在
-
安装的是版本是1.10.11,不指定下载源时下载过程非常慢
创建数据库用户并授权
1 | -- 创建数据库 |
修改Airflow DB配置
1 | # python3 环境中执行 |
安装mysqlclient==1.4.6 报错,
1 | ERROR: Command errored out with exit status 1: |
解决方法,执行以下方法,成功后再安装mysqlclient==1.4.6
1 | (env) [root@Linux122 bin]# yum install mysql-devel |
解决方法报错
1 | 从 file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql 检索密钥 |
解决方案,执行以下方法,成功后再安装mysqlclient==1.4.6
GPG验证不通过,我理解是本机配置的这个软件包对应的公钥不对,签名验证失败。(我也不知道这个公钥是在安装过程哪一步自动配置的)。我在mysql官网搜关键字GPG,找到了解决方案,大意是如果使用的4.1以上版本的rpm的话,除了import mysql的公钥到个人用户的配置中,还需要import mysql的公钥到RPM的配置中。
1 | (env) [root@Linux122 bin]# rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022 |
1 | (env) [root@Linux122 bin]# airflow initdb |
在执行 airflow initdb 命令时,如遇上如下报错:
1 | ModuleNotFoundError: No module named 'wtforms.compat' |
这是由于 wtforms升级了3.0.0版本的原因导致的错误。执行以下命令后,重新执行airflow initdb 命令。
1 | (env) [root@Linux122 bin]# pip install WTForms==2.3.3 |
在执行 airflow initdb 命令时,如遇上如下报错:
1 | ModuleNotFoundError: No module named 'sqlalchemy.ext.declarative.clsregistry' |
这是由于 SQLAlchemy 模块版本低导致的错误。执行以下命令后,重新执行airflow initdb 命令。
1 | (env) [root@Linux122 bin]# pip install SQLAlchemy==1.3.23 |
修改 $AIRFLOW_HOME/airflow.cfg:
1 | # 约 75 行 |
安装密码模块
安装password组件:
1 | (env) [root@Linux122 bin]# pip install apache-airflow[password] |
修改 airflow.cfg 配置文件(第一行修改,第二行增加):
1 | # 约 281 行 [webserver] |
添加密码文件,python命令,执行一遍;添加用户登录,设置口令
1 | (env) [root@Linux122 servers]# python |
启动服务
1 | # 备注:要先进入python3的运行环境 |
备注:airflow命令所在位置:/opt/lagou/servers/python3.6/bin/env/bin/airflow
安装完成,可以使用浏览器登录 Linux122:8080;输入用户名、口令:airflow / airflow123
修改时区
Airflow默认使用UTC时间,在中国时区需要用+8小时。将UTC修改为中国时区,需要修改Airflow源码。
-
1、在修改 $AIRFLOW_HOME/airflow.cfg 文件
1
2# 约 65 行
default_timezone = Asia/Shanghai -
2、修改 timezone.py
1
2
3
4
5
6# 进入Airflow包的安装位置
(env) [root@Linux122 bin]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/
# 修改airflow/utils/timezone.py
(env) [root@Linux122 site-packages]# cd airflow/utils
(env) [root@Linux122 utils]# vi timezone.py第27行注释,增加29-37行:
1
2
3
4
5
6
7
8
9
10
1127 utc = pendulum.timezone('UTC')
28
29 from airflow import configuration as conf
30 try:
31 tz = conf.get("core", "default_timezone")
32 if tz == "system":
33 utc = pendulum.local_timezone()
34 else:
35 utc = pendulum.timezone(tz)
36 except Exception:
37 pass备注:以上的修改方式有警告,可以使用下面的方式(推荐):
1
2
3
4
5
6
7
8
9
10
1127 utc = pendulum.timezone('UTC')
28
29 from airflow import configuration as conf
30 try:
31 tz = configuration.conf("core", "default_timezone")
32 if tz == "system":
33 utc = pendulum.local_timezone()
34 else:
35 utc = pendulum.timezone(tz)
36 except Exception:
37 pass修改utcnow()函数 (注释掉72行,增加73行内容)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
1562 def utcnow():
63 """
64 Get the current date and time in UTC
65
66 :return:
67 """
68
69 # pendulum utcnow() is not used as that sets a TimezoneInfo object
70 # instead of a Timezone. This is not pickable and also creates issues
71 # when using replace()
72 # d = dt.datetime.utcnow()
73 d = dt.datetime.now()
74 d = d.replace(tzinfo=utc)
75
76 return d -
3、修改 airflow/utils/sqlalchemy.py
1
2
3
4
5# 进入Airflow包的安装位置
(env) [root@Linux122 ~]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/
# 修改 airflow/utils/sqlalchemy.py
(env) [root@Linux122 site-packages]# cd airflow/utils
(env) [root@Linux122 utils]# vi sqlalchemy.py在38行之后增加 39 - 47 行的内容:
1
2
3
4
5
6
7
8
9
1038 utc = pendulum.timezone('UTC')
39 from airflow import configuration as conf
40 try:
41 tz = conf.get("core", "default_timezone")
42 if tz == "system":
43 utc = pendulum.local_timezone()
44 else:
45 utc = pendulum.timezone(tz)
46 except Exception:
47 pass备注:以上的修改方式有警告,可以使用下面的方式(推荐):
1
2
3
4
5
6
7
8
9
1038 utc = pendulum.timezone('UTC')
39 from airflow import configuration
40 try:
41 tz = configuration.conf("core", "default_timezone")
42 if tz == "system":
43 utc = pendulum.local_timezone()
44 else:
45 utc = pendulum.timezone(tz)
46 except Exception:
47 pass -
4、修改airflow/www/templates/admin/master.html
1
2
3
4
5# 进入Airflow包的安装位置
(env) [root@Linux122 ~]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/
# 修改 airflow/www/templates/admin/master.html
(env) [root@Linux122 site-packages]# cd airflow/www/templates/admin
(env) [root@Linux122 utils]# vi master.html1
2
3
4
5# 将第40行修改为以下内容:
40 var UTCseconds = x.getTime();
# 将第43行修改为以下内容:
43 "timeFormat":"H:i:s", -
5、重启airflow webserver
1
2
3
4
5
6
7
8
9
10
11
12
13# 关闭 airflow webserver 对应的服务
(env) [root@Linux122 utils]# ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}
# 关闭 airflow scheduler 对应的服务
(env) [root@Linux122 utils]# ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs -i kill -9 {}
# 删除对应的pid文件
(env) [root@Linux122 utils]# cd $AIRFLOW_HOME
(env) [root@Linux122 airflow]# rm -rf *.pid
# 重启服务(在python3.6虚拟环境中执行)
(env) [root@Linux122 airflow]# airflow scheduler -D
(env) [root@Linux122 airflow]# airflow webserver -D
Airflow的web界面
Trigger Dag:人为执行触发
Tree View:当dag执行的时候,可以点入,查看每个task的执行状态(基于树状视图)。状态:success、running、failed、skipped、retry、queued、no status
Graph View:基于图视图(有向无环图),查看每个task的执行状态
Tasks Duration:每个task的执行时间统计,可以选择最近多少次执行
Task Tries:每个task的重试次数
Gantt View:基于甘特图的视图,每个task的执行状态
Code View:查看任务执行代码
Logs:查看执行日志,比如失败原因
Refresh:刷新dag任务
Delete Dag:删除该dag任务
禁用自带的DAG任务
停止服务:
1 | # 关闭 airflow webserver 对应的服务 |
修改文件 $AIRFLOW_HOME/airflow.cfg:
1 | # 修改文件第 136 行 |
1 | # 重新设置db |
重新设置账户、口令:
1 | (env) [root@Linux122 airflow]# python |
重启服务
1 | # 重启服务 |
crontab简介
Linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。
Linux 系统也提供了Linux用户控制计划任务的命令:crontab 命令。
-
日志文件:ll /var/log/cron*
-
编辑文件:vim /etc/crontab
-
进程:ps -ef | grep crond ==> /etc/init.d/crond restart
-
作用:任务(命令)定时调度(如:定时备份,实时备份)
-
简要说明:cat /etc/crontab
在以上各个字段中,还可以使用以下特殊字符:
* 代表所有的取值范围内的数字。如月份字段为*,则表示1到12个月;
/ 代表每一定时间间隔的意思。如分钟字段为*/10,表示每10分钟执行1次;
- 代表从某个区间范围,是闭区间。如2-5表示2,3,4,5,小时字段中0-23/2表示在0~23点范围内每2个小时执行一次;
, 分散的数字(不连续)。如1,2,3,4,7,9;
注:由于各个地方每周第一天不一样,因此Sunday=0(第1天)或Sunday=7(最后1天)。
crontab配置实例
1 | # 每一分钟执行一次command(因cron默认每1分钟扫描一次,因此全为*即可) |