Airflow简介

Airflow 是 Airbnb 开源的一个用 Python 编写的调度工具。于 2014 年启动,2015年春季开源,2016 年加入 Apache 软件基金会的孵化计划。

Airflow将一个工作流制定为一组任务的有向无环图(DAG),并指派到一组计算节点上,根据相互之间的依赖关系,有序执行。Airflow 有以下优势:

  • 灵活易用。Airflow 是 Python 编写的,工作流的定义也使用 Python 编写;

  • 功能强大。支持多种不同类型的作业,可自定义不同类型的作业。如 Shell、Python、Mysql、Oracle、Hive等;

  • 简洁优雅。作业的定义简单明了;

  • 易扩展。提供各种基类供扩展,有多种执行器可供选择;

体系架构


  • Webserver 守护进程

    接受 HTTP 请求,通过 Python Flask Web 应用程序与 airflow 进行交互。

    Webserver 提供功能的功能包括:中止、恢复、触发任务;监控正在运行的任务,断点续跑任务;查询任务的状态,日志等详细信息。

  • Scheduler 守护进程

    周期性地轮询任务的调度计划,以确定是否触发任务执行。

  • Worker 守护进程

    Worker负责启动机器上的executor来执行任务。使用celeryExecutor后可以在多个机器上部署worker服务。

重要概念

DAG(Directed Acyclic Graph)有向无环图

  • 在Airflow中,一个DAG定义了一个完整的作业。同一个DAG中的所有Task拥有相同的调度时间。

  • 参数:

    • dag_id:唯一识别DAG

    • default_args:默认参数,如果当前DAG实例的作业没有配置相应参数,则采用DAG实例的default_args中的相应参数

    • schedule_interval:配置DAG的执行周期,可采用crontab语法

Task

  • Task为DAG中具体的作业任务,依赖于DAG,必须存在于某个DAG中。Task在DAG中可以配置依赖关系

  • 参数:

    • dag:当前作业属于相应DAG

    • task_id:任务标识符

    • owner:任务的拥有者

    • start_date:任务的开始时间

Airflow安装部署

环境要求

  • CentOS 7.X

  • Python 3.5或以上版本(推荐)

  • MySQL 5.7.x

  • Apache-Airflow 1.10.11

  • 虚拟机可上网,需在线安装包

正式安装软件之前给虚拟机做一个快照

按照讲义中指定的软件安装

按照讲义的步骤执行对应的命令,命令的遗漏会对后面的安装造成影响

Python环境准备

备注:提前下载 Python-3.6.6.tgz

备注:使用Linux122安装

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
# 卸载 mariadb
[root@Linux122 ~]# rpm -qa | grep mariadb
mariadb-5.5.68-1.el7.x86_64
mariadb-devel-5.5.68-1.el7.x86_64
mariadb-libs-5.5.68-1.el7.x86_64

[root@Linux122 ~]# yum remove mariadb
[root@Linux122 ~]# yum remove mariadb-libs

# 安装依赖
# mysql57-community-release-el7-11.noarch.rpm上传到/opt/lagou/software
[root@Linux122 ~]# cd /opt/lagou/software
[root@Linux122 software]# rpm -ivh mysql57-community-release-el7-11.noarch.rpm

[root@Linux122 software]# yum install readline readline-devel -y
[root@Linux122 software]# yum install gcc -y
[root@Linux122 software]# yum install zlib* -y
[root@Linux122 software]# yum install openssl openssl-devel -y
[root@Linux122 software]# yum install sqlite-devel -y
[root@Linux122 software]# yum install python-devel mysql-devel -y

# 提前到python官网下载好包,上传到/opt/lagou/software
[root@Linux122 software]# tar -zxvf Python-3.6.6.tgz

# 安装 python3 运行环境
[root@Linux122 software]# cd Python-3.6.6/

# configure文件是一个可执行的脚本文件。如果配置了--prefix,安装后的所有 资源文件都会放在目录中
[root@Linux122 Python-3.6.6]# ./configure --prefix=/opt/lagou/servers/python3.6
[root@Linux122 Python-3.6.6]# make && make install
[root@Linux122 Python-3.6.6]# /opt/lagou/servers/python3.6/bin/pip3 install virtualenv

