在消費Kafka中磁區的資料時,我們需要跟蹤哪些訊息是讀取過的、哪些是沒有讀取過的,這是讀取訊息不丟失的關鍵所在,
Kafka是通過offset順序讀取事件的,如果一個消費者退出,再重啟的時候,它知道從哪兒繼續讀取訊息進行處理,所以,消費者需要「提交」屬于它們自己的偏移量,如果消費者已經提交了偏移量,但訊息沒有得到有效處理,此時就會造成消費者訊息丟失,所以,我們應該重視偏移量提交的時間點以及提交的方式,
Kafka消費者的可靠性配置
1、group.id
- 如果兩個消費者有相同的 group.id,并且定義同一個主題,那么每個消費者都會消費一個磁區的資料
2、auto.offset.reset
- 這個引數的作用是:當沒有偏移量提交(例如:消費者第一次啟動、或者請求的偏移量在broker上不存在時),消費者會如何處理
- earliest:消費者從磁區的開始位置讀取大量的重復資料,可以保證個最少的資料丟失
- latest:消費者會從磁區的末尾開始讀取資料,可以減少重復讀,但很有可能會錯過一些訊息
3、enable.auto.commit
- 可以設定自動提交偏移量,可以在代碼中手動提交偏移量
- 自動提交,可以讓消費者邏輯更簡單
- 但它無法控制重復處理訊息、或者如果訊息交給另外一個后臺執行緒去處理,自動提交機制可能會在訊息還沒有處理完就提交了偏移量
4、auto.commit.interval.ms
- 通過該引數,可以配置提交的頻率,默認:每5秒鐘提交一次
- 提交的頻率高,也是會增加額外的開銷的
顯示提交偏移量
如果我們希望能夠更有效地控制偏移量提交的時間點,就需要顯示地提交偏移量,
1、總是在處理完事件后再提交偏移量
如果所有的處理都是在輪詢里完成,無需在輪詢之間維護狀態,那么可以使用自動提交,或者在輪詢結束后進行手動提交,
2、提交頻率是性能和重復訊息數量之間的權衡
這個意思是:提交頻率越高,重復訊息處理的數量越少,性能也是比較低的,提交頻率越低,重復訊息處理的數量越多,性能是比較好的,所以,要根據實際的情況,來衡量在什么時機,來提交偏移量,即使是在最簡單的場景你,也需要在一個回圈中多次提交偏移量,
3、確保對提交的偏移量心里有數
一定要在處理完訊息后,再提交偏移量,否則會出現某些訊息會被處理,
4、消費者可能需要重試
但處理訊息出現問題時,例如:把Kafka中的資料寫入到HBase中,此時HBase臨時不可用,我們想要重試,假設這條訊息是:#30,#30處理失敗了,那大家想想?#31能提交嗎?
顯然是不能的,如果#31提交了,那么#31之前的所有資料,都不會被處理了,我們可以使用以下幾種模式來處理:
模式一
① 但遇到可重試錯誤時,提交最后一個處理成功的偏移量
② 把沒有處理好的訊息保存到緩沖區
③ 呼叫 pause() 方法,確保其他的輪詢不會回傳資料
④ 嘗試重新處理快取中的資料,如果重試成功,或者重試次數達到上限并決定放棄,把錯誤記錄下來并丟棄訊息
⑤ 呼叫 resume() 方法讓消費者繼續從輪詢里獲取新資料
模式二
① 遇到可重試錯誤時,把錯誤寫入一個獨立的主題,然后繼續
② 用一個獨立的消費者組負責從該主題上讀取錯誤訊息,并進行重試
5、長時間處理
有時候要進行比較復雜的處理,暫停輪詢的時間不能超過幾秒鐘,要保持輪詢,因為只有在輪詢程序中,才能往broker發送心跳,可以使用一個執行緒池來處理資料,可以讓輪詢不獲取新的資料,直到作業縣好吃呢個處理完成,消費者一直保持輪詢,心跳正常,就不會發生再均衡,
8、僅一次傳遞
有的程式不僅是需要“至少一次”(at least-once語意)(意味著沒有資料丟失),還需要僅一次(exactly-once)語意,實作一次性語意,最常用的辦法就是把結果寫入到一個支持唯一鍵的系統里,比如:k-v存盤、關系資料庫、ES或者其他資料存盤,可以使用主題、磁區和偏移量來作為主鍵,這樣,可以碰巧讀取到同一個相同的訊息,直接覆寫寫入就可以了,這種稱為冪等性寫入,
還有一種,就是使用關系型資料庫,HDFS中一些被定義過的原子操作也經常用來達到相同的目的,把訊息和偏移量放在同一個事務里,這樣讓它們保持同步,消費者啟動,獲取最近處理過的偏移量,呼叫seek()方法從偏移量位置繼續讀取資料
參考檔案:
「Kafka權威指南」
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/32574.html
標籤:大數據
下一篇:01-Flink運行架構
