所以這是我的問題:氣流,因為執行一個呼叫 php 腳本(Symfony)的 Python 檔案來執行創建的命令。該命令本身作業正常。后者從 Airflow 的執行也很有效(通過 Airflow 的可視化顯示:運行成功)。
當我在 symfony 命令(exit(1)、throw、...)中導致錯誤時,為了查看 Airflow 的反應,它總是顯示成功。我如何讓他明白腳本不起作用?
這是python代碼:
dag = DAG(
'name fill python',
default_args={
'start_date': datetime(2022, 8, 1),
},
max_active_runs = 1,
description='one description',
schedule_interval='0 7 * * 1',
tags = ["tag1", "tag2"]
)
t1 = SimpleHttpOperator(
http_conn_id='name fill json',
task_id='name task',
endpoint='url route symfony',
method='GET',
data={},
headers={},
dag=dag
)
t1
Symfony 命令的路徑:
public function ImportMasse(string $csv, KernelInterface $kernel)
{
$application = new Application($kernel);
$application->setAutoExit(false);
$input = new ArrayInput([
'command' => 'Import',
'class_name' => $csv,
]);
$output = new BufferedOutput();
$application->run($input, $output);
if(stristr($output->fetch(), '0 errors')){
return new Response(true);
}else{
return new Response(false);
}
}
謝謝你的回歸。
uj5u.com熱心網友回復:
SimpleHttpOperator允許您對請求回應物件進行檢查。如果檢查回傳 false,AirflowException則會引發并且操作員將失敗。
例子:
def check_func(response):
# Implement your check logic here
if condition_is_ok:
return True
return False
SimpleHttpOperator(
...,
response_check=lambda response: True if check_func(response) is True else False
)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/513426.html
標籤:php交响乐空气流动
