我正在嘗試讀取突觸表,其中列名中有空格。
讀取表一直在作業,直到我選擇沒有空格或特殊字符的列:
%%spark
val df = spark.read.synapsesql("<Pool>.<schema>.<table>").select("TYPE", "Year").limit(100)
df.show()
輸出:
------ ----
| TYPE|Year|
------ ----
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|
|BOUGHT|LAST|
當我開始選擇帶有空格的列時,出現錯誤。我嘗試了很多變體:
.select(col("""`Country Code`"""))
.select(col("`Country Code`"))
.select(col("""[Country Code]"""))
.select(col("Country Code"))
.select($"`Country Code`")
.select("`Country Code`")
.select("""`Country Code`""")
將回傳此錯誤: 錯誤:com.microsoft.sqlserver.jdbc.SQLServerException:列名“國家/地區”無效。
`例如,如果我在選擇中省略:
.select("[Country Code]")
錯誤:com.microsoft.sqlserver.jdbc.SQLServerException:無效的列名“[國家/地區代碼]”。
在突觸中使用反勾號 spark 只需將第一個單詞作為列。
有什么經驗嗎?
uj5u.com熱心網友回復:
在select其自己的意愿作業,加入show(或任何其他行動像count)不會。Synapse synapsesqlAPI似乎確實存在問題。Invalid column name 'country' 錯誤來自 SQL 引擎,因為似乎無法將方括號傳回給它。此外,鑲木地板檔案不支持列名稱中的空格,因此它可能已連接。
解決方法是在列名中不使用空格。如果需要,請修復之前 Synapse 管道步驟中的表。我會研究一下,但可能沒有其他答案。
如果要重命名資料庫中的現有列,可以使用sp_rename,例如
EXEC sp_rename 'dbo.countries.country Type', 'countryType', 'COLUMN';
此代碼已在 Synapse 專用 SQL 池上進行了測驗。
sysnapsesql.read不幸的是,那個特定的 API ( ) 無法處理視圖。您必須實作它,例如在之前的 Synapse Pipeline 步驟中使用 CTAS。API 對簡單模式很有用(獲取表 -> 處理 -> 放回),但非常有限。您甚至無法管理表分布(散列、round_robin、復制)或索引(聚集列存盤、聚集索引、堆)或磁區,但您永遠不知道它們有一天會添加到其中。無論如何,我會在下一次 MS 會議期間密切關注。
uj5u.com熱心網友回復:
我已經創建了使用JDBC. 感謝這一點,我能夠從視圖中閱讀。我已經添加了如何從 KeyVault 獲取密碼的 saplme 代碼,使用TokenLibrary.
def spark_query(db, query):
jdbc_hostname = "<synapse_db>.sql.azuresynapse.net"
user = "<spark_db_client>"
password = "<strong_password>"
# password_from_kv = TokenLibrary.getSecret("<Linked_Key_Vault_Service_Name>", "<Key_Vault_Key_Name>", "<Key_Vault_Name>")
return spark.read.format("jdbc") \
.option("url", f"jdbc:sqlserver://{jdbc_hostname }:1433;databaseName={db};user={user};password={password}") \
.option("query", query) \
.load()
然后我創建VIEW了沒有空格的列名:
CREATE VIEW v_my_table
AS
SELECT [Country code] as country_code from my_table
授予訪問權限<spark_db_client>:
GRANT SELECT ON v_my_table to <spark_db_client>
在整個準備之后,我能夠從 VIEW 讀取表并保存到spark資料庫:
query = """
SELECT country_code FROM dbo.v_my_table
"""
df = spark_query(db="<my_database>", query=query)
spark.sql("CREATE DATABASE IF NOT EXISTS spark_poc")
df.write.mode("overwrite").saveAsTable("spark_poc.my_table")
df.registerTempTable("my_table")
這是 <placeholder_variables>
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/322866.html
