I have a dataset with the following schema:
root
|-- id: long (nullable = true)
|-- dist: map (nullable = true)
|    |-- key: string
|    |-- value: long (valueContainsNull = true)
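Suppose dist is [(A, 10), (B, 5), (C, 3)]. I want to add a column [10/18, 5/18, 3/18], where each number is divided by the sum of the values.
How can I do this? Thank you.
Answer from a uj5u.com user:
Suppose you start with a dataset like the following (updated to handle Option[Map[String, Long]]):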
import org.apache.spark.sql.Dataset
// Provides toDS(); assumes a SparkSession named `spark` (already in scope in spark-shell)
import spark.implicits._
case class Model(id: Long, dist: Option[Map[String, Long]])
val ds: Dataset[Model] = List(
  Model(1, Some(Map("a" -> 10, "b" -> 5, "c" -> 3))),
  Model(2, Some(Map("x" -> 100, "y" -> 50, "z" -> 30))),
  Model(3, None)
).toDS()
and that your new column percentage needs to be a String column in order to display {x}/{y}:
Spark >= 3.0.0
import org.apache.spark.sql.functions.{aggregate, col, format_string, lit, map_values, transform}
import org.apache.spark.sql.Column
import org.apache.spark.sql.types.LongType
ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row
  .withColumn("total", aggregate(col("values"), lit(0).cast(LongType), (acc, x) => acc + x))
  // format each value as "value/total"
  .withColumn("percentage", transform(col("values"), (c: Column) => format_string("%s/%s", c, col("total"))))
  .drop("total", "values")
  .show(false)
which gives:
+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |{a -> 10, b -> 5, c -> 3}   |[10/18, 5/18, 3/18]      |
|2  |{x -> 100, y -> 50, z -> 30}|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+
with the schema:
root
|-- id: long (nullable = false)
|-- dist: map (nullable = true)
|    |-- key: string
|    |-- value: long (valueContainsNull = false)
|-- percentage: array (nullable = true)
|    |-- element: string (containsNull = false)
Spark < 3.0.0 (tested with Spark 2.4.5)
import org.apache.spark.sql.functions.{col, expr, map_values, udf}
// build the "[v1/total, v2/total, ...]" string for one row
val toDivisionUDF = udf(
  (values: Seq[String], total: Long) =>
    "[" + values.map(v => s"$v/$total").mkString(", ") + "]"
)
ds
  // take just the values of the map `dist`
  .withColumn("values", map_values(col("dist")))
  // calculate the total per row (SQL higher-order function, available via expr in 2.4)
  .withColumn("total", expr("aggregate(values, 0D, (acc, v) -> acc + v)"))
  // format column with total
  .withColumn("percentage", toDivisionUDF(col("values"), col("total")))
  .drop("total", "values")
  .show(false)
which gives:
+---+----------------------------+-------------------------+
|id |dist                        |percentage               |
+---+----------------------------+-------------------------+
|1  |[a -> 10, b -> 5, c -> 3]   |[10/18, 5/18, 3/18]      |
|2  |[x -> 100, y -> 50, z -> 30]|[100/180, 50/180, 30/180]|
|3  |null                        |null                     |
+---+----------------------------+-------------------------+
with the schema:
root
|-- id: long (nullable = false)
|-- dist: map (nullable = true)
|    |-- key: string
|    |-- value: long (valueContainsNull = false)
|-- percentage: string (nullable = true)
Note that including the Option did not change the code at all: Spark handles an Option by treating the field inside it as nullable.
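The question asks for numbers divided by the sum, while the percentage column above holds strings such as 10/18. As a minimal sketch of the same per-row arithmetic in plain Scala collections (no Spark involved), the following computes numeric fractions instead. The object DistFractions and the helper fractions are hypothetical names introduced here for illustration, and returning 0.0 when the total is zero is an assumption, not something the original answer specifies.

```scala
object DistFractions {
  // Hypothetical helper: divide each map value by the sum of all values.
  // A missing map (None) stays None, mirroring the null row in the Spark output.
  def fractions(dist: Option[Map[String, Long]]): Option[Map[String, Double]] =
    dist.map { m =>
      val total = m.values.sum
      if (total == 0L) {
        // Assumption: avoid division by zero by reporting 0.0 for every key.
        m.map { case (k, _) => k -> 0.0 }
      } else {
        m.map { case (k, v) => k -> v.toDouble / total }
      }
    }
}
```

For Map("a" -> 10, "b" -> 5, "c" -> 3) the resulting fractions are 10/18, 5/18 and 3/18 as doubles, keyed by the original map keys, so they sum to 1 up to floating-point rounding.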
Source: https://www.uj5u.com/qiye/455001.html
