我正在嘗試創建一個繼承自 Airflow BaseOperator 的 CustomBaseOperator。CustomBaseOperator 作業正常,但是當我嘗試創建繼承自 CustomBaseOperator 的 ChildOperator 時,Airflow 將其視為 CustomBaseOperator。
CustomBaseOperator 有一個如下所示的執行函式:
def make_request(self):
print("Parent request")
def execute(self, context):
self.make_request()
在子運算子中,我重新定義了 make_request:
class ChildOperator(CustomBaseOperator):
@apply_defaults
def make_request(self):
print("Child Request")
每當我運行使用 ChildOperator 的任務時,它都會列印“父請求”,并且圖例將其顯示為 CustomBaseOperator ...我的操作員位于“插件”檔案夾中的“操作員”檔案夾中。我猜在該檔案夾中創建自定義運算子時,我只能從“官方”運算子繼承。你知道我怎樣才能使繼承作業嗎?
uj5u.com熱心網友回復:
這有效:
import datetime
from airflow import DAG
from airflow.models import BaseOperator
class CustomBaseOperator(BaseOperator):
def make_request(self):
print("Parent request")
def execute(self, context):
self.make_request()
class ChildOperator(CustomBaseOperator):
def make_request(self):
print("Child Request")
with DAG(dag_id="test_dag", start_date=datetime.datetime(2022, 1, 1), schedule_interval=None) as dag:
test = ChildOperator(task_id="test")
這將列印Child Request并顯示ChildOperator在 Airflow UI 中。另外,請注意該設定@apply_defaults自 Airflow 2.0 以來已棄用,現在它會自動應用。
uj5u.com熱心網友回復:
我發現了問題:我正在使用一個回圈到 locals() 的函式來為每個引數設定 self.param = param。
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/414099.html
標籤:
