我有一個要求。我們有一個 Spring Boot kafka 消費者應用程式,它正在讀取 kafka 主題。我們的要求是,每當應用程式出現故障并出現時,我都希望從最新的偏移量開始,而不是被舊值所困擾。是否有可能重置組的偏移量?我進行了一些研究,并使用 AbstractConsumerSeekAware 使用seekToEnd() 將偏移量設定到末尾,如下面的代碼所示。
public class KafkaConsumer extends AbstractConsumerSeekAware {
@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@KafkaListener(topics = "${topic.consumer}")
public void consume(ConsumerRecord<?, ?> consumerRecord, Acknowledgment ack) {
//consuming the message from consumer Record.
seekToEnd();
doSomething();
ack.acknowledge();
}
然而,當我們停止應用程式并重新啟動它時,它從它離開的最后一個偏移量開始讀取,但我們只想從應用程式啟動時的偏移量讀取。我們怎樣才能做到這一點?
uj5u.com熱心網友回復:
在@KafkaListener方法中這樣做是不正確的。只有當消費者從磁區傳遞記錄時才真正呼叫這個。
onPartitionsAssigned()出于這個原因,您必須實作,因此消費者將在開始輪詢之前尋找這些磁區。
在檔案中查看更多資訊:https : //docs.spring.io/spring-kafka/docs/current/reference/html/#seek
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/331719.html
上一篇:我正在嘗試使用Mockito、JUnit5和SpringBoot對服務層進行單元測驗,但我得到NullPointerException
