I have two DataFrames, and I need to use the values from one of them to filter the other.
For example, here are the datasets:
from pyspark.sql import Row, SparkSession
# In the pyspark shell `spark` already exists; getOrCreate() simply reuses it
spark = SparkSession.builder.getOrCreate()
# Customers: one row per city
cust = spark.createDataFrame([Row(city='hyd', cust_id=100),
                              Row(city='blr', cust_id=101),
                              Row(city='chen', cust_id=102),
                              Row(city='mum', cust_id=103)])
# Items: each item lists the geographies where it is available
item = spark.createDataFrame([Row(item='fish', geography=['london', 'a', 'b', 'hyd']),
                              Row(item='chicken', geography=['a', 'hyd', 'c']),
                              Row(item='rice', geography=['a', 'b', 'c', 'blr']),
                              Row(item='soup', geography=['a', 'kol', 'simla']),
                              Row(item='pav', geography=['a', 'del']),
                              Row(item='kachori', geography=['a', 'guj']),
                              Row(item='fries', geography=['a', 'chen']),
                              Row(item='noodles', geography=['a', 'mum'])])
Output of the cust dataset:
+----+-------+
|city|cust_id|
+----+-------+
| hyd|    100|
| blr|    101|
|chen|    102|
| mum|    103|
+----+-------+
Output of the item dataset:
+-------+-------------------+
|   item|          geography|
+-------+-------------------+
|   fish|[london, a, b, hyd]|
|chicken|        [a, hyd, c]|
|   rice|     [a, b, c, blr]|
|   soup|    [a, kol, simla]|
|    pav|           [a, del]|
|kachori|           [a, guj]|
|  fries|          [a, chen]|
|noodles|           [a, mum]|
+-------+-------------------+
I need to use the city column values from the cust DataFrame to fetch the matching items from the item dataset. The final output should be:
+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| hyd|[fish, chicken]|    100|
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| mum|      [noodles]|    103|
+----+---------------+-------+
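Answer from a uj5u.com user:
Before the join, I would explode the array column so that each item gets one row per city. A collect_list aggregation can then gather all the matching items into one list.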
from pyspark.sql import functions as F
df = cust.join(item.withColumn('city', F.explode('geography')), 'city', 'left')
df = (df.groupBy('city', 'cust_id')
      .agg(F.collect_list('item').alias('items'))
      .select('city', 'items', 'cust_id')
)
df.show(truncate=False)
# +----+---------------+-------+
# |city|items          |cust_id|
# +----+---------------+-------+
# |blr |[rice]         |101    |
# |chen|[fries]        |102    |
# |hyd |[fish, chicken]|100    |
# |mum |[noodles]      |103    |
# +----+---------------+-------+
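
To make the explode, join, and collect_list steps easier to follow, here is a minimal plain-Python sketch of the same logic on the sample data. It is not Spark code and is only meant as an illustration; the function name items_per_city and the tuple layout are assumptions made for this example.

```python
from collections import defaultdict

# Sample data from the question, as plain tuples instead of DataFrames
cust = [("hyd", 100), ("blr", 101), ("chen", 102), ("mum", 103)]
item = [
    ("fish", ["london", "a", "b", "hyd"]),
    ("chicken", ["a", "hyd", "c"]),
    ("rice", ["a", "b", "c", "blr"]),
    ("soup", ["a", "kol", "simla"]),
    ("pav", ["a", "del"]),
    ("kachori", ["a", "guj"]),
    ("fries", ["a", "chen"]),
    ("noodles", ["a", "mum"]),
]


def items_per_city(cust, item):
    """Hypothetical helper mirroring explode + left join + collect_list."""
    # "explode": one (city, item) pair per element of each geography list
    exploded = [(city, name) for name, geography in item for city in geography]
    # "collect_list": gather the item names under each city
    by_city = defaultdict(list)
    for city, name in exploded:
        by_city[city].append(name)
    # left-join semantics: every customer row is kept, even with no items
    return [(city, by_city.get(city, []), cust_id) for city, cust_id in cust]


result = items_per_city(cust, item)
for row in result:
    print(row)
```

A customer whose city appears in no geography list would come back with an empty list, which matches what the left join followed by collect_list produces in the answer above.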
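Answer from a uj5u.com user:
from pyspark.sql.functions import col, collect_list, explode

new = (
    # explode geography so that each item gets one row per city
    item.withColumn('city', explode(col('geography')))
    # join with cust on city; cities without a customer get a null cust_id
    .join(cust, how='left', on='city')
    # drop the null rows and the unwanted column
    .dropna().drop('geography')
    # group by customer to collect the items into one list
    .groupby('city', 'cust_id').agg(collect_list('item').alias('items'))
    # reorder the columns to match the requested output
    .select('city', 'items', 'cust_id')
)
new.show()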
+----+---------------+-------+
|city|          items|cust_id|
+----+---------------+-------+
| blr|         [rice]|    101|
|chen|        [fries]|    102|
| hyd|[fish, chicken]|    100|
| mum|      [noodles]|    103|
+----+---------------+-------+
