我目前正在使用 Airflow(版本:1.10.10),
我有興趣創建一個每小時運行一次的 DAG,
這將帶來一個 Docker 容器的使用資訊(磁盤使用情況),
(可通過 docker CLI 命令 (df -h) 獲得的資訊)。
我明白:“如果 xcom_push 為 True,那么當 bash 命令完成時,寫入 stdout 的最后一行也將被推送到 XCom”
但我的目標是從 bash 命令中獲取特定值,而不是最后一行。
例如,我想得到這條線(見 screeeshot)
“tmpfs 6.2G 0 6.2G 0% /sys/fs/cgroup”
進入我的 Xcom 值,以便我可以編輯并從中提取特定值,
如何將 Xcom 值推送到 PythonOperator,以便我可以對其進行編輯?
我在下面添加了我的示例 DAG 腳本,

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="df -h", xcom_push=True)
bash_task
是否適用?非常感謝,
uj5u.com熱心網友回復:
你應該做的作業:
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime,timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.sensors.file_sensor import FileSensor
from airflow.operators.bash_operator import BashOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
default_args = {
'retry': 5,
'retry_delay': timedelta(minutes=5)
}
with DAG(dag_id='bash_dag', schedule_interval="@once", start_date=datetime(2020, 1, 1), catchup=False) as dag:
# Task 1
bash_task = BashOperator(task_id='bash_task', bash_command="docker stats --no-stream --format '{{ json .}}' <container-id>", xcom_push=True)
bash_task
uj5u.com熱心網友回復:
可以通過outputoperator的屬性獲取推送到XCom store的值。
在下面的代碼片段中,bash.output是一個XComArg,在執行任務實體時將被拉取并作為可呼叫函式的第一個引數傳遞。
from airflow.models.xcom_arg import XComArg
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator
from airflow.models import DAG
with DAG(dag_id='bash_dag') as dag:
bash_task = BashOperator(
task_id='bash_task', bash_command="df -h", xcom_push=True)
def format_fun(stat_terminal_output):
pass
format_task = PythonOperator(
python_callable=format_fun,
task_id="format_task",
op_args=[bash_task.output],
)
bash_task >> format_task
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/344454.html
上一篇:dockerrun停頓
