只說結論!
如果我們使用原始apache-kafka 依賴的API來消費資料:
- 如果enable.auto.commit為true,則表示自動提交,但不會在拉取資料之后立即提交,在一次poll的資料處理完畢之后,將會在下一次poll資料的時候,首先檢查是否到達了auto.commit.interval.ms自動提交間隔的時間,如果到達了(默認5s),那么會提交此前拉取的訊息的最大偏移量,否則不會提交,
- 如果enable.auto.commit為false,則表示手動提交,那么需要通過**consumer.commitAsync()或者commitSync()**手動提交偏移量,這兩個方法將會提交目前最大的offset,否則重啟之后將會消費此前的資料,
如果使用spring-kafka 的@Listener注解來消費資料:
-
如果enable.auto.commit為true,則表示自動提交,但不會在拉取資料之后立即提交,在一次poll的資料處理完畢之后,將會在下一次poll資料的時候,首先檢查是否到達了auto.commit.interval.ms自動提交間隔的時間,如果到達了(默認5s),那么會提交此前拉去的訊息的最大偏移量,否則不會提交,
-
如果enable.auto.commit為false,則表示手動提交,此時需要注意選擇提交的模式AckMode,
- BATCH:默認的提交模式,當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后提交,由Spring幫我們提交,
- RECORD:當每一條記錄被消費者監聽器(ListenerConsumer)處理之后提交,由Spring幫我們提交,
- TIME:當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,距離上次提交時間大于TIME時提交,由Spring幫我們提交,
- COUNT:當每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后,被處理record數量大于等于COUNT時提交,由Spring幫我們提交,
- COUNT_TIME:TIME和COUNT有一個條件滿足時提交,由Spring幫我們提交,
- MANUAL:需要對監聽訊息的方法中引入 Acknowledgment引數,并在代碼中呼叫acknowledge()方法進行手動提交,實際上,對于每一批poll()的資料,每次呼叫acknowledge()方法之后僅僅是將offset存放到本地map快取,在下一次poll的時候,在poll新資料之前從快取中拿出來批量提交,也就是說與BATCH有相同的語意,
- MANUAL_IMMEDIATE:需要對監聽訊息的方法中引入 Acknowledgment引數,并在代碼中呼叫acknowledge()方法進行手動提交,實際上,對于每一批poll()的資料,每次呼叫acknowledge()方法之后立即進行偏移量的提交,
由于默認的提交模式是BATCH,因此在使用@Listener注解來消費資料時,即使enable.auto.commit為false,偏移量也會在每一批poll()的資料被消費者監聽器(ListenerConsumer)處理之后提交,這里的提交實際上是手動提交,但是這個“手動提交”操作由Spring幫我們做了,因此如果不設定AckMode為MANUAL或者MANUAL_IMMEDIATE,我們仍然會覺得這些資料被“自動提交”了,實際上是由Spring幫我們執行了手動提交的代碼,造成誤解,
MANUAL和MANUAL_IMMEDIATE的區別是:MANUAL_IMMEDIATE是消費完一個訊息就提交,MANUAL是處理完一批訊息(默認500)之后,在下一次拉取訊息之前批量提交,
如果中間有一批資料沒有提交,那么在一次消費程序中,這些沒有提交的資料不會重復消費,而是會一直向后消費,除非重啟消費者,會被再次消費,如果后面有訊息的offset被提交,那么該offset之前的所有訊息都算作已提交,重啟之后也不會被再次消費,
相關文章:
Kafka
如有需要交流,或者文章有誤,請直接留言,另外希望點贊、收藏、關注,我將不間斷更新各種Java學習博客!
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/385525.html
標籤:其他
上一篇:HBase MemStore GC問題、MSLAB記憶體管理模式、MemStore ChunkPool
下一篇:Hbase基礎
