1、說明
- 依賴python環境、基于pip安裝apache-airflow
- 安裝程序可能會缺少系統依賴報錯如gcc、mysql-devel 之類, 缺什么就 yum install 什么即可
2、airflow + celery架構

3、集群規劃
| 服務器hadoop100 | 服務器hadoop101 | 服務器hadoop102 | |
|---|---|---|---|
| web server | V | ||
| scheduler | V | ||
| worker | V | V | V |
注意
- 撰寫DAG檔案需要保證在集群每個節點都要同步,因為每個worker都是從本地進行讀取執行的, 不像oozie那樣上傳到HDFS. scheduler僅僅是發送一條要執行哪個DAG下的哪個Task的命令到Queue Broker下, 然后worker再根據命令去指定指定的那個Task.
- 集群里面的airflow.cfg組態檔也需要保持同步
4、安裝
4.1、下載apache-airflow、celery、mysql、redis包
1、在3臺機器上都要下載一次
以下是在hadoop101上執行, 在hadoop100,hadoop102一樣的下載
[hadoop@hadoop101 ~]$ pip3 install apache-airflow==2.0.0
[hadoop@hadoop101 ~]$ pip3 install apache-airflow[celery]
[hadoop@hadoop101 ~]$ pip3 install apache-airflow[reids] -i
http://pypi.douban.com/simple/ --trusted-host pypi.douban.com
2、配置Airflow作業目錄的環境變數
- 在3臺機器上都要配置一次、最好使用xsync腳本同步
[hadoop@hadoop101 ~]$ vim /etc/profile
添加如下
export AIRFLOW_HOME=~/app/airflow
3 配置airflow命令
- 下載apache-airflow包后,在python環境的bin目錄下會生成airflow命令, 需要自己配置到全域呼叫, 我是用軟連接掛在到/usr/local/bin
如
[hadoop@hadoop101 bin]$ pwd
/usr/local/python3/bin
[hadoop@hadoop101 bin]$ ll | grep airflow
-rwxr-xr-x. 1 hadoop hadoop 231 1月 6 19:58 airflow
# 創建軟連接
[hadoop@hadoop101 bin]$ ln -s /usr/local/python3/bin/airflow /usr/local/bin/airflow
[hadoop@hadoop101 bin]$ cd /usr/local/bin/
[hadoop@hadoop101 bin]$ ll
lrwxrwxrwx. 1 root root 30 1月 7 10:26 airflow -> /usr/local/python3/bin/airflow
使用命令airflow --help看是否執行成功, 執行airflow的任何命令都會初始化airflow的作業目錄的生成(在${AIRFLOW_HOME}目錄下)
[hadoop@hadoop101 airflow]$ pwd
/home/hadoop/app/airflow
[hadoop@hadoop101 airflow]$ ll
總用量 24488
-rw-rw-r--. 1 hadoop hadoop 38857 1月 7 20:09 airflow.cfg
drwxrwxr-x. 9 hadoop hadoop 179 1月 7 20:05 logs
-rw-rw-r--. 1 hadoop hadoop 2619 1月 7 10:26 unittests.cfg
-rw-rw-r--. 1 hadoop hadoop 4279 1月 7 10:26 webserver_config.py
4.2、配置元資料庫
- 修改
${AIRFLOW_HOME}/airflow.cfg組態檔, 配置遠程連接資料庫的地址, 需要先創建一個叫airflow的資料庫. 因為默認使用的是sqlite作為元資料庫不支持DAG任務的并發執行、絕不能用于生產環境 - 需要設定mysql的my.cnf 檔案下的 `[mysqld] 的 explicit_defaults_for_timestamp=1 才能正常連接成功
[core]
# 時區設定
default_timezone = Asia/Shanghai
# 資料庫連接設定
sql_alchemy_conn = mysql+mysqldb://airflow:123456@www.burukeyou.com:3306/airflow?charset=utf8
# 資料庫編碼
sql_engine_encoding = utf-8
4.3、配置celery
- 修改
${AIRFLOW_HOME}/airflow.cfg組態檔, 配置celery和executor相關配置
# 設定執行策略、可選SequentialEXecutor(默認)、LocalExecutor(適合單機)、Celery Executor
executor = CeleryExecutor
# 配置celery的broker_url (存盤要執行的命令然后celery的worker去消費)
broker_url = redis://127.0.0.1:6379/0
# 配置celery的result_backend (存盤任務執行狀態)、 也可以用redis存盤
result_backend = db+mysql://airflow:123456@www.burukeyou.com:3306/airflow?charset=utf8
#result_backend = redis://127.0.0.1:6379/1
4、其他airflow.cfg 配置(可選)
# web ui 界面使用的時區
default_ui_timezone=Asia/Shanghai
# 是否加載案例demo
load_examples = False
# 是否執行以前未執行的DAG Run, 定義每個DAG物件可傳遞catchup引數覆寫
catchup_by_default = False
5、啟動
5.1 初始化元資料庫表生成
- 執行如下命令成功后, 查看資料庫airflow下是否生成各個表
[hadoop@hadoop101 ~]$ airflow db init

5.2 啟動web server、scheduler、worker
- 在hadoop101上啟動web server、scheduler、worker
- 在hadoop100和102上啟動worker
[hadoop@hadoop101 ~]$ airflow webserver --port 9988 # -D 引數以守護行程啟動(以下均適用)
[hadoop@hadoop101 ~]$ airflow scheduler -D
[hadoop@hadoop101 ~]$ airflow celery worker -D # -q 可指定worker監聽消費的佇列,默認是default佇列
[hadoop@hadoop100 ~]$ airflow celery worker -D
[hadoop@hadoop102 ~]$ airflow celery worker -D
訪問 hadoop101:9988的web server 的UI界面

發現需要賬號密碼、創建一個即可.
[hadoop@hadoop101 ~]$airflow users create \
--username hadoop \
--firstname hadoop \
--lastname hadoop \
--role Admin \
--email xx@xx.com

之后在${AIRFLOW_HOME}/dags 撰寫自己的DAG任務即可, DAG檔案名以包含dag或者airflow就會被scheduler去調度執行. 會根據 DAG物件的start_date和schedule_interval兩個引數去生成每個DAG RUN的時間點, 時間到了就會觸發執行.
tip:
- start_date和schedule_interval 兩個引數去配置定時的周期可不是從字面上看上那么簡單.
- 見 Airflow 官方檔案 FAQ
- 見 Airflow 官方檔案 DAG RUN
打賞

轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/247181.html
標籤:其他
