我正在研究錯誤處理實作并有一個問題。
讓我解釋一下問題:
我收到一批訊息,我在 for 回圈中對每個訊息進行資料庫查找,然后我需要收集串列中所有查找的物件并呼叫批量插入存盤程序和批量更新存盤程序使用此物件串列。
現在讓我們假設在查找程序中發生了一些例外。我想重試這條訊息。對于這種情況,我嘗試使用DefaultErrorHandler. 但是有一個問題,根據檔案,當拋出帶有元素索引的 BatchListenerFailedException 時,它會提交索引之前記錄的偏移量。但正如我所說,我需要在查找后執行批量插入和更新,所以我不想在索引之前提交偏移量,這些記錄還沒有插入/更新到資料庫中。
這是否意味著我唯一的選擇是RetryingBatchErrorHandler每次都重試整個批次?我可以以某種方式繼續處理不產生錯誤的訊息嗎?
另外,如果RetryingBatchErrorHandler是唯一的選擇,我怎么能確定在較長的退避期(指數退避)的情況下,kafka 不會殺死我的消費者并且不會啟動重新平衡?
我目前的實作:
RetryingBatchErrorHandler retryingBatchErrorHandler =
new RetryingBatchErrorHandler(backoff,
(consumerRecord, e) ->
log.error("Backoff attempts exhausted for the record with offset={}, partition={}, value={}, offset committed.",
consumerRecord.offset(), consumerRecord.partition(), consumerRecord.value()));
factory.setBatchErrorHandler(retryingBatchErrorHandler);
更新:請參閱 Artem 答案中的評論。
這就是如何將查找步驟包裝成retryTemplate
LookedUpRequest lookedUpRequest = retryTemplate.execute(ctx -> {
//Lookup step
return lookup.process(request);
});
如果它失敗了,那么它將進一步拋出例外給批處理錯誤處理程式,在RetryingBatchErrorHandler該處理程式中根據其策略重試批處理
uj5u.com熱心網友回復:
查看它的 JavaDocs:
/**
* A batch error handler that invokes the listener according to the supplied
* {@link BackOff}. The consumer is paused/polled/resumed before each retry in order to
* avoid a rebalance. If/when retries are exhausted, the provided
* {@link ConsumerRecordRecoverer} is invoked for each record in the batch. If the
* recoverer throws an exception, or the thread is interrupted while sleeping, seeks are
* performed so that the batch will be redelivered on the next poll.
*
* @author Gary Russell
* @since 2.3.7
*
*/
public class RetryingBatchErrorHandler extends KafkaExceptionLogLevelAware
implements ListenerInvokingBatchErrorHandler {
因此,沒有重新平衡,因為消費者在兩者之間暫停。
您可能需要考慮為您的查找部分快取一些東西,因此您不會在失敗之前對那些您已經請求的記錄施加壓力。
您也可以考慮自己重試該查找。見RetryTemplate。但在這種情況下,您需要確保整個操作的時間不夠長(請參閱 參考資料max.poll.interval.ms)以使消費者離開其組。
轉載請註明出處,本文鏈接:https://www.uj5u.com/gongcheng/409413.html
標籤:
