This is the error log shown when I try to run the program from the Apache Airflow UI:
ERROR [airflow.models.dagbag.DagBag] Failed to import: /d/Program Files/meta airflow/dags/csv-json.py
Traceback (most recent call last):
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/dagbag.py", line 331, in _load_modules_from_file
    loader.exec_module(new_module)
  File "<frozen importlib._bootstrap_external>", line 678, in exec_module
  File "<frozen importlib._bootstrap>", line 219, in _call_with_frames_removed
  File "/d/Program Files/meta airflow/dags/csv-json.py", line 39, in <module>
    fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
  File "/home/siva/.local/lib/python3.6/site-packages/airflow/models/baseoperator.py", line 145, in apply_defaults
    raise AirflowException("Use keyword arguments when initializing operators")
airflow.exceptions.AirflowException: Use keyword arguments when initializing operators
Initialization done
The program converts a CSV dataset into a structured (nested) JSON file. Here is the code:
import json
import csv
import os
import pandas as pd
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from datetime import datetime, timedelta
from airflow.models import Variable

AIRFLOW_HOME = os.getenv('AIRFLOW_HOME')
csv_file=Variable.get("csv_path")

def load_csv_data():
    with open(csv_file,'r') as file:
        data = pd.read_csv(file,index_col='show_id')
    return("dataframe created")

def process_into_json(data):
    data.groupby(['show_id','type','title'])
    data.apply(lambda x: x[['director','cast','country','date_added','release_year','rating','duration','listed_in']].to_dict('records'))
    data.reset_index()
    data.rename(columns={0:'details'})
    return ("processing complete")

def store_into_json(data):
    data.to_json('data\sample-supermarket.json',indent=5,orient='records')
    return(" storing done")

default_args = {'owner': 'airflow','start_date': datetime(2022, 4, 4),}

with DAG("csv-to-json-conversion",
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 4),
         default_args=default_args,
         tags=['conversion_of_data_file']) as dag:
    fetchdata=PythonOperator('fetch_data',python_callable=load_csv_data(),dag=dag)
    processdata=PythonOperator('process_data',python_callable=process_into_json(),dag=dag)
    loaddata=PythonOperator('load_data',python_callable=store_into_json(),dag=dag)
    start=DummyOperator("start",dag=dag)
    dead=DummyOperator("dead",dag=dag)
    end=DummyOperator("end",dag=dag)
    start>>collectdata>>[processdata>>loaddata,dead]>>end
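Please check the code and suggest a fix for the error. If there is a better way to write this program, I would welcome your suggestions. Here is the link to the dataset I used.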
Answer:
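There are two errors in your code:
1. Use keyword arguments when initializing operators, as the exception in your log says: write PythonOperator(task_id='fetch_data', ...) instead of PythonOperator('fetch_data', ...).
2. The python_callable argument must be a callable: write PythonOperator(..., python_callable=load_csv_data, ...) instead of PythonOperator(..., python_callable=load_csv_data(), ...). With the parentheses, Python calls the function while the DAG file is being parsed and hands its return value (a string) to the operator instead of the function itself. process_into_json() and store_into_json() would even fail immediately with a TypeError, because both require a data argument.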
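Both mistakes can be reproduced without Airflow installed. The sketch below is only an illustration: make_task is a hypothetical stand-in, not Airflow's API. It uses a keyword-only signature (the bare *) so a positional task id is rejected, similar in spirit to Airflow's "Use keyword arguments" check, and it checks callable() to show why load_csv_data() (a string) is not a valid python_callable while load_csv_data is.

```python
# Hypothetical stand-in for an operator constructor; this is NOT Airflow's API.
# The bare * makes task_id and python_callable keyword-only, so passing the
# task id positionally raises a TypeError (Airflow raises AirflowException).
def make_task(*, task_id, python_callable):
    if not callable(python_callable):
        raise TypeError(
            f"python_callable for {task_id!r} must be callable, "
            f"got {type(python_callable).__name__}"
        )
    return {"task_id": task_id, "python_callable": python_callable}


def load_csv_data():
    # Stand-in for the question's function: same return value, no file I/O.
    return "dataframe created"


# Mistake 1: positional task id is rejected by the keyword-only signature.
try:
    make_task("fetch_data", python_callable=load_csv_data)
except TypeError as exc:
    print("positional:", exc)

# Mistake 2: load_csv_data() runs right here and passes the string it returns.
try:
    make_task(task_id="fetch_data", python_callable=load_csv_data())
except TypeError as exc:
    print("called too early:", exc)

# Correct: keyword arguments plus the function object itself.
task = make_task(task_id="fetch_data", python_callable=load_csv_data)
# The function only runs when the task invokes it later.
print(task["python_callable"]())  # prints "dataframe created"
```

The point of the design is that the operator stores a reference to the function and calls it at run time, so nothing in the DAG file should invoke it during parsing.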
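With both fixes applied (and the undefined collectdata in the last line replaced by fetchdata), the DAG block becomes:
with DAG("csv-to-json-conversion",
         schedule_interval='@daily',
         start_date=datetime(2022, 4, 4),
         default_args=default_args,
         tags=['conversion_of_data_file']) as dag:
    fetchdata=PythonOperator(task_id='fetch_data',python_callable=load_csv_data,dag=dag)
    processdata=PythonOperator(task_id='process_data',python_callable=process_into_json,dag=dag)
    loaddata=PythonOperator(task_id='load_data',python_callable=store_into_json,dag=dag)
    start=DummyOperator(task_id="start",dag=dag)
    dead=DummyOperator(task_id="dead",dag=dag)
    end=DummyOperator(task_id="end",dag=dag)
    start>>fetchdata>>[processdata>>loaddata,dead]>>end
Note that inside the list, processdata>>loaddata is evaluated on its own and returns loaddata, so fetch_data is wired to load_data and dead, while process_data is left with no upstream task. If the intended order is fetch, process, load, chain the tasks explicitly, for example start>>fetchdata>>processdata>>loaddata>>end.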
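Even with these fixes, process_into_json and store_into_json still declare a data parameter that PythonOperator does not fill in (no op_args or op_kwargs are passed), so those tasks would fail with a TypeError when they run, and separate tasks do not share in-memory objects such as a DataFrame. Inside process_into_json, groupby, apply, reset_index and rename also return new objects that are discarded, so data is never changed. One common alternative is to let a task read a file and write a file, passing only paths. The following is a stdlib-only sketch of that idea, assuming the CSV has the columns used in the question (show_id, type, title, director, ...); csv_to_nested_records and convert_file are hypothetical names, and the nesting under "details" mirrors what process_into_json appears to intend.

```python
import csv
import io
import json

# Columns taken from the question: the key columns and the ones nested under "details".
KEY_COLUMNS = ["show_id", "type", "title"]
DETAIL_COLUMNS = ["director", "cast", "country", "date_added",
                  "release_year", "rating", "duration", "listed_in"]


def csv_to_nested_records(csv_text):
    """Group rows by (show_id, type, title); nest remaining columns under 'details'."""
    groups = {}
    for row in csv.DictReader(io.StringIO(csv_text)):
        key = tuple(row[col] for col in KEY_COLUMNS)
        # Missing detail columns become empty strings instead of raising.
        detail = {col: row.get(col, "") for col in DETAIL_COLUMNS}
        groups.setdefault(key, []).append(detail)
    return [
        {**dict(zip(KEY_COLUMNS, key)), "details": details}
        for key, details in groups.items()
    ]


def convert_file(csv_path, json_path):
    """Hypothetical task body: read one file, write another, return the output path."""
    with open(csv_path, newline="", encoding="utf-8") as src:
        records = csv_to_nested_records(src.read())
    with open(json_path, "w", encoding="utf-8") as dst:
        json.dump(records, dst, indent=4)
    return json_path
```

As a hypothetical wiring, a single task such as PythonOperator(task_id='convert_csv', python_callable=convert_file, op_kwargs={'csv_path': csv_file, 'json_path': '<output path>'}) could replace the separate fetch, process and store tasks; the same structure works with pandas if you assign the results of groupby and apply instead of discarding them.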
