我有一個如下的資料框。我想在每個組中分組device和排序。start_time然后,對于組中的每一行,從它之前 3 行的視窗中獲取最頻繁出現的站點(包括它自己)。
columns = ['device', 'start_time', 'station']
data = [("Python", 1, "station_1"), ("Python", 2, "station_2"), ("Python", 3, "station_1"), ("Python", 4, "station_2"), ("Python", 5, "station_2"), ("Python", 6, None)]
test_df = spark.createDataFrame(data).toDF(*columns)
rolling_w = Window.partitionBy('device').orderBy('start_time').rowsBetween(-2, 0)
期望的輸出:
------ ---------- --------- --------------------
|device|start_time| station|rolling_mode_station|
------ ---------- --------- --------------------
|Python| 1|station_1| station_1|
|Python| 2|station_2| station_2|
|Python| 3|station_1| station_1|
|Python| 4|station_2| station_2|
|Python| 5|station_2| station_2|
|Python| 6| null| station_2|
------ ---------- --------- --------------------
由于 Pyspark 沒有mode()函式,我知道如何在靜態中獲取最頻繁的值,如此處groupby所示,但我不知道如何使其適應滾動視窗。
uj5u.com熱心網友回復:
您可以使用collect_list函式使用定義的視窗從最后 3 行獲取站點,然后為每個結果陣列計算最頻繁的元素。
要獲取陣列中最常見的元素,您可以將其分解,然后按照您已經看到的鏈接帖子中的鏈接進行分組并計數,或者使用如下的一些 UDF:
import pyspark.sql.functions as F
test_df.withColumn(
"rolling_mode_station",
F.collect_list("station").over(rolling_w)
).withColumn(
"rolling_mode_station",
F.udf(lambda x: max(set(x), key=x.count))(F.col("rolling_mode_station"))
).show()
# ------ ---------- --------- --------------------
#|device|start_time| station|rolling_mode_station|
# ------ ---------- --------- --------------------
#|Python| 1|station_1| station_1|
#|Python| 2|station_2| station_1|
#|Python| 3|station_1| station_1|
#|Python| 4|station_2| station_2|
#|Python| 5|station_2| station_2|
#|Python| 6| null| station_2|
# ------ ---------- --------- --------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/410204.html
標籤:
