I have a Spark DataFrame like this:
scala> val data = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8))
data: Seq[(Int, String, String, Int)] = List((1,k1,measureA,2), (1,k1,measureA,4), (1,k1,measureB,5), (1,k1,measureB,7), (1,k1,measureC,7), (1,k1,measureC,1), (2,k1,measureB,8), (2,k1,measureC,9), (2,k2,measureA,5), (2,k2,measureC,5), (2,k2,measureC,8))
scala> val rdd = spark.sparkContext.parallelize(data)
rdd: org.apache.spark.rdd.RDD[(Int, String, String, Int)] = ParallelCollectionRDD[22] at parallelize at <console>:27
scala> val df = rdd.toDF("ts","key","measure_type","value")
df: org.apache.spark.sql.DataFrame = [ts: int, key: string ... 2 more fields]
scala> df.show
+---+---+------------+-----+
| ts|key|measure_type|value|
+---+---+------------+-----+
|  1| k1|    measureA|    2|
|  1| k1|    measureA|    4|
|  1| k1|    measureB|    5|
|  1| k1|    measureB|    7|
|  1| k1|    measureC|    7|
|  1| k1|    measureC|    1|
|  2| k1|    measureB|    8|
|  2| k1|    measureC|    9|
|  2| k2|    measureA|    5|
|  2| k2|    measureC|    5|
|  2| k2|    measureC|    8|
+---+---+------------+-----+
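I want to pivot on measure_type and apply a different aggregation to value depending on the measure_type: sum for measureA, average for measureB, and max for measureC.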
The output DataFrame should then look like this:
+---+---+--------+--------+--------+
| ts|key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|  1| k1|       6|       6|       7|
|  2| k1|    null|       8|       9|
|  2| k2|       5|    null|       8|
+---+---+--------+--------+--------+
Thanks a lot.
Answer from a uj5u.com user:
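import org.apache.spark.sql.functions._
// when(...) without otherwise(...) returns null for the other measure types, and sum/avg/max skip nulls
val ddf = df.groupBy("ts", "key").agg(
  sum(when(col("measure_type") === "measureA", col("value"))).as("measureA"),
  avg(when(col("measure_type") === "measureB", col("value"))).as("measureB"),
  max(when(col("measure_type") === "measureC", col("value"))).as("measureC")
)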
And the result is:
scala> ddf.show(false)
+---+---+--------+--------+--------+
|ts |key|measureA|measureB|measureC|
+---+---+--------+--------+--------+
|2  |k2 |5       |null    |8       |
|2  |k1 |null    |8.0     |9       |
|1  |k1 |6       |6.0     |7       |
+---+---+--------+--------+--------+
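The same conditional-aggregation idea can be driven by a lookup table, so supporting another measure type only means adding one entry. This is a minimal sketch, assuming a local SparkSession; the names `aggFor`, `aggCols` and `wide` are hypothetical and not part of the answer. It produces the same measureA/measureB/measureC columns as `ddf`.

```scala
import org.apache.spark.sql.{Column, SparkSession}
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("conditional-agg").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "k1", "measureA", 2), (1, "k1", "measureA", 4),
  (1, "k1", "measureB", 5), (1, "k1", "measureB", 7),
  (1, "k1", "measureC", 7), (1, "k1", "measureC", 1),
  (2, "k1", "measureB", 8), (2, "k1", "measureC", 9),
  (2, "k2", "measureA", 5), (2, "k2", "measureC", 5),
  (2, "k2", "measureC", 8)
).toDF("ts", "key", "measure_type", "value")

// Hypothetical spec: which aggregate to apply to `value` for each measure_type.
val aggFor: Seq[(String, Column => Column)] = Seq(
  "measureA" -> ((c: Column) => sum(c)),
  "measureB" -> ((c: Column) => avg(c)),
  "measureC" -> ((c: Column) => max(c))
)

// One conditional aggregate per measure type, named after it.
// when(...) without otherwise(...) is null for other rows, and sum/avg/max skip nulls.
val aggCols = aggFor.map { case (m, f) =>
  f(when(col("measure_type") === m, col("value"))).as(m)
}

val wide = df.groupBy("ts", "key").agg(aggCols.head, aggCols.tail: _*)
wide.orderBy("ts", "key").show(false)
```

Because each column's type follows its aggregate, measureA is a long (sum), measureB a double (avg) and measureC an int (max), as in the output of `ddf`.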
Answer from a uj5u.com user:
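I think doing this with the regular pivot function is tedious, because it restricts you to one particular aggregate function for all pivoted values.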
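To make that limitation concrete, here is a minimal sketch of the plain pivot, assuming a local SparkSession (`sumEverywhere` is a hypothetical name). With a single aggregate such as sum, measureB and measureC are summed too, giving 12 and 8 for (1, k1) instead of the average 6 and the max 7.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("plain-pivot").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "k1", "measureA", 2), (1, "k1", "measureA", 4),
  (1, "k1", "measureB", 5), (1, "k1", "measureB", 7),
  (1, "k1", "measureC", 7), (1, "k1", "measureC", 1),
  (2, "k1", "measureB", 8), (2, "k1", "measureC", 9),
  (2, "k2", "measureA", 5), (2, "k2", "measureC", 5),
  (2, "k2", "measureC", 8)
).toDF("ts", "key", "measure_type", "value")

// pivot() turns each listed measure_type value into a column, but the single
// aggregate given to agg() is applied to every one of those columns.
val sumEverywhere = df.groupBy("ts", "key")
  .pivot("measure_type", Seq("measureA", "measureB", "measureC"))
  .agg(sum("value"))

sumEverywhere.orderBy("ts", "key").show()
```

Passing several aggregates to `agg(...)` instead produces one column per (measure type, aggregate) pair, which then have to be selected and renamed by hand.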
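Here is what I would do: group by ts, key and measure_type and apply a predefined list of aggregate functions, looked up by name, which gives 3 extra columns (one per aggregate function); then create another column that picks one of them based on measure_type, as you described; and finally drop the 3 columns created in the previous step.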
import org.apache.spark.sql.functions._
import org.apache.spark.sql.Column
import spark.implicits._

val df = Seq((1, "k1", "measureA", 2), (1, "k1", "measureA", 4), (1, "k1", "measureB", 5), (1, "k1", "measureB", 7), (1, "k1", "measureC", 7), (1, "k1", "measureC", 1), (2, "k1", "measureB", 8), (2, "k1", "measureC", 9), (2, "k2", "measureA", 5), (2, "k2", "measureC", 5), (2, "k2", "measureC", 8)).toDF("ts", "key", "measure_type", "value")

// Name -> aggregate function; explicit lambdas select the Column overloads of sum/avg/max
val mapping: Map[String, Column => Column] = Map(
  "sum" -> ((c: Column) => sum(c)), "avg" -> ((c: Column) => avg(c)), "max" -> ((c: Column) => max(c)))
val groupBy = Seq("ts", "key", "measure_type")
val aggregate = Seq("value")
val operations = Seq("sum", "avg", "max")
// One aggregate expression per (column, operation) pair: sum(value), avg(value), max(value)
val exprs = aggregate.flatMap(c => operations.map(f => mapping(f)(col(c))))
val df2 = df.groupBy(groupBy.map(col): _*).agg(exprs.head, exprs.tail: _*)
// Keep the aggregate that matches each row's measure_type, then drop the helper columns
val df3 = df2.withColumn("new_column",
    when($"measure_type" === "measureA", $"sum(value)")
      .when($"measure_type" === "measureB", $"avg(value)")
      .otherwise($"max(value)"))
  .drop("sum(value)")
  .drop("avg(value)")
  .drop("max(value)")
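In df3, new_column holds the requested aggregate (sum, average or max) for each row's measure_type. df3 is still in long format (one row per ts, key and measure_type), so one more pivot on measure_type is needed to get the wide layout shown in the question.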
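One way to get from df3 to the wide layout is a second pivot that simply moves each new_column value into its measure_type column. This is a sketch assuming a local SparkSession; it recomputes df3 compactly (same logic as the answer, with a single multi-column `drop`) so it runs on its own, and `wide` is a hypothetical name.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Local session for the sketch; in spark-shell, `spark` already exists.
val spark = SparkSession.builder().master("local[*]").appName("pivot-df3").getOrCreate()
import spark.implicits._

val df = Seq(
  (1, "k1", "measureA", 2), (1, "k1", "measureA", 4),
  (1, "k1", "measureB", 5), (1, "k1", "measureB", 7),
  (1, "k1", "measureC", 7), (1, "k1", "measureC", 1),
  (2, "k1", "measureB", 8), (2, "k1", "measureC", 9),
  (2, "k2", "measureA", 5), (2, "k2", "measureC", 5),
  (2, "k2", "measureC", 8)
).toDF("ts", "key", "measure_type", "value")

// Long format, same logic as df2/df3 in the answer: one row per (ts, key, measure_type).
val df3 = df.groupBy("ts", "key", "measure_type")
  .agg(sum("value"), avg("value"), max("value"))
  .withColumn("new_column",
    when($"measure_type" === "measureA", $"sum(value)")
      .when($"measure_type" === "measureB", $"avg(value)")
      .otherwise($"max(value)"))
  .drop("sum(value)", "avg(value)", "max(value)")

// Each (ts, key, measure_type) group holds exactly one new_column value,
// so first() simply moves that value into the matching pivoted column.
val wide = df3.groupBy("ts", "key")
  .pivot("measure_type", Seq("measureA", "measureB", "measureC"))
  .agg(first("new_column"))

wide.orderBy("ts", "key").show()
```

Since new_column mixes a long (sum), a double (avg) and an int (max), Spark coerces it to double, so the pivoted values print as 6.0, 8.0 and so on rather than the mixed types of the first answer.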
