我正在嘗試弄清楚如何/如果可能的話,在來自資料塊的遠程 sql server 中執行截斷表命令。我正在為 ETL 腳本使用資料塊,但它正在加載到遠程 ms sql 服務器中。
原始腳本截斷表,然后重復追加到它。它像這樣截斷它:
engine.execution_options(autocommit=True).execute("TRUNCATE TABLE my_table;")
我不知道如何使用 pyspark 復制它。我試圖避免做類似的事情:
first_iteration = True
for item in items_to_query:
df = f(...)
if first_iteration:
df.write.option("mode","overwrite")....
first_iteration = False
else:
df.write.option("mode","append")...
如果我能有類似的東西會更好
truncate_remote_table("table","database")
for item in items_to_query:
df = f(...)
df.write.option("mode","append")....
我希望我解釋得很好。如果你想推薦完全不同的做法,那很好。只是我和很多人一起作業,他們(理所當然地)害怕/很容易害怕將腳本移動到資料塊,所以我真的很想在每一步都盡可能少地改變。搖滾和艱難的地方。
我在谷歌上搜索過,但搜索結果似乎總是從現有資料框開始,然后讓它執行 mode="overwrite" 以截斷表格。Nothing 只是一個簡單的“TRUNCATE TABLE”命令。
uj5u.com熱心網友回復:
首先,我會說除了 SQL Server 本身之外,我對您提到的任何技術一無所知。但是在 SQL Server 上的每個資料庫中,都有一個名為系統存盤程序的系統存盤程序sp_executesql,它允許您構造 SQL 字串并執行它。
如果您能夠呼叫遠程 SQL Server 上的存盤程序,那么您可以呼叫sp_executesql 'USE DatabaseName; TRUNCATE TABLE my_table;'以截斷您想要的任何表。由于資料庫名稱是在查詢的子句中指定的,因此您可以從服務器上的任何資料庫USE執行該程序的任何實體。sp_executesql
uj5u.com熱心網友回復:
你有幾個選擇......
選項 01:通過 JDBC 連接截斷表。
您可以創建到目標 SQL Server 表的 JDBC 連接。按照下面的鏈接,您應該能夠將 TRUNCATE 陳述句傳遞給 JDBC 連接。將 QUERY 陳述句替換為截斷陳述句。即使它是截斷陳述句,也一定要提交。
https://www.tutorialspoint.com/jdbc/jdbc-delete-records.htm
選項 02:下推截斷查詢
根據下面鏈接中 Peter Pan 的回答,您可以使用相同的邏輯將截斷陳述句推送到 SQL Server 表。將該陳述句別名為 dbtable。
https://stackoverflow.com/a/58629994/13280838
選項 03:使用空資料幀截斷
按照下面的鏈接 How to truncate a table in PySpark?
您可以創建一個空資料框,然后使用它來截斷表。請注意,當 truncate 沒有按我預期的那樣作業時,有一些注意事項。還使用不帶截斷的覆寫 - 根據我有限的知識洗掉表并且不會重新創建索引,因此在使用時請小心。
一如既往,即使在實際的 DEV 表中實作任何東西之前,請務必嘗試使用一次性臨時表進行測驗。希望能幫助到你。
uj5u.com熱心網友回復:
正如其他帖子中提到的那樣,有幾種方法可以解決這個問題:
- 直接使用 JDBC 來執行您的代碼。在 PySpark 中,您將需要使用類似這樣的東西(來自這個答案)通過 JVM 網關:
driver_manager = spark._sc._gateway.jvm.java.sql.DriverManager
connection = driver_manager.getConnection(mssql_url, mssql_user, mssql_pass)
connection.prepareCall("TRUNCATE TABLE my_table").execute()
connection.close()
- 在@rainingdistros 指出的模式下使用空資料幀進行截斷
overwrite。唯一需要注意的是,默認情況下它使用洗掉/創建新方法,因此您會丟失索引。但這是通過將truncate選項設定為true而不是默認值來控制的false(請參閱檔案):
# get a dataframe with table schema
df = spark.read.jdbc(....)
# truncate the table
df.limit(0).write.mode("overwrite").option("truncate", "true").jdbc(...)
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/533824.html
