import org.apache.spark.sql.functions.lit
val containsString = (haystack:String, needle:String) =>{
if (haystack.contains(needle)){
1
}
else{
0
}
}
val containsStringUDF = udf(containsString _)
val new_df = df.withColumn("nameContainsxyz", containsStringUDF($"name"),lit("xyz")))
我是 Spark Scala 的新手。上面的代碼似乎編譯成功。但是,當我嘗試運行時
new_df.groupBy("nameContainsxyz").sum().show()
錯誤拋出。有人可以幫我嗎?錯誤訊息如下。
Caused by: org.apache.spark.SparkException: Failed to execute user defined function($anonfun$1: (string, string) => int)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.agg_doAggregateWithKeys_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage6.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$15$$anon$2.hasNext(WholeStageCodegenExec.scala:655)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:55)
at org.apache.spark.scheduler.Task.run(Task.scala:121)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1405)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
... 3 more
Caused by: java.lang.NullPointerException
at $anonfun$1.apply(<console>:41)
at $anonfun$1.apply(<console>:40)
... 15 more
只是一個更新:錯誤拋出,因為指定列中的某些行為空。在 UDF 中添加空檢查完全解決了這個問題。
謝謝
uj5u.com熱心網友回復:
如果我理解您要正確執行的操作,您想知道列中有多少行 'xyz' 出現name?
您可以在不使用 UDF 的情況下做到這一點:
df.filter('name.contains("xyz")). count
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/350949.html
上一篇:今天面了個騰訊程式員,基礎扎實,當場給30K的offer
下一篇:洗掉DBFS中的檔案夾
