Suppose I have a DataFrame:
val df = Seq(
(1,"A"),
(1,"B"),
(1,"C"),
(1,"D"),
(1,"E"),
(1,"F"),
(1,"G"),
(1,"H"),
(2,"I"),
(2,"J"),
(2,"J"),
(2,"J"),
(3,"K"),
).toDF("id", "code")
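I need to rank the rows within each id, starting a new rank every time a threshold number of rows has been reached. Example:
threshold = 3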
id code rank
1 A 1
1 B 1
1 C 1 -- threshold has been reached
1 D 2
1 E 2
1 F 2 -- threshold has been reached
1 G 3
1 H 3
2 I 1
2 J 1
2 J 1 -- threshold has been reached
2 J 2
3 K 1
How can I do this?
I can create a simple rank:
df.withColumn("rank", dense_rank().over(Window.orderBy("id")))
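But how do I split the rank groups by the threshold?
Answer from a uj5u.com community member:
A solution that does not require moving all of the data to a single partition: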
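import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{dense_rank, max}
import spark.implicits._ // assumes a SparkSession named `spark`; enables the 'col syntax

// Get the size of the largest group of equal ids
val maxGroupSize = df.groupBy("id").count().agg(max("count")).first().getLong(0)
val threshold = 3
// Round maxGroupSize up to the next multiple of threshold
var f = maxGroupSize
while (f % threshold > 0) f = f + 1
df.withColumn("tmp1", 'id * f)
  .withColumn("tmp2", dense_rank().over(Window.partitionBy("id").orderBy("code")) - 1)
  .withColumn("tmp3", 'tmp1 + 'tmp2)
  .withColumn("rank", ('tmp3 / threshold).cast("int"))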
Result:
+---+----+----+----+----+----+
| id|code|tmp1|tmp2|tmp3|rank|
+---+----+----+----+----+----+
|  1|   A|   9|   0|   9|   3|
|  1|   B|   9|   1|  10|   3|
|  1|   C|   9|   2|  11|   3|
|  1|   D|   9|   3|  12|   4|
|  1|   E|   9|   4|  13|   4|
|  1|   F|   9|   5|  14|   4|
|  1|   G|   9|   6|  15|   5|
|  1|   H|   9|   7|  16|   5|
|  2|   I|  18|   0|  18|   6|
|  2|   J|  18|   1|  19|   6|
|  3|   K|  27|   0|  27|   9|
+---+----+----+----+----+----+
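The arithmetic behind this table can be checked without Spark. The following is a minimal plain-Scala sketch, not part of the original answer. The names `PaddedRank`, `paddedSize` and `rank` are hypothetical, and it assumes integer ids. The largest group size is rounded up to a multiple of the threshold, so `id * f` always lands on a bucket boundary and the value ranges of different ids cannot overlap.

```scala
// Plain-Scala sketch of the padding arithmetic; no Spark required.
object PaddedRank {
  // Smallest multiple of `threshold` that is >= `maxGroupSize`
  // (same result as the while loop in the answer).
  def paddedSize(maxGroupSize: Long, threshold: Long): Long =
    ((maxGroupSize + threshold - 1) / threshold) * threshold

  // rank = (id * f + zero-based position within the id) / threshold,
  // using integer division, which mirrors the cast to int in the answer.
  def rank(id: Long, position: Long, f: Long, threshold: Long): Long =
    (id * f + position) / threshold

  def main(args: Array[String]): Unit = {
    val threshold = 3L
    val f = paddedSize(8L, threshold) // id 1 has 8 rows in the sample data
    println(f) // 9
    // Ranks for the eight rows of id 1
    println((0L until 8L).map(rank(1L, _, f, threshold)).mkString(",")) // 3,3,3,4,4,4,5,5
    // Rank for the first row of id 2
    println(rank(2L, 0L, f, threshold)) // 6
  }
}
```

Because every id reserves `f / threshold` rank values whether or not it uses them, gaps appear between ids (here rank 5 is followed by rank 6 only by coincidence, and rank 6 is followed by 9).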
The drawback of this approach is that the ranks are not consecutive. This can be fixed with another window:
df.withColumn("rank2", dense_rank().over(Window.orderBy("rank")))
But this once again moves all of the data to a single executor.
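If, as the expected table in the question suggests, the rank should simply restart at 1 for every id, a window partitioned by id may be enough, and no global window is needed. The following is a hedged sketch under that assumption, not the answer author's method. The helper name `withPerIdRank` is hypothetical, and `row_number` is used instead of `dense_rank` so that duplicate codes such as the three `(2, "J")` rows are counted individually.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{floor, row_number}

object PerIdRank {
  // Hypothetical helper: adds a "rank" column that restarts at 1 for every id
  // and increases after each `threshold` rows, ordered by code.
  def withPerIdRank(df: DataFrame, threshold: Int): DataFrame = {
    // Partitioned by id, so rows are only shuffled by id, not into one partition.
    val w = Window.partitionBy("id").orderBy("code")
    // row_number is 1-based: shift to 0-based, bucket by threshold, shift back.
    df.withColumn("rank", (floor((row_number().over(w) - 1) / threshold) + 1).cast("int"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("per-id-rank").getOrCreate()
    import spark.implicits._

    val df = Seq(
      (1, "A"), (1, "B"), (1, "C"), (1, "D"), (1, "E"), (1, "F"), (1, "G"), (1, "H"),
      (2, "I"), (2, "J"), (2, "J"), (2, "J"), (3, "K")
    ).toDF("id", "code")

    withPerIdRank(df, threshold = 3).orderBy("id", "code").show()
    spark.stop()
  }
}
```

On the sample data this should give ranks 1, 1, 1, 2, 2, 2, 3, 3 for id 1, then 1, 1, 1, 2 for id 2 and 1 for id 3, matching the table in the question. It does not produce globally consecutive ranks across ids.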