# 启动 python3 环境
[root@Linux122 Python-3.6.6]# cd /opt/lagou/servers/python3.6/bin/
[root@Linux122 bin]# ./virtualenv env
[root@Linux122 bin]# . env/bin/activate

# 检查 python 版本
(env) [root@Linux122 bin]# python -V

安装Airflow

1
2
3
4
5
6
7
8
9
# 设置目录(配置文件)
# 添加到配置文件/etc/profile。未设置是缺省值为 ~/airflow
(env) [root@Linux122 bin]# vim /etc/profile
export AIRFLOW_HOME=/opt/lagou/servers/airflow

(env) [root@Linux122 bin]# source /etc/profile

# 使用豆瓣源非常快。-i: 指定库的安装源(可选选项)
(env) [root@Linux122 bin]# pip install apache-airflow==1.10.11 -i https://pypi.douban.com/simple

备注:

  • apache-airflow==1.10.11,需要指定安装的版本,重要!!!

  • 软件安装路径在$AIRFLOW_HOME(缺省为~/airflow),此时目录不存在

  • 安装的是版本是1.10.11,不指定下载源时下载过程非常慢

创建数据库用户并授权

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
-- 创建数据库
create database airflowlinux122;

-- 创建用户airflow,设置所有ip均可以访问
-- 修改密码的安全等级,密码最少八位
set global validate_password_policy=0;
create user 'airflow'@'%' identified by '12345678';
create user 'airflow'@'localhost' identified by '12345678';

-- 用户授权,为新建的airflow用户授予Airflow库的所有权限
grant all on airflowlinux122.* to 'airflow'@'%';

-- 数据行更新时,timestamp类型字段不更新为当前时间
SET GLOBAL explicit_defaults_for_timestamp = 1;

-- 刷新
flush privileges;

修改Airflow DB配置

1
2
# python3 环境中执行
(env) [root@Linux122 bin]# pip install mysqlclient==1.4.6

安装mysqlclient==1.4.6 报错,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ERROR: Command errored out with exit status 1:
command: /opt/lagou/servers/python3.6/bin/env/bin/python -c 'import io, os, sys, setuptools, tokenize; sys.argv[0] = '"'"'/tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/setup.py'"'"'; __file__='"'"'/tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/setup.py'"'"';f = getattr(tokenize, '"'"'open'"'"', open)(__file__) if os.path.exists(__file__) else io.StringIO('"'"'from setuptools import setup; setup()'"'"');code = f.read().replace('"'"'\r\n'"'"', '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))' egg_info --egg-base /tmp/pip-pip-egg-info-wceblx57
cwd: /tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/
Complete output (12 lines):
/bin/sh: mysql_config: 未找到命令
/bin/sh: mariadb_config: 未找到命令
/bin/sh: mysql_config: 未找到命令
Traceback (most recent call last):
File "<string>", line 1, in <module>
File "/tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/setup.py", line 16, in <module>
metadata, options = get_config()
File "/tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/setup_posix.py", line 61, in get_config
libs = mysql_config("libs")
File "/tmp/pip-install-cx4mknox/mysqlclient_e06944f41161469b81ba1d3b5451ea27/setup_posix.py", line 29, in mysql_config
raise EnvironmentError("%s not found" % (_mysql_config_path,))
OSError: mysql_config not found
----------------------------------------
WARNING: Discarding https://files.pythonhosted.org/packages/d0/97/7326248ac8d5049968bf4ec708a5d3d4806e412a42e74160d7f266a3e03a/mysqlclient-1.4.6.tar.gz#sha256=f3fdaa9a38752a3b214a6fe79d7cae3653731a53e577821f9187e67cbecb2e16 (from https://pypi.org/simple/mysqlclient/). Command errored out with exit status 1: python setup.py egg_info Check the logs for full command output.
ERROR: Could not find a version that satisfies the requirement mysqlclient==1.4.6 (from versions: 1.3.0, 1.3.1, 1.3.2, 1.3.3, 1.3.4, 1.3.5, 1.3.6, 1.3.7, 1.3.8, 1.3.9, 1.3.10, 1.3.11rc1, 1.3.11, 1.3.12, 1.3.13, 1.3.14, 1.4.0rc1, 1.4.0rc2, 1.4.0rc3, 1.4.0, 1.4.1, 1.4.2, 1.4.2.post1, 1.4.3, 1.4.4, 1.4.5, 1.4.6, 2.0.0, 2.0.1, 2.0.2, 2.0.3, 2.1.0rc1, 2.1.0, 2.1.1)
ERROR: No matching distribution found for mysqlclient==1.4.6

解决方法,执行以下方法,成功后再安装mysqlclient==1.4.6

1
(env) [root@Linux122 bin]# yum install mysql-devel

解决方法报错

1
2
3
4
从 file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql 检索密钥
源 "MySQL 5.7 Community Server" 的 GPG 密钥已安装,但是不适用于此软件包。请检查源的公钥 URL 是否配置正确。
失败的软件包是:mysql-community-server-5.7.37-1.el7.x86_64
GPG 密钥配置为:file:///etc/pki/rpm-gpg/RPM-GPG-KEY-mysql

解决方案,执行以下方法,成功后再安装mysqlclient==1.4.6

GPG验证不通过,我理解是本机配置的这个软件包对应的公钥不对,签名验证失败。(我也不知道这个公钥是在安装过程哪一步自动配置的)。我在mysql官网搜关键字GPG,找到了解决方案,大意是如果使用的4.1以上版本的rpm的话,除了import mysql的公钥到个人用户的配置中,还需要import mysql的公钥到RPM的配置中。

1
(env) [root@Linux122 bin]# rpm --import https://repo.mysql.com/RPM-GPG-KEY-mysql-2022
1
(env) [root@Linux122 bin]# airflow initdb

在执行 airflow initdb 命令时,如遇上如下报错:

1
ModuleNotFoundError: No module named 'wtforms.compat'

这是由于 wtforms升级了3.0.0版本的原因导致的错误。执行以下命令后,重新执行airflow initdb 命令。

1
(env) [root@Linux122 bin]# pip install WTForms==2.3.3

在执行 airflow initdb 命令时,如遇上如下报错:

1
ModuleNotFoundError: No module named 'sqlalchemy.ext.declarative.clsregistry'

这是由于 SQLAlchemy 模块版本低导致的错误。执行以下命令后,重新执行airflow initdb 命令。

1
(env) [root@Linux122 bin]# pip install SQLAlchemy==1.3.23

修改 $AIRFLOW_HOME/airflow.cfg:

1
2
3
4
5
# 约 75 行
sql_alchemy_conn = mysql://airflow:12345678@Linux123:3306/airflowlinux122

# 重新执行
airflow initdb

安装密码模块

安装password组件:

1
(env) [root@Linux122 bin]# pip install apache-airflow[password]

修改 airflow.cfg 配置文件(第一行修改,第二行增加):

1
2
3
4
# 约 281 行 [webserver]
# 约 353行
authenticate = True
auth_backend = airflow.contrib.auth.backends.password_auth

添加密码文件,python命令,执行一遍;添加用户登录,设置口令

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
(env) [root@Linux122 servers]# python
Python 3.6.6 (default, Aug 14 2022, 12:03:18)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'airflow@lagou.com'
user.password = 'airflow123'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

启动服务

1
2
3
4
5
6
7
8
9
10
11
12
13
# 备注:要先进入python3的运行环境
[root@Linux122 ~]# cd /opt/lagou/servers/python3.6/bin/
[root@Linux122 bin]# ./virtualenv env
[root@Linux122 bin]# . env/bin/activate

# 退出虚拟环境命令
(env) [root@Linux122 bin]# deactivate

# 启动scheduler调度器:
(env) [root@Linux122 bin]# airflow scheduler -D

# 服务页面启动:
(env) [root@Linux122 bin]# airflow webserver -D

备注:airflow命令所在位置:/opt/lagou/servers/python3.6/bin/env/bin/airflow

安装完成,可以使用浏览器登录 Linux122:8080;输入用户名、口令:airflow / airflow123



修改时区

