我有一個 SQL 查詢,我正在嘗試將其轉換為 PySpark。在 SQL 查詢中,我們連接三個表并更新匹配的列。SQL 查詢如下所示:
UPDATE [DEPARTMENT_DATA]
INNER JOIN ([COLLEGE_DATA]
INNER JOIN [STUDENT_TABLE]
ON COLLEGE_DATA.UNIQUEID = STUDENT_TABLE.PROFESSIONALID)
ON DEPARTMENT_DATA.PUBLICID = COLLEGE_DATA.COLLEGEID
SET STUDENT_TABLE.PRIVACY = "PRIVATE"
我試過的邏輯:
df_STUDENT_TABLE = (
df_STUDENT_TABLE.alias('a')
.join(
df_COLLEGE_DATA('b'),
on=F.col('a.PROFESSIONALID') == F.col('b.UNIQUEID'),
how='left',
)
.join(
df_DEPARTMENT_DATA.alias('c'),
on=F.col('b.COLLEGEID') == F.col('c.PUBLICID'),
how='left',
)
.select(
*[F.col(f'a.{c}') for c in df_STUDENT_TABLE.columns],
F.when(
F.col('b.UNIQUEID').isNotNull() & F.col('c.PUBLICID').isNotNull()
F.lit('PRIVATE')
).alias('PRIVACY')
)
)
此代碼正在添加一個新列“PRIVACY”,但在運行后給出空值。
uj5u.com熱心網友回復:
- 我已經獲取了一些示例資料,當我使用條件應用連接時,我得到的結果如下(要求需要將以下記錄的隱私設定為
PRIVATE)
%sql
select student.*,college.*,department.* from department INNER JOIN college INNER JOIN student
ON college.unique_id = student.professional_id and department.public_id = college.college_id

- 當我使用您的代碼(相同的邏輯)時,我得到了相同的輸出,即在資料框中添加了一個具有所需值的附加列,并且實際
privacy列具有空值。
from pyspark.sql.functions import col,when,lit
df_s = df_s.alias('a').join(df_c.alias('b'), col('a.professional_id') == col('b.unique_id'),'left').join(df_d.alias('c'), col('b.college_id') == col('c.public_id'),'left').select(*[col(f'a.{c}') for c in df_s.columns],when(col('b.unique_id').isNotNull() & col('c.public_id').isNotNull(), 'PRIVATE').otherwise(col('a.privacy')).alias('req_value'))
df_s.show()

- 由于從上面看,
req_value是具有所需值的列,并且這些值需要反映在 中privacy,因此您可以直接使用以下代碼。
final = df_s.withColumn('privacy',col('req_value')).select([column for column in df_s.columns if column!='req_value'])
final.show()

更新:
您還可以使用以下代碼,我使用withColumn而不是選擇更新了列。
df_s = df_s.alias('a').join(df_c.alias('b'), col('a.professional_id') == col('b.unique_id'),'left').join(df_d.alias('c'), col('b.college_id') == col('c.public_id'),'left').withColumn('privacy',when(col('b.unique_id').isNotNull() & col('c.public_id').isNotNull(), 'PRIVATE').otherwise(col('privacy'))).select(*df_s.columns)
#or you can use this as well, without using alias.
#df_s = df_s.join(df_c, df_s['professional_id'] == df_c['unique_id'],'left').join(df_d, df_c['college_id'] == df_d['public_id'],'left').withColumn('privacy',when(df_c['unique_id'].isNotNull() & df_d['public_id'].isNotNull(), 'PRIVATE').otherwise(df_s['privacy'])).select(*df_s.columns)
df_s.show()

uj5u.com熱心網友回復:
加入后,您可以使用nvl2. 它可以檢查與最后一個資料幀 ( df_dept) 的連接是否成功,如果是,則可以回傳“PRIVATE”,否則回傳df_stud.PRIVACY.
輸入:
from pyspark.sql import functions as F
df_stud = spark.createDataFrame([(1, 'x'), (2, 'STAY')], ['PROFESSIONALID', 'PRIVACY'])
df_college = spark.createDataFrame([(1, 1)], ['COLLEGEID', 'UNIQUEID'])
df_dept = spark.createDataFrame([(1,)], ['PUBLICID'])
df_stud.show()
# -------------- -------
# |PROFESSIONALID|PRIVACY|
# -------------- -------
# | 1| x|
# | 2| STAY|
# -------------- -------
腳本:
df = (df_stud.alias('s')
.join(df_college.alias('c'), F.col('s.PROFESSIONALID') == F.col('c.UNIQUEID'), 'left')
.join(df_dept.alias('d'), F.col('c.COLLEGEID') == F.col('d.PUBLICID'), 'left')
.select(
*[f's.`{c}`' for c in df_stud.columns if c != 'PRIVACY'],
F.expr("nvl2(d.PUBLICID, 'PRIVATE', s.PRIVACY) PRIVACY")
)
)
df.show()
# -------------- -------
# |PROFESSIONALID|PRIVACY|
# -------------- -------
# | 1|PRIVATE|
# | 2| STAY|
# -------------- -------
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/522800.html
