我有以下代碼:
def proccess(spark: SparkSession, df : DataFrame): DataFrame= {
val mydf = df.withColumn("next_intent_temp", when (col("next_intent")=== "val", 1).otherwise(0)
val mynewdf = mydf.groupBy(col("a"), col("b"), col("c")
.agg(sum(col("next_intent_temp")).as("next_intent"))
.select(col("market"), struct(col("next_intent"),...).alias("data")
)
mynewdf
}
運行它給出:
cannot resolve 'next_intent_temp' given input columns [...]; Aggregate [sum('next_intent_temp')...
我真的不明白這個錯誤,因為當我這樣做時,mydf.show()我確實看到 next_intent_temp 列具有正確的值。那么為什么總和失敗并出現此錯誤?
注意:我簡化了問題的代碼,但保留了我真正擁有的結構。
uj5u.com熱心網友回復:
你有.as("next_intent"). 使用該列名。
我認為您有一些語法和缺少 ) 方面。
這是一個更簡單的例子,但如果你遵循它,你應該沒問題。
import spark.implicits._
import org.apache.spark.sql.functions._
val df = spark.sparkContext.parallelize(Seq( (1,7,"ST"), (1,8,"XX"), (1,9,"RW"), (3,10,"ST"), (3,11,"AA"), (3,12,"RW"), (2,3,"ST"), (2,4,"TT"))).toDF("i", "c", "t")
val df2 = df.withColumn("next_intent_temp", lit(1))
val df3 = df2.groupBy(col("i"), col("c"), col("t"))
.agg(sum(col("next_intent_temp")).as("next_intent"))
.select(col("i"), struct(col("next_intent")).alias("data"))
df3.show(false)
轉載請註明出處,本文鏈接:https://www.uj5u.com/qianduan/530960.html
標籤:斯卡拉阿帕奇火花