Airflow默认使用UTC时间,在中国时区需要用+8小时。将UTC修改为中国时区,需要修改Airflow源码。

  • 1、在修改 $AIRFLOW_HOME/airflow.cfg 文件

    1
    2
    # 约 65 行
    default_timezone = Asia/Shanghai
  • 2、修改 timezone.py

    1
    2
    3
    4
    5
    6
    # 进入Airflow包的安装位置
    (env) [root@Linux122 bin]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/

    # 修改airflow/utils/timezone.py
    (env) [root@Linux122 site-packages]# cd airflow/utils
    (env) [root@Linux122 utils]# vi timezone.py

    第27行注释,增加29-37行:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    27 utc = pendulum.timezone('UTC')
    28
    29 from airflow import configuration as conf
    30 try:
    31 tz = conf.get("core", "default_timezone")
    32 if tz == "system":
    33 utc = pendulum.local_timezone()
    34 else:
    35 utc = pendulum.timezone(tz)
    36 except Exception:
    37 pass

    备注:以上的修改方式有警告,可以使用下面的方式(推荐):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    27 utc = pendulum.timezone('UTC')
    28
    29 from airflow import configuration as conf
    30 try:
    31 tz = configuration.conf("core", "default_timezone")
    32 if tz == "system":
    33 utc = pendulum.local_timezone()
    34 else:
    35 utc = pendulum.timezone(tz)
    36 except Exception:
    37 pass

    修改utcnow()函数 (注释掉72行,增加73行内容)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    62 def utcnow():
    63 """
    64 Get the current date and time in UTC
    65
    66 :return:
    67 """
    68
    69 # pendulum utcnow() is not used as that sets a TimezoneInfo object
    70 # instead of a Timezone. This is not pickable and also creates issues
    71 # when using replace()
    72 # d = dt.datetime.utcnow()
    73 d = dt.datetime.now()
    74 d = d.replace(tzinfo=utc)
    75
    76 return d
  • 3、修改 airflow/utils/sqlalchemy.py

    1
    2
    3
    4
    5
    # 进入Airflow包的安装位置
    (env) [root@Linux122 ~]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/
    # 修改 airflow/utils/sqlalchemy.py
    (env) [root@Linux122 site-packages]# cd airflow/utils
    (env) [root@Linux122 utils]# vi sqlalchemy.py

    在38行之后增加 39 - 47 行的内容:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    38 utc = pendulum.timezone('UTC')
    39 from airflow import configuration as conf
    40 try:
    41 tz = conf.get("core", "default_timezone")
    42 if tz == "system":
    43 utc = pendulum.local_timezone()
    44 else:
    45 utc = pendulum.timezone(tz)
    46 except Exception:
    47 pass

    备注:以上的修改方式有警告,可以使用下面的方式(推荐):

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    38 utc = pendulum.timezone('UTC')
    39 from airflow import configuration
    40 try:
    41 tz = configuration.conf("core", "default_timezone")
    42 if tz == "system":
    43 utc = pendulum.local_timezone()
    44 else:
    45 utc = pendulum.timezone(tz)
    46 except Exception:
    47 pass
  • 4、修改airflow/www/templates/admin/master.html

    1
    2
    3
    4
    5
    # 进入Airflow包的安装位置
    (env) [root@Linux122 ~]# cd /opt/lagou/servers/python3.6/bin/env/lib/python3.6/site-packages/
    # 修改 airflow/www/templates/admin/master.html
    (env) [root@Linux122 site-packages]# cd airflow/www/templates/admin
    (env) [root@Linux122 utils]# vi master.html
    1
    2
    3
    4
    5
    # 将第40行修改为以下内容:
    40 var UTCseconds = x.getTime();

    # 将第43行修改为以下内容:
    43 "timeFormat":"H:i:s",
  • 5、重启airflow webserver

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    # 关闭 airflow webserver 对应的服务
    (env) [root@Linux122 utils]# ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}

    # 关闭 airflow scheduler 对应的服务
    (env) [root@Linux122 utils]# ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs -i kill -9 {}

    # 删除对应的pid文件
    (env) [root@Linux122 utils]# cd $AIRFLOW_HOME
    (env) [root@Linux122 airflow]# rm -rf *.pid

    # 重启服务(在python3.6虚拟环境中执行)
    (env) [root@Linux122 airflow]# airflow scheduler -D
    (env) [root@Linux122 airflow]# airflow webserver -D

