官方的東西抽象到技術層面,跟具體的業務有點脫節,我們需要下沉封裝,而不是削足適履,
引言
Doris用多了,把一些坑都免疫了,遇到就知道不該跳,就像spark/flink的算子調優一樣,還用后期調優嗎?不應該在寫的時候,就肌肉記憶的使用reduceByKey來代替groupByKey嗎?
與其叫“Doris同步多庫多表”不如叫“Doris同步binlog踩坑指南”,基于當前大眾化的實時架構,來將業務庫的資料同步到Doris,做到資料的一致性,后期也希望palo團隊做一下庫表的過濾,
這里我說的是業務庫,業務庫,業務庫,一般來說業務庫格式更加標準和統一,并且不存在洗掉操作,也不會用mysql來存太大的欄位,利用這幾點特性來實作資料的統一,我們目前有二十幾個mysql實體,3000多張表,只需要維護二十幾個flink任務即可,不懂doris原理的同學照樣可以進行資料同步,
routineLoad不是個好選擇
有一些文章推薦使用RoutineLoad來做業務庫的資料同步,這是個壞的選擇
- routineLoad依靠的kafka的topic,并且是基于表緯度的資料同步
如果你的上游用的maxwell,那么你需要讀kafka,過濾庫表,再寫入kafka,最后用routineLoad去接,增加了一步資料etl,并且將鏈路又加長了,增加了業務不穩定性;
如果你用的canal來同步binlog,那么你要對每一個表創建一條鏈路,會導致canal壓力增大,cup飆升,影響整個實時鏈路 - 使用復雜
撰寫任務時,要指明欄位和過濾格式,不好同一管理,當上游表結構發生變更時,必須進行重寫任務,而不是重啟,有點蠢 - 管理缺陷,有多少表就有多少routineLoad任務
Doris沒有提供可視化界面,幾千個呢,即使有問題出現了也不知道是那個出問題,集群出問題,所有任務都要重啟,重啟也不能指定時間重啟,出問題的時候就知道有多蠢了, - routineLoad的本質是streamLoad
官方檔案中說的很明白,所有千萬不要說routineLoad是實時同步,是微批同步而已,要額外指定batch_rows和batch_interval的,與其把壓力給BE,不如給flink做這個etl - routineLoad同步業務庫資料時要注意的幾個點
1)不要指定error_row,既然是業務庫資料,就要保證百分百的正確,有容錯條數是幾個意思?
2)kafka上游要嚴格指定messageKey,建議是(庫名+表名+主鍵),這樣才能保證同一個主鍵下的資料永遠在同一個partition中,不會出現亂序的問題,
3)下游表設計時,varchar的長度盡量大點,Doris不允許超長度,mysql卻可以,如果上游表欄位長度太長,下游一定報錯,
4)下游表設計時,default欄位為null,既然是binlog的資料,就不會有缺少欄位,就不要硬給數值,設定成null的好處還有就是streamLoad同步json時,表結構發生變更不會報錯
上邊這幾條是通用的
準備
- topic中的資料是以庫為單位,或者是實體為單位
- binlog寫入kafka要有嚴格的messageKey
- 穩定的kafka和flink集群
開發
可以提前看一下我的這篇博客Flink寫入Doris的實時應用
確定我們的引數
- batch_rows和batch_interval:批次提交行數和時間間隔
- 重啟方式:時間、offset,既然是業務庫,不怕重復消費kafka,建議是時間,比如2h前或者2021/04/26/18(精確到小時就行)
- 指定topic:讀那個topic,寫入那些表可以做映射,也可以讀Doris上對應的庫表名
- 提交curl的執行緒數:既然是streamLoad,那肯定是基于表的,同時允許幾個curl提交,單庫不得超過100個執行緒
決議conf
傳參也好,讀xml也好,讀yaml也好,根據自己的喜好來,
source
想維護offset,可以看我的文章Flink手動維護kafka的offset
關鍵是用時間戳來讀取kafka資料
etl
傳統的步驟,過濾出binlog里的(data、database、table)就夠了,重新組合成一個結構,準備給sink使用
sink
切記:streamLoad 不要帶 -H “max_filter_ratio:0.01”,業務庫資料不允許丟資料
- 根據topic獲取要寫入的表,存到一個set中,比如我的topic是以庫為單位的,那么doris的庫和mysql中的也是對應的,用jdbc去讀一下這個庫里有那些表就可以了;或者是我將要同步的表維護到redis,間隔一段時間去redis里同步一下要寫入的表,
- 定義一個快取資料的list來積累資料,并開始計數和計時
- 到達batch_rows和batch_interval閾值時進行提交
- 創建一個HashMap[String, ArrayList[String]],對set和list進行過濾,即put({databses}.{tablename}, ArrayList[{binlog中的data}])
- 遍歷HashMap,多執行緒寫入doris
- 重置batch_rows、batch_interval、list、HashMap
第5步的代碼(簡化版),結合Flink寫入Doris的實時應用去看會更清晰,是我略的部分,CurlCallableThread是個執行curl的多執行緒,
curlThread是我之前提到的一個引數:“提交curl的執行緒數”,另外對任務回傳結果的一個報錯,
var isStop = false
var lastRes = ""
if(insertDataMap.nonEmpty){
val latch = new CountDownLatch(1)
val pool = Executors.newFixedThreadPool(curlThread)
val reslist = new java.util.ArrayList[(Future[String], String)]()
try{
for(elem <- insertDataMap){
val data = elem._3.mkString("[",",\n","]")//組合成jsonArray
val database = elem._1 //得到database
val table = elem._2 //得到table
val path = s"/tmp/flink_doris/$topic/$getCurrentThreadId/$database.$table"
val c1 = new CurlCallableThread(data, path, database, table, latch)
val f1 = pool.submit(c1)
reslist.add((f1, table))
}
latch.await()
for (i <- 0 until reslist.length){
val res = reslist(i)._1.get().toString
if(!res.startsWith("accessed")){
Logger.warn(res)
lastRes = res
if(res.contains("ErrorURL") || res.contains("unknown table")){
//列印錯誤
Logger.error(content)
Logger.error(lastRes)
//洗掉
val table = reslist(i)._2
tableErr(table)
}else{
isStop = true
}
}
}
}catch {
case e: InterruptedException => e.printStackTrace()
} finally {
Logger.warn(s"${timestampToDate(System.currentTimeMillis())} upsert $topic data $listLength t")
pool.shutdown()
insertDataMap.clear()
if(isStop){
val content = s"topic $topic stop in "+ timestampToDate(System.currentTimeMillis())
//列印錯誤
Logger.error(content)
Logger.error(lastRes)
//停止
println(1/0)
}
}
自己做好報錯監控,郵件、釘釘、短信等等
使用
存量資料補完(后期會出一篇如何快速補存量資料的文章),topic準備好就可以開始啟動了
指定batch_rows和batch_interval,topic,重啟方式,執行緒數就可以了,
當業務表欄位發生變更,提前到Doris執行alter命令就可以了,欄位default設定為null就不會報錯
新建表時,在Doris提前建好,重啟識別一下要寫入的表就行了
如果沒有及時的進行變更,那也沒事,把重啟方式的時間往前推到變更業務表之前,重新補資料即可,
如果發現報欄位長度問題,超了65533,結合業務看看是不是可以刪掉這個欄位,不影響資料同步,
根據業務庫的重要程度來調整batch_interval,建議10s以上,準實時即可
感謝百度同學熱心幫助和senc老師的指導,以及數倉同學的case
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/281210.html
標籤:其他
下一篇:業務資料調研及ETL
