我正在使用 Airflow v2.2.3 并且apache-airflow-providers-elasticsearch==2.1.0在 Kubernetes 中運行。
我們的日志會自動發送到 Elasticsearch v7.6.2。
我在 Airflow 中設定了以下配置:
AIRFLOW__LOGGING__REMOTE_LOGGING=True
AIRFLOW__ELASTICSEARCH__HOST=<my-elasticsearch-host>:9200
AIRFLOW__ELASTICSEARCH__WRITE_STDOUT=True
AIRFLOW__ELASTICSEARCH__JSON_FORMAT=True
AIRFLOW__ELASTICSEARCH__LOG_ID_TEMPLATE={dag_id}-{task_id}-{execution_date}-{try_number}
我將標準輸出中的日志視為 json:
{
"asctime": "2022-01-20 12:13:52,292",
"filename": "taskinstance.py",
"lineno": 1032,
"levelname": "INFO",
"message": "Dependencies all met for <TaskInstance: spark_jobs_8765280.check_rawevents_output scheduled__2022-01-19T07:00:00 00:00 [queued]>",
"offset": 1642680832292384000,
"dag_id": "spark_jobs_8765280",
"task_id": "check_raweven ts_output",
"execution_date": "2022_01_19T07_00_00_000000",
"try_number": "1",
"log_id": "spark_jobs_8765280-check_rawevents_output-2022_01_19T07_00_00_000000-1"
}
我也確實在 Elasticsearch 中看到了這些領域——到目前為止一切都很好。現在我希望通過 Airflow 的 Web 服務器 UI 看到這些日志,但什么也沒發生(airflow 無法檢索日志)。
當我嘗試手動更新日志的檔案并將“log_id”的格式更改為:
"log_id": "spark_jobs_8765280-check_rawevents_output-2022_01_19T07_00_00_000000-1"
到
"log_id": "spark_jobs_8765280-check_rawevents_output-2022-01-19T07:00:00 00:00-1"
我確實在 UI 中看到了日志!
為什么 Airflow 的 UI 會嘗試log_id使用未轉義log_id的而不是正確的?
PS - 這是 Airflow 網路服務器的日志:
[2022-01-20 14:08:21,170] {base.py:270} INFO - POST http://<my-elasticsearch-host>:9200/_count [status:200 request:0.008s]
10.1.19.65 - - [20/Jan/2022:14:08:21 0000] "GET /get_logs_with_metadata?dag_id=spark_jobs_8765280&task_id=check_rawevents_output&execution_date=2022-01-19T07:00:00+00:00&try_number=1&metadata={"end_of_log":false,"last_log_timestamp":"2022-01-20T14:05:55.439492+00:00","offset":"1691"} HTTP/1.1" 200 119 "https://<my-airflow-webserver-host>/log?dag_id=spark_jobs_8765280&task_id=check_rawevents_output&execution_date=2022-01-19T07:00:00+00:00" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/97.0.4692.71 Safari/537.36"
uj5u.com熱心網友回復:
我發現了問題。我有一個單獨的 pod 用于氣流調度程式和氣流網路服務器。
我將AIRFLOW__ELASTICSEARCH__JSON_FORMAT=True唯一添加到氣流調度程式 作業人員,但沒有添加到氣流網路服務器。
我深入到源代碼,我發現Web服務器檢查的AIRFLOW__ELASTICSEARCH__JSON_FORMAT,以及以變換的log_id以清潔日期以正確的格式:
if self.json_format:
data_interval_start = self._clean_date(dag_run.data_interval_start)
data_interval_end = self._clean_date(dag_run.data_interval_end)
execution_date = self._clean_date(dag_run.execution_date)
else:
data_interval_start = dag_run.data_interval_start.isoformat()
data_interval_end = dag_run.data_interval_end.isoformat()
execution_date = dag_run.execution_date.isoformat()
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/419821.html
標籤:
上一篇:串列之間的交集長度串列串列
