主頁 > 軟體設計 > Airflow2.0+celery+redis任務調度部署及使用

Airflow2.0+celery+redis任務調度部署及使用

2021-04-11 12:03:42 軟體設計

Airflow任務調度

(本檔案內容有同事貢獻部分,該部分標記為藍色,對同事表示感謝)

目錄

一、環境

二、基礎引數

三、任務型別

四、使用步驟

五、需要解決的問題(綠色表示已解決)

六、注意事項


一、環境

版本:airflow 2.0.0;python 3.6

部署方式:集群部署,運行在anaconda3的虛擬環境 (airflow)

* 節點7 [webserver、schuduler、worker]

* 節點8 [worker]

* 節點9 [worker、schuduler]

官網檔案(最新):http://airflow.apache.org/docs/apache-airflow/stable/start.html

非官方翻譯中文檔案(1.10.2):https://airflow.apachecn.org/#/

二、基礎引數

default_args = {

'owner': '***',

'start_date': days_ago(1),

'email': ['xxx@qq.com'],

'email_on_failure': True,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(seconds=50),

'pool': 'test',

'priority_weight': 100

}

baseoperator(

:param task_id: a unique, meaningful id for the task

:type task_id: str

:param owner: the owner of the task, using the unix username is recommended

:type owner: str

:param email: the 'to' email address(es) used in email alerts. This can be a

single email or multiple ones. Multiple addresses can be specified as a

comma or semi-colon separated string or by passing a list of strings.

:type email: str or list[str]

:param email_on_retry: Indicates whether email alerts should be sent when a

task is retried

:type email_on_retry: bool

:param email_on_failure: Indicates whether email alerts should be sent when

a task failed

:type email_on_failure: bool

:param retries: the number of retries that should be performed before

failing the task

:type retries: int

:param retry_delay: delay between retries

:type retry_delay: datetime.timedelta

:param retry_exponential_backoff: allow progressive longer waits between

retries by using exponential backoff algorithm on retry delay (delay

will be converted into seconds)

:type retry_exponential_backoff: bool

:param max_retry_delay: maximum delay interval between retries

:type max_retry_delay: datetime.timedelta

:param start_date: The ``start_date`` for the task, determines

...詳見baseoperator原始碼,注:baseoperator即基礎operator,

)

三、任務型別

  1. Bashoperator(運行方式為執行bash命令),例如:

run_this = BashOperator(

task_id='run_after_loop',

bash_command='echo 1',

dag=dag

)

注:可以通過ssh命令在遠程機器上執行腳本或命令

2.ExternalTaskSensor(可以用作dag之間依賴,感知前置dag或task執行狀態,不必重復執行上層依賴),例如:

child_1 = ExternalTaskSensor (

task_id = 'henry_1',

external_dag_id = 'henry_test',

# external_task_id = "task_1",

dag = dag

)

3.LatestOnlyOperator(只運行最新的),可以跳過在 DAG 的最近計劃運行期間未運行的任務,例如:

dag = DAG(

dag_id='latest_only_with_trigger',

schedule_interval=dt.timedelta(hours=4),

start_date=dt.datetime(2016, 9, 20),

)

latest_only = LatestOnlyOperator(task_id='latest_only', dag=dag)

4.ExternalTaskMarker(繼承自DummyOperator, 該task被clear之后,下游的依賴任務也會遞回的全都clear,默認深度10)暫時棄用,

parent_task = ExternalTaskMarker(

task_id="parent_task",

external_dag_id="example_external_task_marker_child",

external_task_id="child_task1",

dag = dag

)

5. TriggerDagRunOperator(直接觸發下游dag運行)

t2 = TriggerDagRunOperator(

task_id='trigger_dag',

trigger_dag_id='dag_1',

# 被觸發執行的dag的execution_date,str / datetime.datetime,加這個引數執行報錯...

# execution_date=datetime.datetime(2021, 3, 5, 8, 20),

# reset_dag_run=True,

# wait_for_completion=False,

# Poke interval to check dag run status when wait_for_completion=True.

# poke_interval=60,

dag=dag,

)

四、使用步驟

使用時,依次執行命令:

1)source /home/***/anaconda3/bin/activate airflow # 激活虛擬環境

2)cd /home/***/airflow/dags # 進入任務dags目錄,然后創建自己名稱的檔案夾,將任務放入自己名下便于管理

3)構建.py任務檔案

4)執行(依實際需要操作,可在web頁面操作)

* python 任務檔案確保編譯沒問題

* 運行task:airflow dags run <dag_id> <task_id> <execution_date>

* 重跑/回溯歷史任務:airflow dags backfill <dag_id>-s START_DATE -e END_DATE

五、需要解決的問題(綠色表示已解決)

1.打通airflow和任務的時間引數,讓頁面操作的時間范圍能正確帶入到任務腳本

# The execution date as YYYY-MM-DD date ="{{ ds }}"t = BashOperator( task_id='test_env', bash_command='/tmp/test.sh ', dag=dag, env={'EXECUTION_DATE': date})

這里, {{ ds }}是一個宏,并且由于BashOperator的env引數是使用 Jinja 模板化的,因此執行日期將作為 Bash 腳本中名為EXECUTION_DATE的環境變數提供,

您可以將 Jinja 模板與檔案中標記為“模板化”的每個引數一起使用,模板替換發生在呼叫運算子的 pre_execute 函式之前,

注意,由于airflow實際上接管了日期引數,多日重跑或者回溯的資料,實際上是由多個單日的任務組合而成,也就意味著原有的本身支持批量跑數的python腳本要改成單天執行的腳本,或者直接在bashoperator的bash_command中添加兩個一樣的時間引數{{ ds }},如 bash_command="python test.py {{ ds }} {{ ds }}",(不過原有腳本的一些功能可能就會發生改變,比如原來在時間范圍回圈內執行完多天統一發送郵件的,現在的效果則變成每天一封郵件),

2.任務報警(郵件需要修改組態檔,配置郵件服務,才能使任務腳本中的郵件報警生效)

直接修改airflow.cfg保存退出即可生效,不需要執行任何airflow命令,不要執行airflow initdb,

# smtp server here

smtp_host = smtp.exmail.qq.com(注意這里不要輸錯,第二個位置是exmail而不是email)

smtp_starttls = False

smtp_ssl = True

# Example: smtp_user = airflow

smtp_user = 發件用戶,一般和下面的發件人一致即可(之后改為mmribao)

# Example: smtp_password = airflow

smtp_password =

smtp_port = 465

smtp_mail_from = 發件人

任務檔案中示例:

default_args = {

'owner': '***',

'start_date': days_ago(1),

'email': ['xxx@.com'],

'email_on_failure': True,

'email_on_retry': False,

'retries': 1,

'retry_delay': timedelta(seconds=5),

}

郵件內容效果:

3.任務佇列以及優先級問題(先搭建集群后使用celery的queue)

default_args里面增加了:

'queue':'ribao','pool':'daily','priority_weight':100

之后,任務執行例外,之前會正常執行各部分task且失敗后會發送郵件,但是增加了這三個引數之后,重跑和例行都不會執行子task也不會發送失敗郵件(看起來壓根沒有按照正常的步驟執行任務),增加之后洗掉這3項引數,當天執行重跑也會產生相應的例外,第二天例行之后才會正常執行失敗并且發送郵件,

注:這里的queue和pool含義不同,假如啟動celery的worker的時候指定了 -q 引數,那么該worker就會專門被指定用來跑該queue的任務,之后提交該名稱的queue任務的時候,就會由該worker來執行,

實際使用的時候,只需要添加pool和priority_weight屬性即可實作日常需求,

4.任務命名規范

web頁面是按照字母a-z排序的,同時后幾位也會按按位比較大小,

同時dag名稱是唯一的(task_id只作用在本dag內,不同dag的taskid可以同名),所以正式的dag命名:

年月日_人名首字母_根據業務或功能自行命名,如:

20210303_人名首字母_業務

年月日首先避免了絕大多數重名風險,人名首字母進一步將名稱重名的可能性鎖定在本人任務重,極大程度減少和別人任務同名的可能,

5.是否更新到airflow2.0(使用節點7、節點8/節點9另外搭建2.0,不影響之前的單點1.10)

已解決,直接部署了2.0

6.使用celery構建集群

celery的監控頁面flower:http://***:5555,執行單位是task,同一個dag的不同task可能被分配到不同的worker執行,可以從flower頁面看到執行節點,

考慮節點7作為主節點,節點8作為子節點,節點9作為子節點,

