Airflow任務調度
(本檔案內容有同事貢獻部分,該部分標記為藍色,對同事表示感謝)
目錄
一、環境
二、基礎引數
三、任務型別
四、使用步驟
五、需要解決的問題(綠色表示已解決)
六、注意事項
一、環境
版本:airflow 2.0.0;python 3.6
部署方式:集群部署,運行在anaconda3的虛擬環境 (airflow)
* 節點7 [webserver、schuduler、worker]
* 節點8 [worker]
* 節點9 [worker、schuduler]
官網檔案(最新):http://airflow.apache.org/docs/apache-airflow/stable/start.html
非官方翻譯中文檔案(1.10.2):https://airflow.apachecn.org/#/
二、基礎引數
default_args = {
'owner': '***',
'start_date': days_ago(1),
'email': ['xxx@qq.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=50),
'pool': 'test',
'priority_weight': 100
}
baseoperator(
:param task_id: a unique, meaningful id for the task
:type task_id: str
:param owner: the owner of the task, using the unix username is recommended
:type owner: str
:param email: the 'to' email address(es) used in email alerts. This can be a
single email or multiple ones. Multiple addresses can be specified as a
comma or semi-colon separated string or by passing a list of strings.
:type email: str or list[str]
:param email_on_retry: Indicates whether email alerts should be sent when a
task is retried
:type email_on_retry: bool
:param email_on_failure: Indicates whether email alerts should be sent when
a task failed
:type email_on_failure: bool
:param retries: the number of retries that should be performed before
failing the task
:type retries: int
:param retry_delay: delay between retries
:type retry_delay: datetime.timedelta
:param retry_exponential_backoff: allow progressive longer waits between
retries by using exponential backoff algorithm on retry delay (delay
will be converted into seconds)
:type retry_exponential_backoff: bool
:param max_retry_delay: maximum delay interval between retries
:type max_retry_delay: datetime.timedelta
:param start_date: The ``start_date`` for the task, determines
...詳見baseoperator原始碼,注:baseoperator即基礎operator,
)
三、任務型別
-
Bashoperator(運行方式為執行bash命令),例如:
run_this = BashOperator(
task_id='run_after_loop',
bash_command='echo 1',
dag=dag
)
注:可以通過ssh命令在遠程機器上執行腳本或命令
2.ExternalTaskSensor(可以用作dag之間依賴,感知前置dag或task執行狀態,不必重復執行上層依賴),例如:
child_1 = ExternalTaskSensor (
task_id = 'henry_1',
external_dag_id = 'henry_test',
# external_task_id = "task_1",
dag = dag
)
3.LatestOnlyOperator(只運行最新的),可以跳過在 DAG 的最近計劃運行期間未運行的任務,例如:
dag = DAG(
dag_id='latest_only_with_trigger',
schedule_interval=dt.timedelta(hours=4),
start_date=dt.datetime(2016, 9, 20),
)
latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)
4.ExternalTaskMarker(繼承自DummyOperator, 該task被clear之后,下游的依賴任務也會遞回的全都clear,默認深度10)暫時棄用,
parent_task = ExternalTaskMarker(
task_id="parent_task",
external_dag_id="example_external_task_marker_child",
external_task_id="child_task1",
dag = dag
)
5. TriggerDagRunOperator(直接觸發下游dag運行)
t2 = TriggerDagRunOperator(
task_id='trigger_dag',
trigger_dag_id='dag_1',
# 被觸發執行的dag的execution_date,str / datetime.datetime,加這個引數執行報錯...
# execution_date=datetime.datetime(2021, 3, 5, 8, 20),
# reset_dag_run=True,
# wait_for_completion=False,
# Poke interval to check dag run status when wait_for_completion=True.
# poke_interval=60,
dag=dag,
)
四、使用步驟
使用時,依次執行命令:
1)source /home/***/anaconda3/bin/activate airflow # 激活虛擬環境
2)cd /home/***/airflow/dags # 進入任務dags目錄,然后創建自己名稱的檔案夾,將任務放入自己名下便于管理
3)構建.py任務檔案
4)執行(依實際需要操作,可在web頁面操作)
* python 任務檔案確保編譯沒問題
* 運行task:airflow dags run <dag_id> <task_id> <execution_date>
* 重跑/回溯歷史任務:airflow dags backfill <dag_id>-s START_DATE -e END_DATE
五、需要解決的問題(綠色表示已解決)
1.打通airflow和任務的時間引數,讓頁面操作的時間范圍能正確帶入到任務腳本
# The execution date as YYYY-MM-DD date ="{{ ds }}"t = BashOperator( task_id='test_env', bash_command='/tmp/test.sh ', dag=dag, env={'EXECUTION_DATE': date})
這里, {{ ds }}是一個宏,并且由于BashOperator的env引數是使用 Jinja 模板化的,因此執行日期將作為 Bash 腳本中名為EXECUTION_DATE的環境變數提供,
您可以將 Jinja 模板與檔案中標記為“模板化”的每個引數一起使用,模板替換發生在呼叫運算子的 pre_execute 函式之前,
注意,由于airflow實際上接管了日期引數,多日重跑或者回溯的資料,實際上是由多個單日的任務組合而成,也就意味著原有的本身支持批量跑數的python腳本要改成單天執行的腳本,或者直接在bashoperator的bash_command中添加兩個一樣的時間引數{{ ds }},如 bash_command="python test.py {{ ds }} {{ ds }}",(不過原有腳本的一些功能可能就會發生改變,比如原來在時間范圍回圈內執行完多天統一發送郵件的,現在的效果則變成每天一封郵件),
2.任務報警(郵件需要修改組態檔,配置郵件服務,才能使任務腳本中的郵件報警生效)
直接修改airflow.cfg保存退出即可生效,不需要執行任何airflow命令,不要執行airflow initdb,
# smtp server here
smtp_host = smtp.exmail.qq.com(注意這里不要輸錯,第二個位置是exmail而不是email)
smtp_starttls = False
smtp_ssl = True
# Example: smtp_user = airflow
smtp_user = 發件用戶,一般和下面的發件人一致即可(之后改為mmribao)
# Example: smtp_password = airflow
smtp_password =
smtp_port = 465
smtp_mail_from = 發件人
任務檔案中示例:
default_args = {
'owner': '***',
'start_date': days_ago(1),
'email': ['xxx@.com'],
'email_on_failure': True,
'email_on_retry': False,
'retries': 1,
'retry_delay': timedelta(seconds=5),
}
郵件內容效果:

3.任務佇列以及優先級問題(先搭建集群后使用celery的queue)
default_args里面增加了:
'queue':'ribao','pool':'daily','priority_weight':100
之后,任務執行例外,之前會正常執行各部分task且失敗后會發送郵件,但是增加了這三個引數之后,重跑和例行都不會執行子task也不會發送失敗郵件(看起來壓根沒有按照正常的步驟執行任務),增加之后洗掉這3項引數,當天執行重跑也會產生相應的例外,第二天例行之后才會正常執行失敗并且發送郵件,
注:這里的queue和pool含義不同,假如啟動celery的worker的時候指定了 -q 引數,那么該worker就會專門被指定用來跑該queue的任務,之后提交該名稱的queue任務的時候,就會由該worker來執行,
實際使用的時候,只需要添加pool和priority_weight屬性即可實作日常需求,
4.任務命名規范
web頁面是按照字母a-z排序的,同時后幾位也會按按位比較大小,


同時dag名稱是唯一的(task_id只作用在本dag內,不同dag的taskid可以同名),所以正式的dag命名:
年月日_人名首字母_根據業務或功能自行命名,如:
20210303_人名首字母_業務
年月日首先避免了絕大多數重名風險,人名首字母進一步將名稱重名的可能性鎖定在本人任務重,極大程度減少和別人任務同名的可能,
5.是否更新到airflow2.0(使用節點7、節點8/節點9另外搭建2.0,不影響之前的單點1.10)
已解決,直接部署了2.0
6.使用celery構建集群
celery的監控頁面flower:http://***:5555,執行單位是task,同一個dag的不同task可能被分配到不同的worker執行,可以從flower頁面看到執行節點,
考慮節點7作為主節點,節點8作為子節點,節點9作為子節點,
mysql取消使用docker的原因:docker可以部署mysql服務,但是本地物理機需要安裝mysql客戶端,但是物理機安裝客戶端的時候,會對已有的mariadb進行依賴升級,而已有的mariadb的一些依賴被hadoop一些組件所依賴,害怕影響集群,所以這種方式也不保險,索性使用運維提供的二進制安裝包方式繞過依賴問題安裝mysql5.7以及客戶端,依賴問題不存在了,也就沒有使用docker的必要了,
但是經過測驗,mysql5.7及之前版本,容易發生死鎖問題,由于行級鎖,并且也不支持scheduler HA,所以直接換為docker的mysql8,但是mysql8又有其他問題,比如原始碼無法正確識別mysql8版本,導致執行不合版本的sql陳述句,故重新測驗安裝postgresql9.6/10,最后現在使用postgresql10,并啟動2個scheduler,
說明:redis使用docker搭建了哨兵,但是airflow配置broker里面需要填寫一個redis地址,但是哨兵是3個,分別監控各自的redis,并不能起到一個類似zookeeper的作用(它通過api可以告知主節點的ip,但是不能自動直接連接到主節點,所以暫時仍然在airflow.cfg里面手動填寫一個redis主節點的地址),
然后redis也更換為rabbitmq,但是未能解決web頁面的按鈕功能失效,比如重跑回溯任務偶發失敗,所以應該是airflow的bug問題,而非redis不如rabbitmq,并且,rabbitmq在使用時,celery不能很好檢測到其worker運行狀態,必須重新啟動rabbitmq和scheduler,然后worker才能作業,但是flower始終顯示worker離線,但是換成redis就立即能夠識別出worker在線狀態,后來也將result_backend也換為redis和amqp,都未能解決按鈕失效功能,并且官方強烈建議backend存入傳統意義上的資料庫,索性換回redis+postgresql組合,
最重要的還是airflow的dag腳本檔案,
老架構:

