I have a PySpark DataFrame like this:
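from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
columns = ["id", "values"]
data = [("sample1", ["a", "b", "a"]), ("sample2", ["b", "b", "a", "c"])]
df = spark.createDataFrame(data, columns)  # parallelize() would only give an RDD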
Source:
+-------+--------------------+
|     id|              values|
+-------+--------------------+
|sample1|       ["a","b","a"]|
|sample2|   ["b","b","a","c"]|
+-------+--------------------+
I want to add a column holding the most common value in each array, and get a DataFrame like this:
+-------+--------------------+---------+
|     id|              values|   common|
+-------+--------------------+---------+
|sample1|       ["a","b","a"]|      "a"|
|sample2|   ["b","b","a","c"]|      "b"|
+-------+--------------------+---------+
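To pin down what "most common value" means, here is a minimal plain-Python sketch (no Spark involved; the helper name most_common_value is hypothetical) that computes the expected common column with collections.Counter. On a tie, Counter.most_common returns the element encountered first; this is only one possible tie-breaking rule, and the Spark answers below do not necessarily follow it.

```python
from collections import Counter

# Same sample rows as in the question: (id, values)
data = [("sample1", ["a", "b", "a"]), ("sample2", ["b", "b", "a", "c"])]


def most_common_value(values):
    """Return the most frequent element of a list (hypothetical helper).

    Counter.most_common orders equal counts by first occurrence,
    so ties resolve to the element that appears earliest.
    """
    if not values:
        return None
    return Counter(values).most_common(1)[0][0]


rows = [(row_id, values, most_common_value(values)) for row_id, values in data]
for row in rows:
    print(row)
# ('sample1', ['a', 'b', 'a'], 'a')
# ('sample2', ['b', 'b', 'a', 'c'], 'b')
```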
Answer:
You can explode the values array, group by to count the occurrences of each value, and then use a Window to keep only the value with the highest count:
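from pyspark.sql import Window
import pyspark.sql.functions as F

# One row per array element, then count each element's occurrences per row
df1 = df.withColumn(
    "common",
    F.explode("values")
).groupBy("id", "values", "common").count().withColumn(
    # Rank the elements of each original row by descending count
    "rn",
    F.row_number().over(Window.partitionBy("id", "values").orderBy(F.col("count").desc()))
).filter("rn = 1").drop("rn", "count")  # keep only the top-ranked element

df1.show(truncate=False)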
#+-------+------------+------+
#|id     |values      |common|
#+-------+------------+------+
#|sample1|[a, b, a]   |a     |
#|sample2|[b, b, a, c]|b     |
#+-------+------------+------+
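To make each step of the explode-based pipeline concrete, here is a hypothetical plain-Python mirror of it (an illustration, not PySpark code). One caveat: row_number() ordered only by count gives tied values no defined order, so Spark may return either one. The sketch assumes an explicit tie-break on the element itself (smallest wins), which in PySpark would correspond to .orderBy(F.col("count").desc(), F.col("common")).

```python
from collections import defaultdict

data = [("sample1", ["a", "b", "a"]), ("sample2", ["b", "b", "a", "c"])]

# Step 1 (explode): one (id, element) pair per array element
exploded = [(row_id, element) for row_id, values in data for element in values]

# Step 2 (groupBy(...).count()): occurrences of each element per id
counts = defaultdict(int)
for row_id, element in exploded:
    counts[(row_id, element)] += 1

# Step 3 (Window.partitionBy(id)): collect the counted elements per id
per_id = defaultdict(list)
for (row_id, element), n in counts.items():
    per_id[row_id].append((n, element))

# Step 4 (row_number ordered by count desc, then filter rn = 1):
# assumed tie-break is the smaller element first
common = {
    row_id: sorted(candidates, key=lambda c: (-c[0], c[1]))[0][1]
    for row_id, candidates in per_id.items()
}
print(common)
# {'sample1': 'a', 'sample2': 'b'}
```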
Another way, without explode, is to use the higher-order functions transform and filter together with some array functions:
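# For each distinct element x, build struct(count of x in values, x);
# array_max compares the structs field by field, so the highest count wins
df1 = df.withColumn(
    "common",
    F.array_max(
        F.expr("""transform(
            array_distinct(values),
            x -> struct(
                size(filter(values, y -> y = x)) as count,
                x as value
            )
        )""")
    )["value"]
)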
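array_max can pick the winner here because Spark SQL orders structs field by field: the struct with the largest count wins, and on equal counts the one with the larger value wins. A minimal plain-Python sketch of the same idea (not Spark code; the function name most_common_by_struct_max is hypothetical) uses tuples, which Python also compares element by element. As with array_max on an empty array, it returns None when there is nothing to compare.

```python
data = [("sample1", ["a", "b", "a"]), ("sample2", ["b", "b", "a", "c"])]


def most_common_by_struct_max(values):
    """Hypothetical mirror of array_max(transform(array_distinct(values), ...))."""
    # transform(array_distinct(values), x -> struct(count, value)):
    # dict.fromkeys drops duplicates while keeping first-occurrence order
    pairs = [(values.count(x), x) for x in dict.fromkeys(values)]
    if not pairs:
        return None
    # array_max over the structs: tuples compare field by field,
    # so the highest count wins and ties go to the larger value
    return max(pairs)[1]


for row_id, values in data:
    print(row_id, most_common_by_struct_max(values))
# sample1 a
# sample2 b
```

Note the different tie-breaking from the first answer: ["a", "b"] yields "b" here, because "b" > "a" once the counts are equal.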
