我正在尋找使用膠水將關系資料庫中的多個表攝取到 s3。表詳細資訊存在于組態檔中。組態檔是一個json檔案。擁有一個可以遍歷多個表名并將這些表攝取到 s3 中的代碼會很有幫助。膠水腳本是用python撰寫的(pyspark)
這是組態檔的示例:
{"main_key":{
"source_type": "rdbms",
"source_schema": "DATABASE",
"source_table": "DATABASE.Table_1",
}}
uj5u.com熱心網友回復:
只需撰寫一個普通的 for 回圈來回圈您的資料庫配置,然后按照Spark JDBC 檔案按順序連接到它們中的每一個。
uj5u.com熱心網友回復:
假設您的 Glue 作業可以連接到資料庫,并且已向其中添加了 Glue 連接。這是從我的腳本中提取的一個示例,它執行類似的操作,您需要更新適用于您的資料庫的 jdbc url 格式,該格式使用 sql server、獲取組態檔的實作細節、回圈專案等。
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from datetime import datetime
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
jdbc_url = f"jdbc:sqlserver://{hostname}:{port};databaseName={db_name}"
connection_details = {
"user": 'db_user',
"password": 'db_password',
"driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
}
tables_config = get_tables_config_from_s3_as_dict()
date_partition = datetime.today().strftime('%Y%m%d')
write_date_partition = f'year={date_partition[0:4]}/month={date_partition[4:6]}/day={date_partition[6:8]}'
for key, value in tables_config.items():
table = value['source_table']
df = spark.read.jdbc(url=jdbc_url, table=table, properties=connection_details)
write_path = f's3a://bucket-name/{table}/{write_date_partition}'
df.write.parquet(write_path)
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/342434.html
