我有一個 spring boot 應用程式,我們在消費者部分使用 spring Kafka 我已將 enable.auto.commit 設定為 false 并設定我的 listener ack-mode to manual_immediate
我有并發消費者,所以在消費記錄后我呼叫 acknowledgment.acknowledge() 但是在這里我仍然面臨重復問題問題,每當重新平衡發生時,其他消費者開始消費已經被一個消費者消費的相同訊息。知道幕后發生了什么魔術。
任何人都知道使用 manual_immediate 時會通過 commitSync 或 commitAsync 提交訊息嗎?有沒有辦法我們可以改變行為以避免重復記錄訊息閱讀。有沒有辦法我們可以在 Spring Kafka 中使用混合模型
在 Spring Boot Kafka 中,有一種方法可以讓我們在重新平衡發生時看到我可以記錄它。
如果我們想出于某種測驗目的進行重新平衡,如何創建重新平衡?
uj5u.com熱心網友回復:
只要你acknowledge在監聽執行緒上呼叫,就會commitSync()默認使用;使用syncCommits容器屬性來使用異步提交。
如果您在不同的執行緒上呼叫它,則提交將排隊等待由消費者執行緒盡快處理。
如果發生強制重新平衡,則無法避免重復,因為您的偵聽器花費了太長時間來處理poll().
您可以增加max.poll.interval.ms和/或減少max.poll.records以確保您可以及時處理記錄。
您可以將 a 添加ConsumerRebalanceListener到容器屬性以記錄重新平衡。
減少max.poll.interval.ms到一個較小的值以在測驗中重現。
uj5u.com熱心網友回復:
首先,無論 ack 模式如何,都不能保證一條訊息只被消費一次。例如,在消費訊息和提交時間偏移之間可能會發生重新平衡,導致 Kafka 再次將訊息傳遞給新分配的消費者。對重復訊息具有冪等性是應用程式的責任。
為了監聽重新平衡事件,需要一個ConsumerRebalanceListener的實作。您可以將此實作插入 Spring 的自動配置的ConcurrentKafkaListenerContainerFactory實體。此處已回答了有關如何完成此操作的更詳細說明。
如果您希望為測驗創建強制重新平衡,您可以通過殺死希望超過 1 個現有消費者中的一個來實作。如果使用 spring-kafka,您可以通過使用@Autowired實體KafkaListenerEndpointRegistry并殺死/休息/(重新)啟動任何消費者來做到這一點。對此應該做的事情:
@Autowired
KafkaListenerEndpointRegistry registry;
public void myTest() {
Collection<MessageListenerContainer> containers = registry.getAllListenerContainers()
containers.get(0).stop()
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/409405.html
標籤:
