專案背景:
兩個Kafka消費者群組,消費同樣的主題,一個消費者群組消費資料后,發給mqtt服務,供其他應用接收;另一個消費者群組消費資料后,存入mysql資料庫,
問題描述:
提示:這里描述專案中遇到的問題:
在專案中使用Kafka消費者消費資料,并配置了磁區再均衡監聽器,在日志檔案中發現再均衡監聽器頻繁的輸出日志,即頻繁發生磁區再均衡,而專案已經跑了很久,沒有新的消費者加入消費者群組,為何還要頻繁進行磁區再均衡呢?
經過分析日志還發現,當發生再均衡時會輸出如下資訊:
Attempt to heartbeat failed since group is rebalancing
而且,偶爾還報如下錯誤:
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
我們知道,頻繁的進行磁區再均衡極易偏移量提交不準確,造成資料重復消費等問題,那為何會發生如此現象呢?
原因分析:
提示:這里填寫問題的分析:
經過進一步分析發現,輸出上面兩種日志資訊的都是進行資料庫入庫的那個消費者群組,而實時轉發到mqtt的那個消費者群組沒有報這種錯誤,
而且從第二個報錯資訊來看,提示說兩次poll的時間間隔超過了max.poll.interval.ms引數閾值,說明在一次poll中,處理資料的時間太長了,建議調大session timeout引數或者調小max.poll.records引數,
而第一個報錯資訊是說正在發生磁區再均衡,心跳檢測失敗,
由此可以推測,是存入資料庫的方法太占用時間了(一次poll會insert大量資料),導致了頻繁發生了再均衡,我們看mysql服務器的磁盤利用率,發現達到了100%,所以insert效率慢,造成了這個問題
解決方案:
提示:這里填寫該問題的具體解決方案:
重新換了一個mysql服務器,再運行專案,問題解決,
知識積累:
這里了解一下Kafka消費者的心跳機制,以及Kafka剔除消費者的機制:
首先先要了解kafkaconsumer三個引數的區別:
max.poll.interval.ms、session.timeout.ms、heartbeat.interval.ms,
max.poll.interval.ms:當使用消費者群組時,一個消費者兩次呼叫poll方法的最大延遲,如果超過了這個時間呼叫poll,則將會發生磁區再均衡,
session.timeout.ms:配置消費者做大間隔發送心跳的時間閾值,如果超過這個閾值沒發送心跳,則認為這個消費者掛了,會發生磁區再均衡,group coordinator檢測consumer發生崩潰所需的時間,
heartbeat.interval.ms: 消費者間隔多久向group coordinator發送一次心跳,消費者會周期性以這個值的引數進行心跳發送,
session.timeout.ms和heartbeat.interval.ms的區別是:
heartbeat.interval.ms設定消費者多久發送一次心跳,但是可能因為網路抖動等等原因,會偶爾的超過這個時間發送心跳或者少發送一次心跳,但是只要沒超過session.timeout.ms的值發送心跳,那么就認為這個消費者是存活的,
特別注意的是,在早期版本,kafka的心跳機制和poll方法系結的,也就是每poll一次,發送一次心跳,這種機制的缺陷就是當poll處理時間過久時,kafka認為該消費者掛掉了,直接移除出去了,所以在kafka1.0版本后,心跳和poll分成了兩個執行緒:一個是heartbeat 執行緒,另一個是processing執行緒,processing執行緒可理解為呼叫consumer.poll方法執行訊息處理邏輯的執行緒,而heartbeat執行緒是一個后臺執行緒,對程式員是"隱藏不見"的,如果訊息處理邏輯很復雜,比如說需要處理5min,那么 max.poll.interval.ms可設定成比5min大一點的值,而heartbeat 執行緒則和上面提到的引數 heartbeat.interval.ms有關,heartbeat執行緒 每隔heartbeat.interval.ms向coordinator發送一個心跳包,證明自己還活著,只要 heartbeat執行緒 在 session.timeout.ms 時間內 向 coordinator發送過心跳包,那么 group coordinator就認為當前的kafka consumer是活著的,
我們再分析上面的問題場景,兩次poll間隔時間過長,超過了max.poll.interval.ms,所以導致了頻繁的磁區再均衡,而消費者的心跳是正常發的,所以每次再均衡,不會把處理慢的消費者移除出去,還會將其加入佇列,以此頻繁反復磁區再均衡,其實質其實是mysql慢造成的磁區再均衡,
參考文章:Kafka session.timeout.ms heartbeat.interval.ms引數的區別以及對資料存盤的一些思考
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/348320.html
標籤:其他
上一篇:Hive 資料遷移與備份
下一篇:java web第一次作業
