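I have a SQL query that I am trying to convert to PySpark. In the SQL query, we join two tables and update a column where the conditions match. The SQL query looks like this: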
UPDATE [STUDENT_TABLE] INNER JOIN [COLLEGE_DATA]
ON ([STUDENT_TABLE].UNIQUEID = COLLEGE_DATA.PROFESSIONALID)
AND ([STUDENT_TABLE].[ADDRESS] = COLLEGE_DATA.STATE_ADDRESS)
SET STUDENT_TABLE.STUDENTINSTATE = "REGULAR"
WHERE (((STUDENT_TABLE.BLOCKERS) Is Null));
Answer:
Example input:
from pyspark.sql import functions as F
df_stud = spark.createDataFrame(
    [(1, 'x', None, 'REG'),
     (2, 'y', 'qwe', 'REG')],
    ['UNIQUEID', 'ADDRESS', 'BLOCKERS', 'STUDENTINSTATE'])
df_college = spark.createDataFrame([(1, 'x'), (2, 'x')], ['PROFESSIONALID', 'STATE_ADDRESS'])
Your query would update only the first row of df_stud: the value in the "STUDENTINSTATE" column would become "REGULAR".
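To see why only the first row qualifies, here is a minimal plain-Python sketch of the UPDATE semantics on the same sample rows. It is only an illustration and does not use Spark. The helper name apply_update is hypothetical, and the sketch assumes the join keys are never null (SQL would not match null keys, whereas a Python set lookup would).

```python
# Plain-Python model of the UPDATE ... INNER JOIN semantics (no Spark needed).
# Assumption: join keys are never None; SQL would not match NULL = NULL.
students = [
    {"UNIQUEID": 1, "ADDRESS": "x", "BLOCKERS": None, "STUDENTINSTATE": "REG"},
    {"UNIQUEID": 2, "ADDRESS": "y", "BLOCKERS": "qwe", "STUDENTINSTATE": "REG"},
]
colleges = [
    {"PROFESSIONALID": 1, "STATE_ADDRESS": "x"},
    {"PROFESSIONALID": 2, "STATE_ADDRESS": "x"},
]


def apply_update(students, colleges):
    """Return copies of the student rows with the update applied."""
    # (id, address) pairs a student row must match to satisfy the join.
    keys = {(c["PROFESSIONALID"], c["STATE_ADDRESS"]) for c in colleges}
    updated = []
    for s in students:
        row = dict(s)  # copy, so the input rows stay untouched
        matched = (s["UNIQUEID"], s["ADDRESS"]) in keys
        # Join condition AND the WHERE filter (BLOCKERS Is Null) must both hold.
        if matched and s["BLOCKERS"] is None:
            row["STUDENTINSTATE"] = "REGULAR"
        updated.append(row)
    return updated


result = apply_update(students, colleges)
print([r["STUDENTINSTATE"] for r in result])  # ['REGULAR', 'REG']
```

Row 2 fails twice: its address 'y' does not match 'x', and its BLOCKERS value is not null.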
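In the following script, we perform the join and then select all columns from df_stud except "STUDENTINSTATE", the column that must be updated. The WHERE filter on BLOCKERS is folded into the left-join condition, so blocked rows are kept but never matched. If the column "PROFESSIONALID" (from df_college) is not null (i.e. the join condition was satisfied), the new "STUDENTINSTATE" column gets the value "REGULAR". If the join condition was not satisfied, the value should not be updated, so it is taken as is from the original "STUDENTINSTATE" column.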
join_on = (df_stud.UNIQUEID == df_college.PROFESSIONALID) & \
          (df_stud.ADDRESS == df_college.STATE_ADDRESS) & \
          df_stud.BLOCKERS.isNull()
df = (df_stud.alias('a')
    .join(df_college.alias('b'), join_on, 'left')
    .select(
        *[c for c in df_stud.columns if c != 'STUDENTINSTATE'],
        F.expr("nvl2(b.PROFESSIONALID, 'REGULAR', a.STUDENTINSTATE) STUDENTINSTATE")
    )
)
df.show()
# +--------+-------+--------+--------------+
# |UNIQUEID|ADDRESS|BLOCKERS|STUDENTINSTATE|
# +--------+-------+--------+--------------+
# |       1|      x|    null|       REGULAR|
# |       2|      y|     qwe|           REG|
# +--------+-------+--------+--------------+
