如有更佳的保存MySQL方法 歡迎私信或留言分享 相互學習~
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
// 自定義Sink
dataStream.addSink( new JDBCSink() )
// 繼承RichSinkFunction
class JDBCSink extends RichSinkFunction[輸入的資料型別]{
// 定義sql連接、插入預編譯器、更新預編譯器
var conn: Connection = _
var insertStatement: PreparedStatement = _
var updateStatement: PreparedStatement = _
// 重寫open函式 在此函式初始化,創建連接和預編譯陳述句
override def open(parameters: Configuration): Unit = {
// 初始化連接
conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/資料庫", "賬號", "密碼")
// 初始化插入預編譯器
insertStatement = conn.prepareStatement("INSERT INTO 表名 VALUES (?, ?, ?(占位符));")
// 初始化更新與編譯器
updateStatement = conn.prepareStatement("UPDATE 表名 SET 欄位 = ? WHERE 欄位 = ?;")
}
// 重寫close函式 關閉與編譯器和sql連接
override def close(): Unit = {
insertStatement.close()
updateStatement.close()
conn.close()
}
// 重寫invoke函式
override def invoke(value: 輸入的資料型別, context: SinkFunction.Context[_]): Unit = {
// 先執行更新操作 給跟更新預編譯器的占位符賦值
updateStatement.setInt(1, value.count.toInt)
updateStatement.setString(2, value.url)
updateStatement.setDouble(3, value.windowEnd)
// 執行更新
updateStatement.execute()
// 判斷如果更新的行數為0 則執行插入
if(updateStatement.getUpdateCount == 0){
// 給插入預編譯器的占位符賦值
insertStatement.setDouble(1, value.windowEnd)
insertStatement.setString(2, value.url)
insertStatement.setInt(3, value.count.toInt)
// 執行插入
insertStatement.execute()
}
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/396135.html
標籤:其他
上一篇:Spark SQL functions.scala 原始碼決議(六)Misc functions (基于 Spark 3.3.0)
