我有一個包含“id”列和“publication”列的資料框。“id”列包含重復項,代表研究人員。“出版物”列包含有關研究人員發表的學術著作的一些資訊。
我想轉換此資料框以將出版物收集到一個陣列中,從而減少行數。我可以使用 groupBy 和 collect_list 來做到這一點。這將使“id”列只包含唯一值。
myDataframe
.groupBy("id")
.agg(
collect_list("publication").as("publications")
).select("id", "publications")
但是,出于我的目的,這對于一行來說資料太多了。我想限制收集的出版物數量,并將資料分成多行。
讓我的資料框看起來像這樣,其中 1 的 id 出現在 10 行中:
| id | publication |
| ----| -------------- |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 2 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
| 1 | "foobar" |
我想 groupBy id 并將出版物收集到一個串列中,但將其限制為每組最多 5 個出版物:
| id | publication |
| ----| -------------- |
| 1 | ["foobar",...] |
| 1 | ["foobar",...] |
| 2 | ["foobar"] |
我將如何在 spark scala 中實作這一點?
uj5u.com熱心網友回復:
如果您想要每行固定數量的出版物,您必須首先計算每個研究人員每個出版物的中間桶數。您可以通過出版物排名的整數除法/5(或每個串列所需的出版物數量)來確定存盤桶編號。然后,您可以按 id 和存盤桶編號進行分組。這是我運行的一個示例spark-shell:
val testDF = Seq(
(1, "pub1"),
(1, "pub2"),
(1, "pub3"),
(1, "pub4"),
(1, "pub5"),
(1, "pub6"),
(1, "pub7"),
(1, "pub8"),
(2, "pub9"),
(2, "pub10"),
(2, "pub11"),
(2, "pub12"),
(2, "pub13")).toDF("id", "publication")
testDF.withColumn("rn", row_number().over(Window.partitionBy("id").orderBy("id")) - 1)
.withColumn("bucket", floor(col("rn") / 5))
.groupBy("id", "bucket").agg(collect_list("publication").as("publications"))
.select("id", "publications")
.show(false)
輸出:
--- ----------------------------------
|id |publications |
--- ----------------------------------
|1 |[pub1, pub2, pub3, pub4, pub5] |
|1 |[pub6, pub7, pub8] |
|2 |[pub9, pub10, pub11, pub12, pub13]|
--- ----------------------------------
uj5u.com熱心網友回復:
在您的 df 中添加 row_number() 列,通過與您的組具有相同鍵的視窗
.withColumn('col', row_number().over(Window.partitionBy('id'))
使用此行 num 模 5 或除以 5 并截斷為整數創建新 id
然后對此進行分組
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/480338.html