mysql取消使用docker的原因:docker可以部署mysql服務,但是本地物理機需要安裝mysql客戶端,但是物理機安裝客戶端的時候,會對已有的mariadb進行依賴升級,而已有的mariadb的一些依賴被hadoop一些組件所依賴,害怕影響集群,所以這種方式也不保險,索性使用運維提供的二進制安裝包方式繞過依賴問題安裝mysql5.7以及客戶端,依賴問題不存在了,也就沒有使用docker的必要了,

但是經過測驗,mysql5.7及之前版本,容易發生死鎖問題,由于行級鎖,并且也不支持scheduler HA,所以直接換為docker的mysql8,但是mysql8又有其他問題,比如原始碼無法正確識別mysql8版本,導致執行不合版本的sql陳述句,故重新測驗安裝postgresql9.6/10,最后現在使用postgresql10,并啟動2個scheduler,

說明:redis使用docker搭建了哨兵,但是airflow配置broker里面需要填寫一個redis地址,但是哨兵是3個,分別監控各自的redis,并不能起到一個類似zookeeper的作用(它通過api可以告知主節點的ip,但是不能自動直接連接到主節點,所以暫時仍然在airflow.cfg里面手動填寫一個redis主節點的地址),

然后redis也更換為rabbitmq,但是未能解決web頁面的按鈕功能失效,比如重跑回溯任務偶發失敗,所以應該是airflow的bug問題,而非redis不如rabbitmq,并且,rabbitmq在使用時,celery不能很好檢測到其worker運行狀態,必須重新啟動rabbitmq和scheduler,然后worker才能作業,但是flower始終顯示worker離線,但是換成redis就立即能夠識別出worker在線狀態,后來也將result_backend也換為redis和amqp,都未能解決按鈕失效功能,并且官方強烈建議backend存入傳統意義上的資料庫,索性換回redis+postgresql組合,

最重要的還是airflow的dag腳本檔案,

老架構:

現有架構:

7.配置日志組件(先測驗hdfs路徑是否能夠使用)

應該是不行了,,

各節點的logs檔案夾的內容是不一樣的,也就是不同的機器執行任務產生的日志不同(執行什么,產生什么),

8.dag檔案必須放在啟動scheduler的節點(節點7),或者修改組態檔,目前組態檔都是本機的dags目錄,但是worker節點不檢測任務檔案,

9.目前的任務都是先登錄到節點10上然后進行操作,需要注意大批量遷移后,大量連接登錄的問題(連接數限制問題),

10.celery的flower的時區是否需要修改(需要修改celery的原始碼,最后再說)

11.測驗externaltaskmarker和externaltasksensor,在上游任務重跑后,會有怎樣的效果,并且在這兩個實體中execution_date的使用,

這兩個部件在web頁面使用有問題,客戶端命令可以較為正常運行,但是sensor仍然不會隨著marker的清理而自動重跑,基本需要客戶端手動執行命令才能正常,基本上只有第一次執行遇到前置失敗會報警, 1

12.編輯一個任務,定期清理logs檔案夾的歷史檔案,因為批量使用后,會產生大量的日志,后來查看發現,日志量大的主要是scheduler產生的,所以設定了任務,每天自動清理logs/scheduler下的當日的前天的目錄,

13.撰寫一個dags檔案夾分發的任務,每次有人更新了自己的任務檔案,就要手動重跑該任務,更新dags檔案夾到節點8和節點9.

14.任務超時設定問題,因為現有任務尤其是spark的任務通常都要幾分鐘甚至更久,但是目前airflow的默認的超時判定好像都很短,這個值要

置的大一點,

六、注意事項

  1. 不要在bash_command中使用nohup,否則airflow會認為該任務已經執行完畢,無法正常檢測結果,直接把nohup去掉執行即可,日志會自動記錄在airflow的執行日志中,
  2. 命令要在operator(task)中執行,因為不同的task可能會被分配到不同的節點分別執行,比如不要在在兩個task中間執行一個os.system(),
  3. 每新加、修改dags之后,都重跑一下sync_all_dags這個任務的最新一次,這是基礎任務,用于同步三臺幾點的任務檔案,同時也備份到hdfs了,如果忘記重跑的話,每個小時也會自動重繪一次,新任務沒同步,不會影響老任務,不同步A,直接運行A,會報錯,因為worker節點本身并沒有任務檔案,airflow本身的例子不報錯,是因為每臺節點都有例子檔案,一個檔案中的dag的任務會分到不同worker執行,是的,隨機的,并且調度節點本身不執行任務, 步驟:建立.py任務檔案,用airflow環境的python執行該檔案編譯,在airflow的web頁面將sync_all_dags的最后一次任務的狀態置為clear重跑,
  4. 一些會引發Scheduler行程退出的操作,務必避免:(1)List Dag Run 頁面,標記一個已完成任務為running后,再洗掉該任務;
  5. 有時修改完老的dag,web頁面會顯示不正常,即便洗掉也不正常,可用客戶端命令進行一次回溯,頁面便可恢復正常,
  6. 手動觸發dag會改變最新的execution_date, 打亂預定執行計劃,可通過airflow dags next-execution <dag_id> 查看下次調度時間
  7. Airflow調度dag時,將dag檔案中配置的start_date(當interval是間隔)或者start_date后第一個滿足cron運算式的時間(當interval是cron運算式)視為基準時間,前者第一次實際運行的時間為:start_date加上一個周期的scheduler_interval,而后者第一次實際運行時間是start_date后第二個滿足cron運算式的時間,之后的調度根據上一次的execution_date來進行,就不再依賴dag檔案中的配置,
  8. utc和cst時間:和execution有關的時間基本都是utc(是celery的時區)時間,需要減去8小時,注意當start_date加上一個周期的scheduler_interval是utc時間,比如start_date=days_ago(1)+scheduler_interval='0 0 * * *'的時候,實際執行時間是今早的8:00,所以scheduler_interval要減去8小時,如果跨天比如想設定一個凌晨的2:19,那么就把原來的days_ago()括號里面加一,然后scheduler_interval設定為"19 18 * * *",

轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/274881.html

標籤:其他

上一篇:分布式事務

下一篇:Mybatis復習小結

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 面試突擊第一季,第二季,第三季

    第一季必考 https://www.bilibili.com/video/BV1FE411y79Y?from=search&seid=15921726601957489746 第二季分布式 https://www.bilibili.com/video/BV13f4y127ee/?spm_id_fro ......

    uj5u.com 2020-09-10 05:35:24 more
  • 第三單元作業總結

    1.前言 這應該是本學期最后一次寫作業總結了吧。總體來說,對作業的節奏也差不多掌握了,作業做起來的效率也更高了。雖然和之前的作業一樣,作業中都要用到新的知識,但是相比之前,更加懂得了如何利用工具以及資料。雖然之間卡過殼,但總體而言,這幾次作業還算完成的比較好。 2.作業程序總結 相比前兩個單元,此單 ......

    uj5u.com 2020-09-10 05:35:41 more
  • 北航OO(2020)第四單元博客作業暨課程總結博客

    北航OO(2020)第四單元博客作業暨課程總結博客 本單元作業的架構設計 在本單元中,由于UML圖具有比較清晰的樹形結構,因此我對其中需要進行查詢操作的元素進行了包裝,在樹的父節點中存盤所有孩子的參考。考慮到性能問題,我采用了快取機制,一次查詢后盡可能快取已經遍歷過的資訊,以減少遍歷次數。 本單元我 ......

    uj5u.com 2020-09-10 05:35:48 more
  • BUAA_OO_第四單元

    一、UML決議器設計 ? 先看下題目:第四單元實作一個基于JDK 8帶有效性檢查的UML(Unified Modeling Language)類圖,順序圖,狀態圖分析器 MyUmlInteraction,實際上我們要建立一個有向圖模型,UML中的物件(元素)可能與同級元素連接,也可與低級元素相連形成 ......

    uj5u.com 2020-09-10 05:35:54 more
  • 6.1邏輯運算子

    邏輯運算子 1. && 短路與 運算式1 && 運算式2 01.運算式1為true并且運算式2也為true 整體回傳為true 02.運算式1為false,將不會執行運算式2 整體回傳為false 03.只要有一個運算式為false 整體回傳為false 2. || 短路或 運算式1 || 運算式2 ......

    uj5u.com 2020-09-10 05:35:56 more
  • BUAAOO 第四單元 & 課程總結

    1. 第四單元:StarUml檔案決議 本單元采用了圖模型決議UML。 UML檔案可以抽象為圖、子圖、邊的邏輯結構。 在實作中,圖的節點包括類、介面、屬性,子圖包括狀態圖、順序圖等。 采用了三次遍歷UML元素的方法建圖,第一遍遍歷建點,第二、三次遍歷設定屬性、連邊,實作圖物件的初始化。這里借鑒了一些 ......

    uj5u.com 2020-09-10 05:36:06 more
  • 談談我對C# 多型的理解

    面向物件三要素:封裝、繼承、多型。 封裝和繼承,這兩個比較好理解,但要理解多型的話,可就稍微有點難度了。今天,我們就來講講多型的理解。 我們應該經常會看到面試題目:請談談對多型的理解。 其實呢,多型非常簡單,就一句話:呼叫同一種方法產生了不同的結果。 具體實作方式有三種。 一、多載 多載很簡單。 p ......

    uj5u.com 2020-09-10 05:36:09 more
  • Python 資料驅動工具:DDT

    背景 python 的unittest 沒有自帶資料驅動功能。 所以如果使用unittest,同時又想使用資料驅動,那么就可以使用DDT來完成。 DDT是 “Data-Driven Tests”的縮寫。 資料:http://ddt.readthedocs.io/en/latest/ 使用方法 dd. ......

    uj5u.com 2020-09-10 05:36:13 more
  • Python里面的xlrd模塊詳解

    那我就一下面積個問題對xlrd模塊進行學習一下: 1.什么是xlrd模塊? 2.為什么使用xlrd模塊? 3.怎樣使用xlrd模塊? 1.什么是xlrd模塊? ?python操作excel主要用到xlrd和xlwt這兩個庫,即xlrd是讀excel,xlwt是寫excel的庫。 今天就先來說一下xl ......

    uj5u.com 2020-09-10 05:36:28 more
  • 當我們創建HashMap時,底層到底做了什么?

    jdk1.7中的底層實作程序(底層基于陣列+鏈表) 在我們new HashMap()時,底層創建了默認長度為16的一維陣列Entry[ ] table。當我們呼叫map.put(key1,value1)方法向HashMap里添加資料的時候: 首先,呼叫key1所在類的hashCode()計算key1 ......

    uj5u.com 2020-09-10 05:36:38 more