現有架構:

7.配置日志組件(先測驗hdfs路徑是否能夠使用)
應該是不行了,,
各節點的logs檔案夾的內容是不一樣的,也就是不同的機器執行任務產生的日志不同(執行什么,產生什么),
8.dag檔案必須放在啟動scheduler的節點(節點7),或者修改組態檔,目前組態檔都是本機的dags目錄,但是worker節點不檢測任務檔案,
9.目前的任務都是先登錄到節點10上然后進行操作,需要注意大批量遷移后,大量連接登錄的問題(連接數限制問題),
10.celery的flower的時區是否需要修改(需要修改celery的原始碼,最后再說)
11.測驗externaltaskmarker和externaltasksensor,在上游任務重跑后,會有怎樣的效果,并且在這兩個實體中execution_date的使用,
這兩個部件在web頁面使用有問題,客戶端命令可以較為正常運行,但是sensor仍然不會隨著marker的清理而自動重跑,基本需要客戶端手動執行命令才能正常,基本上只有第一次執行遇到前置失敗會報警, 1
12.編輯一個任務,定期清理logs檔案夾的歷史檔案,因為批量使用后,會產生大量的日志,后來查看發現,日志量大的主要是scheduler產生的,所以設定了任務,每天自動清理logs/scheduler下的當日的前天的目錄,
13.撰寫一個dags檔案夾分發的任務,每次有人更新了自己的任務檔案,就要手動重跑該任務,更新dags檔案夾到節點8和節點9.
14.任務超時設定問題,因為現有任務尤其是spark的任務通常都要幾分鐘甚至更久,但是目前airflow的默認的超時判定好像都很短,這個值要
置的大一點,
六、注意事項
- 不要在bash_command中使用nohup,否則airflow會認為該任務已經執行完畢,無法正常檢測結果,直接把nohup去掉執行即可,日志會自動記錄在airflow的執行日志中,
- 命令要在operator(task)中執行,因為不同的task可能會被分配到不同的節點分別執行,比如不要在在兩個task中間執行一個os.system(),
- 每新加、修改dags之后,都重跑一下sync_all_dags這個任務的最新一次,這是基礎任務,用于同步三臺幾點的任務檔案,同時也備份到hdfs了,如果忘記重跑的話,每個小時也會自動重繪一次,新任務沒同步,不會影響老任務,不同步A,直接運行A,會報錯,因為worker節點本身并沒有任務檔案,airflow本身的例子不報錯,是因為每臺節點都有例子檔案,一個檔案中的dag的任務會分到不同worker執行,是的,隨機的,并且調度節點本身不執行任務, 步驟:建立.py任務檔案,用airflow環境的python執行該檔案編譯,在airflow的web頁面將sync_all_dags的最后一次任務的狀態置為clear重跑,
- 一些會引發Scheduler行程退出的操作,務必避免:(1)List Dag Run 頁面,標記一個已完成任務為running后,再洗掉該任務;
- 有時修改完老的dag,web頁面會顯示不正常,即便洗掉也不正常,可用客戶端命令進行一次回溯,頁面便可恢復正常,
- 手動觸發dag會改變最新的execution_date, 打亂預定執行計劃,可通過airflow dags next-execution <dag_id> 查看下次調度時間
- Airflow調度dag時,將dag檔案中配置的start_date(當interval是間隔)或者start_date后第一個滿足cron運算式的時間(當interval是cron運算式)視為基準時間,前者第一次實際運行的時間為:start_date加上一個周期的scheduler_interval,而后者第一次實際運行時間是start_date后第二個滿足cron運算式的時間,之后的調度根據上一次的execution_date來進行,就不再依賴dag檔案中的配置,
- utc和cst時間:和execution有關的時間基本都是utc(是celery的時區)時間,需要減去8小時,注意當start_date加上一個周期的scheduler_interval是utc時間,比如start_date=days_ago(1)+scheduler_interval='0 0 * * *'的時候,實際執行時間是今早的8:00,所以scheduler_interval要減去8小時,如果跨天比如想設定一個凌晨的2:19,那么就把原來的days_ago()括號里面加一,然后scheduler_interval設定為"19 18 * * *",
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/274881.html
標籤:其他
上一篇:分布式事務
下一篇:Mybatis復習小結
