我們有一個應用程式在一個 pod 中運行,我想用氣流觸發。該應用程式與大量物體一起運行并花費大量時間。我們設定的本質是其中一些可能會失敗,我們希望能夠僅使用一個或幾個物體重新運行:
my_program # Run full application
my_program -e entity1 -e entity2 # Run application limited to entity1 and entety2.
我的計劃是允許用戶使用 Airflow UI 中的“使用配置觸發”的物體串列再次觸發 DAG,并使用{{ dag_run.conf }}選項限制 DAG 。
我現在面臨的問題是KubernetesPodOperator需要一個字串串列,我不明白如何使用 jinja 來構造一個我以前不知道它的長度的串列。
這是我嘗試過的,但是當然 jinja 不會被模板化。我了解如何將模板化字串插入串列中,但是現在當我事先不知道串列的長度時我該怎么做。
with DAG(
"my_dag",
description="Run my dag",
schedule_interval="@daily",
start_date=datetime.datetime(2021, 10, 14),
default_args=default_args,
) as dag:
entities = """{%- for entity in dag_run.conf['entities'] -%} -p {{ entity }} {% endfor %}"""
arguments = list(filter(None, ['my_program', *entities.split(' ')]))
t1 = KubernetesPodOperator(
task_id="my_task_id",
image="url_to_docker_image:latest",
name="my_task_name",
arguments=arguments,
is_delete_operator_pod=True,
env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
)
編輯:這是我第二次嘗試使用 jinja 和 render_template_as_native_obj=True,
with DAG(
"my_dag",
description="Run my dag",
schedule_interval="@daily",
start_date=datetime.datetime(2021, 10, 14),
default_args=default_args,
render_template_as_native_obj=True,
) as dag:
arguments = """['my_program', {% if entities is defined %}
{%- for entity in entities-%} '-p', '{{ entity }}', {% endfor %}
{%- endif %}]
"""
t1 = KubernetesPodOperator(
task_id="my_task_id",
image="url_to_docker_image:latest",
name="my_task_name",
arguments=arguments, # type: ignore
is_delete_operator_pod=True,
env_vars={"AIRFLOW_RUN_ID": "{{ run_id }}"},
)
但這似乎沒有正確轉換為串列:
HTTP response body: {"kind":"Status","apiVersion":"v1","metadata":{},"status":"Failure","message":"Pod in version \"v1\" cannot be handled as a Pod: v1.Pod.Spec: v1.PodSpec.Containers: []v1.Container: v1.Container.Args: []string: decode slice: expect [ or n, but found \", error found in #10 byte of ...|{\"args\": \"['my_program|...
uj5u.com熱心網友回復:
第二種方法經過微小的調整。當然變數并沒有在本例中(經過了精簡)可用,引數是從獲取dag_run.conf['entities'],而不是只entities。
第二個問題是什么是 jinja 轉換為 python 物件的有效輸入,我必須洗掉字串末尾的空格以及洗掉換行符:
arguments = """['my_program', {% if dag_run.conf['entities'] is defined %}
{%- for entity in dag_run.conf['entities']-%} '-p', '{{ entity }}', {% endfor %}
{%- endif %}]
""".replace('\n','').strip()
uj5u.com熱心網友回復:
您第二次嘗試是在正確的軌道上,但引數變數中的模板在最后一個物體的末尾有一個額外的逗號 (',')。
import jinja2
from jinja2.nativetypes import NativeEnvironment
env = NativeEnvironment()
template = env.from_string(arguments)
print (template.render(entities=range(5)) )
輸出: ['my_program', '-p', '0', '-p', '1', '-p', '2', '-p', '3', '-p', '4', ]
如果您將引數變數更改為:
arguments = """
['my_program' {% if entities is defined %}
{%- for entity in entities-%}, '-p', '{{ entity }}' {% endfor %}
{%- endif %}]
"""
輸出現在是一個字串,Jinja 可以將其轉換為 python 陣列:
['my_program' , '-p', '0' , '-p', '1' , '-p', '2' , '-p', '3' , '-p', '4' ]
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/326447.html
下一篇:引數包排序(或等效行為)
