我有一個巨大的資料框,看起來類似于:
---- ------- ------- -----
|name|level_A|level_B|hours|
---- ------- ------- -----
| Bob| 10| 3| 5|
| Bob| 10| 3| 15|
| Bob| 20| 3| 25|
| Sue| 30| 3| 35|
| Sue| 30| 7| 45|
---- ------- ------- -----
我想要的輸出:
---- -------------------- ------------------
|name| map_level_A| map_level_B|
---- -------------------- ------------------
| Bob|{10 -> 20, 20 -> 25}| {3 -> 45}|
| Sue| {30 -> 80}|{7 -> 45, 3 -> 35}|
---- -------------------- ------------------
意思是 group by name,將 2 個 MapType 列添加到level_A和level_B的總和中hours。
我知道我可以使用 UDF 或連接操作獲得該輸出。
但是,在實際中,資料非常大,而且不是2個map列,而是幾十個,所以join/UDF成本太高了。
有沒有更有效的方法來做到這一點?
uj5u.com熱心網友回復:
您可以考慮使用視窗函式。您需要level_X為由兩者磁區的每個視窗規范name并level_X計算hours. 然后分組name并從結構陣列創建映射:
from pyspark.sql import Window
import pyspark.sql.functions as F
df = spark.createDataFrame([("Bob", 10, 3, 5), ("Bob", 10, 3, 15), ("Bob", 20, 3, 25),
("Sue", 30, 3, 35),("Sue", 30, 7, 45), ],
["name", "level_A", "level_B", "hours"])
wla = Window.partitionBy("name", "level_A")
wlb = Window.partitionBy("name", "level_B")
result = df.withColumn("hours_A", F.sum("hours").over(wla)) \
.withColumn("hours_B", F.sum("hours").over(wlb)) \
.groupBy("name") \
.agg(
F.map_from_entries(
F.collect_set(F.struct(F.col("level_A"), F.col("hours_A")))
).alias("map_level_A"),
F.map_from_entries(
F.collect_set(F.struct(F.col("level_B"), F.col("hours_B")))
).alias("map_level_B")
)
result.show()
# ---- -------------------- ------------------
#|name| map_level_A| map_level_B|
# ---- -------------------- ------------------
#| Sue| {30 -> 80}|{3 -> 35, 7 -> 45}|
#| Bob|{10 -> 20, 20 -> 25}| {3 -> 45}|
# ---- -------------------- ------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/402972.html
標籤:
上一篇:困難連接(SQL)
