我在 Scala 的 Spark 資料框中有一列,它是由于使用多列聚合而生成的
agg(collect_list(struct(col(abc), col(aaa)).as(def)
我想將此列傳遞給 UDF 以進行進一步處理,以處理此聚合列中的一個索引。
當我將引數傳遞給我的 UDF 時:
.withColumn(def, remove
(col(xyz), col(def)))
UDF- 型別為 Seq[Row]: val removeUnstableActivations: UserDefinedFunction = udf((xyz: java.util.Date, def: Seq[Row])
我收到錯誤:
Exception encountered when invoking run on a nested suite - Schema for type org.apache.spark.sql.Row is not supported
我應該如何傳遞這些列以及 UDF 中列的資料型別應該是什么?
uj5u.com熱心網友回復:
確實不支持型別 Row 的模式,但您可以回傳一個案例類。Spark 會將回傳的 case 類視為 StructType。例如:
import org.apache.spark.sql.functions._
import spark.implicits._
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.Row
val df = Seq(
(1, "a"),
(2, "b"),
(3, "c")
).toDF("number", "word")
val aggDf = df.agg(
collect_list(struct(col("number"), col("word"))) as "aggColumn"
)
aggDf.printSchema()
// |-- aggColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- number: string (nullable = true)
// | | |-- word: integer (nullable = false)
case class ReturnSchema(word: String, number: Int)
val myUdf: UserDefinedFunction =
udf((collection: Seq[Row]) => {
collection.map(r => {
val word = r.getAs[String]("word")
val newNumber = r.getAs[Int]("number") * 100
new ReturnSchema(word, newNumber)
})
})
val finalDf = aggDf.select(myUdf(col("aggColumn")).as("udfTranformedColumn"))
finalDf.printSchema
// root
// |-- udfTranformedColumn: array (nullable = true)
// | |-- element: struct (containsNull = true)
// | | |-- word: string (nullable = true)
// | | |-- number: integer (nullable = false)
finalDf.show(false)
// ------------------------------
// |udfTranformedColumn |
// ------------------------------
// |[[a, 100], [b, 200], [c, 300]]|
// ------------------------------
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/327807.html