Airflow的web界面


Trigger Dag:人为执行触发

Tree View:当dag执行的时候,可以点入,查看每个task的执行状态(基于树状视图)。状态:success、running、failed、skipped、retry、queued、no status

Graph View:基于图视图(有向无环图),查看每个task的执行状态

Tasks Duration:每个task的执行时间统计,可以选择最近多少次执行

Task Tries:每个task的重试次数

Gantt View:基于甘特图的视图,每个task的执行状态

Code View:查看任务执行代码

Logs:查看执行日志,比如失败原因

Refresh:刷新dag任务

Delete Dag:删除该dag任务

禁用自带的DAG任务

停止服务:

1
2
3
4
5
6
7
8
9
# 关闭 airflow webserver 对应的服务
(env) [root@Linux122 utils]# ps -ef | grep 'airflow-webserver' | grep -v 'grep' | awk '{print $2}' | xargs -i kill -9 {}

# 关闭 airflow scheduler 对应的服务
(env) [root@Linux122 utils]# ps -ef | grep 'airflow' | grep 'scheduler' | awk '{print $2}' | xargs -i kill -9 {}

# 删除对应的pid文件
(env) [root@Linux122 utils]# cd $AIRFLOW_HOME
(env) [root@Linux122 airflow]# rm -rf *.pid

修改文件 $AIRFLOW_HOME/airflow.cfg:

1
2
3
# 修改文件第 136 行
136 # load_examples = True
137 load_examples = False
1
2
# 重新设置db
(env) [root@Linux122 airflow]# airflow resetdb -y

重新设置账户、口令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
(env) [root@Linux122 airflow]# python
Python 3.6.6 (default, Aug 14 2022, 12:03:18)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux
Type "help", "copyright", "credits" or "license" for more information.
>>>

import airflow
from airflow import models, settings
from airflow.contrib.auth.backends.password_auth import PasswordUser

user = PasswordUser(models.User())
user.username = 'airflow'
user.email = 'airflow@lagou.com'
user.password = 'airflow123'

session = settings.Session()
session.add(user)
session.commit()
session.close()
exit()

重启服务

1
2
3
# 重启服务
airflow scheduler -D
airflow webserver -D

crontab简介

Linux 系统则是由 cron (crond) 这个系统服务来控制的。Linux 系统上面原本就有非常多的计划性工作,因此这个系统服务是默认启动的。

Linux 系统也提供了Linux用户控制计划任务的命令:crontab 命令。


  • 日志文件:ll /var/log/cron*

  • 编辑文件:vim /etc/crontab

  • 进程:ps -ef | grep crond ==> /etc/init.d/crond restart

  • 作用:任务(命令)定时调度(如:定时备份,实时备份)

  • 简要说明:cat /etc/crontab


在以上各个字段中,还可以使用以下特殊字符:

* 代表所有的取值范围内的数字。如月份字段为*,则表示1到12个月;

/ 代表每一定时间间隔的意思。如分钟字段为*/10,表示每10分钟执行1次;

- 代表从某个区间范围,是闭区间。如2-5表示2,3,4,5,小时字段中0-23/2表示在0~23点范围内每2个小时执行一次;

, 分散的数字(不连续)。如1,2,3,4,7,9;

注:由于各个地方每周第一天不一样,因此Sunday=0(第1天)或Sunday=7(最后1天)。

crontab配置实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
# 每一分钟执行一次command(因cron默认每1分钟扫描一次,因此全为*即可)
* * * * * command

# 每小时的第3和第15分钟执行command
3,15 * * * * command

# 每天上午8-11点的第3和15分钟执行command
3,15 8-11 * * * command

# 每隔2天的上午8-11点的第3和15分钟执行command
3,15 8-11 */2 * * command

# 每个星期一的上午8点到11点的第3和第15分钟执行command
3,15 8-11 * * 1 command

# 每晚的21:30执行command
30 21 * * * command

# 每月1、10、22日的4:45执行command
45 4 1,10,22 * * command

# 每周六、周日的1 : 10执行command
10 1 * * 6,0 command

# 每小时执行command
0 */1 * * * command

# 晚上11点到早上7点之间,每隔一小时执行command
* 23-7/1 * * * command