我有兩個資料框df_1和df_2:
rdd = spark.sparkContext.parallelize([
(1, '', '5647-0394'),
(2, '', '6748-9384'),
(3, '', '9485-9484')])
df_1 = spark.createDataFrame(rdd, schema=['ID', 'UPDATED_MESSAGE', 'ZIP_CODE'])
# --- --------------- ---------
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# --- --------------- ---------
# | 1| |5647-0394|
# | 2| |6748-9384|
# | 3| |9485-9484|
# --- --------------- ---------
rdd = spark.sparkContext.parallelize([
('JAMES', 'INDIA_WON', '6748-9384')])
df_2 = spark.createDataFrame(rdd, schema=['NAME', 'CODE', 'ADDRESS_CODE'])
# ----- --------- ------------
# | NAME| CODE|ADDRESS_CODE|
# ----- --------- ------------
# |JAMES|INDIA_WON| 6748-9384|
# ----- --------- ------------
我需要df_1使用 df_2 列 'CODE' 中的值 'INDIA_WON' 更新列 'UPDATED MESSAGE'。當前,“UPDATED_MESSAGE”列為 Null。我需要將每一行的值更新為“INDIA_WON”,我們如何在 PySpark 中做到這一點?這里的條件是如果我們在df_1“ZIP_CODE”列中找到“ADDRESS_CODE”值,我們需要填充“UPDATED_MESSAGE”=“INDIA_WON”中的所有值。
uj5u.com熱心網友回復:
下面的 Python 方法df_1在未ZIP_CODE找到匹配項時回傳原始值,或者在使用列中的值填充列時回傳df_2修改后的列:df_1UPDATED_MESSAGEdf_2.CODE
from pyspark.sql.functions import lit
def update_df1(df_1, df_2):
if (df_1.join(df_2, on=(col("ZIP_CODE") == col("ADDRESS_CODE")), how="inner").count() == 0):
return df_1
code = df_2.collect()[0]["CODE"]
return df_1.withColumn("UPDATED_MESSAGE", lit(code))
update_df1(df_1, df_2).show()
--- --------------- ---------
| ID|UPDATED_MESSAGE| ZIP_CODE|
--- --------------- ---------
| 1| INDIA_WON|5647-0394|
| 2| INDIA_WON|6748-9384|
| 3| INDIA_WON|9485-9484|
--- --------------- ---------
uj5u.com熱心網友回復:
我希望我已經很好地解釋了您的需求。如果是,那么您的邏輯似乎很奇怪。看來,你的桌子很小。Spark 是大資料(數百萬到數十億條記錄)的引擎。如果您的表很小,請考慮在 Pandas 中執行操作。
from pyspark.sql import functions as F
df_2 = df_2.groupBy('ADDRESS_CODE').agg(F.first('CODE').alias('CODE'))
df_joined = df_1.join(df_2, df_1.ZIP_CODE == df_2.ADDRESS_CODE, 'left')
df_filtered = df_joined.filter(~F.isnull('ADDRESS_CODE'))
if bool(df_filtered.head(1)):
df_1 = df_1.withColumn('UPDATED_MESSAGE', F.lit(df_filtered.head()['CODE']))
df_1.show()
# --- --------------- ---------
# | ID|UPDATED_MESSAGE| ZIP_CODE|
# --- --------------- ---------
# | 1| INDIA_WON|5647-0394|
# | 2| INDIA_WON|6748-9384|
# | 3| INDIA_WON|9485-9484|
# --- --------------- ---------
uj5u.com熱心網友回復:
我建議在這種情況下使用廣播連接以避免過度洗牌。
下面的代碼和邏輯
new=(df_1.drop('UPDATED_MESSAGE').join(broadcast(df_2.drop('NAME')),how='left', on=df_1.ZIP_CODE==df_2.ADDRESS_CODE)#Drop the null column and join
.drop('ADDRESS_CODE')#Drop column no longer neede
.toDF('ID', 'ZIP_CODE', 'UPDATED_MESSAGE')#rename new df
).show()
uj5u.com熱心網友回復:
當 Spark SQL 如此簡單時,為什么要使用資料幀?
將資料框轉換為臨時視圖。
%python
df_1.createOrReplaceTempView("tmp_zipcodes")
df_2.createOrReplaceTempView("tmp_person")
撰寫簡單的 Spark SQL 以獲得答案。
%sql
select
a.id,
case when b.code is null then '' else b.code end as update_message,
a.zip_code
from tmp_zipcodes as a
left join tmp_person as b
on a.zip_code = b.address_code
查詢的輸出。如果需要寫入磁盤,請使用 spark.sql() 創建資料幀。

轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/527339.html
上一篇:如何在熊貓資料框中提取特定句子?
