我有一個如下的表格:
| id | item |
| -- | ------------------------------------ |
| 1 | {order_id: 1, item_id: 1, 價格: 10}。|
| 2 | {order_id: 1, item_id: 2, price: 11}。|
| 3 | {order_id: 2, item_id: 3,價格: 12} |
| 4 | {order_id: 2, item_id: 4,價格: 13}。|
我需要將表中的行聚合成以下內容:
| order_id | order ||
| -------- | ------------------------------------------------------------------------ |
| 1 | {order_id: 1, items: [{item_id: 1, price: 10}, {item_id: 2, price: 11}] } |
| 2 | {order_id: 2, items: {fn黑體fs22bord1shad03aHBE4aH00fscx67fscy662cHFFFFFF3cH808080} {fn黑體fs22bord1shad03aHBE4aH00fscx67fscy662cHFFFFFF3cH808080} {fn黑體fs22bord1shad03aHBE4aH00fscx67fscy662cHFFFFFF3cH808080} {fn黑體fs22bord1shad03aHBE4aH00fscx67fscy662cHFFFFFF3cH808080} |
最初我認為UDAF可以做到這一點,但是當我實作一個聚合器的UDAF函式時,我不確定在合并方法中回傳什么,因為如果訂單id不同,它們就不能被合并了。
uj5u.com熱心網友回復:
從Spark 1.6和更高版本開始,你不需要UDAF,你可以使用內置的SQL函式collect_list
如果你的表模式如下:
root
|-- id: integer (nullable = false)
|-- item: struct (nullable = true)
|-- order_id: 整數 (nullable = true)
|-- item_id: 整數 (nullable = true)
|-- 價格: double (nullable = true)
在dataframe中加載你的表之后,你的代碼應該是(用scala語言):
import org.apache.spark.sql.function.{collect_list, struct}
資料框架
.groupBy("item.order_id")
.agg(collect_list(struct("item.item_id", "item.price")).as(" items")
.withColumn("order", struct("order_id", " items")
.drop("items")
uj5u.com熱心網友回復:
假設以下模型:
case class Order(order_id。Int, items: Seq[Item])
case class Item(item_id。Int, price: Double)。
case class Line(item: Item)
使用groupBy將行按item.order_id分組,然后收集專案:
import sparkSession. implicits._
df.groupBy($"item.order_id"/span>)
.as[Int, Line]
.mapGroups { case (order_id, lines) =>
(order_id, Order(order_id, lines.toSeq.map(line => Item(line.item.itid, line.item.price)))))
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/328342.html
標籤:
下一篇:無法在pyspark中實體化com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem。
