前言
本文隸屬于專欄《Spark例外問題匯總》,該專欄為筆者原創,參考請注明來源,不足和錯誤之處請在評論區幫忙指出,謝謝!
本專欄目錄結構和參考文獻請見 Spark例外問題匯總
正文
問題描述
Spark 編譯報錯:
Error:(34, 25) overloaded method foreachBatch with alternatives: (function: org.apache.spark.api.java.function.VoidFunction2[org.apache.spark.sql.Dataset[org.apache.spark.sql.Row],java.lang.Long])org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] (function: (org.apache.spark.sql.Dataset[org.apache.spark.sql.Row], scala.Long) => Unit)org.apache.spark.sql.streaming.DataStreamWriter[org.apache.spark.sql.Row] cannot be applied to ((org.apache.spark.sql.DataFrame, scala.Long) => org.apache.spark.sql.DataFrame) askDF.writeStream.foreachBatch { (askDF: DataFrame, batchId: Long) =>
我的代碼如下所示:
val properties = new java.util.Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
val query = wordCounts.writeStream
.outputMode("complete")
.foreachBatch((ds, batchID) => {
println("BatchID:" + batchID)
if(ds.count() != 0){
ds.cache()
ds.write.json(PATH_PREFIX + batchID)
ds.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://node1:3306/spark_bigdata_analyze", "t_word_count", properties)
ds.unpersist()
}
}).start()
query.awaitTermination()
}
問題定位
這是由于Scala 版本由 2.11 升級成 2.12 所致,
由于Scala 2.12中的一些更改,DataStreamWriter.foreachBatch方法需要對代碼進行一些更新,否則就會發生這種模糊性,
可以在此處查看兩種foreachBatch方法:
https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamWriter.html
問題解決
可以改用scala 2.11,或者查看已解決這個問題的鏈接:
https://docs.databricks.com/release-notes/runtime/7.0.html
代碼修改
val properties = new java.util.Properties()
properties.setProperty("user", "root")
properties.setProperty("password", "123456")
val query = wordCounts.writeStream
.outputMode("complete")
.foreachBatch((ds: Dataset[Row], batchId: Long) => myFunc(ds, batchId, properties)).start()
query.awaitTermination()
private def myFunc(ds: Dataset[Row], batchID: Long, properties: java.util.Properties): Unit = {
println("BatchID:" + batchID)
if (ds.count() != 0) {
ds.cache()
ds.write.json(PATH_PREFIX + batchID)
ds.write.mode(SaveMode.Overwrite).jdbc("jdbc:mysql://node1:3306/spark_bigdata_analyze", "t_word_count", properties)
ds.unpersist()
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/337646.html
標籤:其他