最新发布
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:20:47 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:20:25 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:20:17 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:20:10 more
  • 【中介者設計模式詳解】C/Java/JS/Go/Python/TS不同語言實作

    * 中介者模式是一種行為型設計模式,它可以用來減少類之間的直接依賴關系,
    * 將物件之間的通信封裝到一個中介者物件中,從而使得各個物件之間的關系更加松散。
    * 在中介者模式中,物件之間不再直接相互互動,而是通過中介者來中轉訊息。 ......

    uj5u.com 2023-04-20 08:19:44 more
  • 露天煤礦現場調研和交流案例分享

    他們集團的資訊化公司及研究院在一個礦區正在做智能礦山的統一平臺的 試點,專案投資大概1億,包括了礦山的各方面的內容,顯示得我們這次交流有點多余。他們2年前開始做智能礦山的規劃,有很多煤礦行業專家的加持,他們的描述是非常完美,但是去年底應該上線的平臺,現在還沒有看到影子。他們確實有很多場景需求,但是被... ......

    uj5u.com 2023-04-20 08:19:07 more
  • 《社區人員管理》實戰案例設計&個人案例分享

    設計是一個讓人夢想成真程序,開始編碼、測驗、除錯之前進行需求分析和架構設計,才能保證關鍵方面都做正確 ......

    uj5u.com 2023-04-20 08:18:57 more
  • 軟體架構生態化-多角色交付的探索實踐

    作為一個技術架構師,不僅僅要緊跟行業技術趨勢,還要結合研發團隊現狀及痛點,探索新的交付方案。在日常中,你是否遇到如下問題 “ 業務需求排期長研發是瓶頸;非研發角色感受不到研發技改提效的變化;引入ISV 團隊又擔心質量和安全,培訓周期長“等等,基于此我們探索了一種新的技術體系及交付方案來解決如上問題。 ......

    uj5u.com 2023-04-20 08:18:49 more
  • 05單件模式

    #經典的單件模式 public class Singleton { private static Singleton uniqueInstance; //一個靜態變數持有Singleton類的唯一實體。 // 其他有用的實體變數寫在這里 //構造器宣告為私有,只有Singleton可以實體化這個類! ......

    uj5u.com 2023-04-19 08:42:51 more
  • 【架構與設計】常見微服務分層架構的區別和落地實踐

    軟體工程的方方面面都遵循一個最基本的道理:沒有銀彈,架構分層模型更是如此,每一種都有各自優缺點,所以請根據不同的業務場景,并遵循簡單、可演進這兩個重要的架構原則選擇合適的架構分層模型即可。 ......

    uj5u.com 2023-04-19 08:42:41 more