我在kafka中消費批次,在spring cloud stream kafka binder中不支持重試,有一個選項,你可以配置一個SeekToCurrentBatchErrorHandler(使用ListenerContainerCustomizer)來實作類似于binder中的重試功能。
我嘗試了同樣的方法,但是使用SeekToCurrentBatchErrorHandler,但是它的重試次數超過了設定的時間,即3次。
我怎樣才能做到這一點? 我想重試整個批次。
我怎樣才能做到呢?
我怎樣才能將整個批次發送到DLQ主題? 就像對于記錄監聽器,我曾經將deliveryAttempt(retry)匹配到3,然后發送到DLQ主題,在監聽器中檢查。
我怎樣才能讓我的系統不受影響呢?
我已經檢查了這個鏈接,這正是我的問題所在,但是一個例子將是很大的幫助,用這個庫spring-cloud-stream-kafka-binder,我可以實作嗎?請用一個例子來解釋,我是新手。
目前我有以下代碼。
@Configuration
public class ConsumerConfig {
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer() {
return (container, dest, group) -> {
container.getContainerProperties().setAckOnError(false)。
SeekToCurrentBatchErrorHandler seekToCurrentBatchErrorHandler。
= new SeekToCurrentBatchErrorHandler()。
seekToCurrentBatchErrorHandler.setBackOff(new FixedBackOff(0L, 2L)。
container.setBatchErrorHandler(seekToCurrentBatchErrorHandler)。
//container.setBatchErrorHandler(new BatchLoggingErrorHandler());.
};
}
}
Listerner:
@StreamListener(ActivityChannel.INPUT_CHANNEL)
public void handleActivity(List<Message<Event> > messages,
@Header(name = KafkaHeaders.ACKNOWLEDGMENT) Acknowledgment
確認。
@Header(name = "deliveryAttempt", defaultValue = "1") int
deliveryAttempt) {
try {
log.info("收到活動訊息,訊息長度{}", messages.size())。
nodeConfigActivityBatchProcessor.processNodeConfigActivity(mages)。
acknowledgment.confirmledge()。
log.debug("成功處理了活動訊息{}!!", messages.size())。
} catch (MessagePublishException e) {
if (deliveryAttempt == 3) {
log.error(
String.format("發生例外,發送訊息=%s到DLQ由于。"。
"message")。)
e);
publisher.publishToDlq(EventType.UPDATE_FAILED, "message", e.getMessage() )。
} else {
throw e;
}
}
在看到@Gary的回復后,添加了帶有RetryBatchErrorHandler的ListenerContainerCustomizer @Bean,但無法匯入該類。附上截圖。
無法匯入 RetryingBatchErrorHandler
uj5u.com熱心網友回復:
使用RetryingBatchErrorHandler來發送整個批次到DLT
https://docs.spring.io/spring-kafka/docs/current/reference/html/#retrying-batch-eh
使用一個 RecoveringBatchErrorHandler,你可以拋出一個 BatchListenerFailedException 來告訴它批處理中的哪個記錄失敗了。
https://docs.spring.io/spring-kafka/docs/current/reference/html/#recovering-batch-eh
在這兩種情況下,向錯誤處理程式提供一個DeadLetterPublishingRecoverer;在裝訂器中禁用DLT。
EDIT
這是一個例子;它使用了較新的函式式風格,而不是廢棄的 @StreamListener,但是同樣的概念也適用(但是你應該考慮轉向函式式風格)。
@SpringBootApplication
public class So69175145Application {
public static void main(String[] args){
SpringApplication.run(So69175145Application.class, args)。
}
@Bean
ListenerContainerCustomizer<AbstractMessageListenerContainer<?, ?>> customizer(
KafkaTemplate<byte[], byte[]> template) {
return (container, dest, group) -> {
容器。 setBatchErrorHandler(new RetryingBatchErrorHandler(new FixedBackOff(5000L, 2L)。)
new DeadLetterPublishingRecoverer(模板。
(rec, ex) -> new TopicPartition("錯誤。" dest "。" group, rec.partition()))));
};
}
/* *DLT主題不會有任何問題。
* 由于enableDlq是假的,DLT主題不會被自動提供。
*/
@Bean
public NewTopic topic() {
return TopicBuilder.name("errors.so69175145.grp").partitions(1).replicas(1) .build()
}
/*.
*相當于@StreamListener的功能
*/
@Bean
public Consumer<List<String>> input {
return list -> {
System.out.println(list)。
throw new RuntimeException("test"/span>)。
};
}
/*
*這里不需要--只是為了顯示我們把它們發送到了DLT上
*/
@KafkaListener(id = "so69175145", topics = "errors.so69175145.grp")/span>
public void listen(String in) {
System.out.println("From DLT: " in);
}
spring.cloud.stream.bindings.input-0.__span>destination=so69175145
spring.cloud.stream.bindings.input-0.group=grp
spring.cloud.stream.bindings.input-0.content type=text/plain
spring.cloud.stream.bindings.input-0.consumer.batch-mode=true。
# for DLT監聽器
spring.kafka.consumer.auto-offset-reset=earliest
[foo] 。
2021-09-14 09:55:32.838ERROR ...
...
[foo]。
2021-09-14 09:55:37.873ERROR...
...
[foo]。
2021-09-14 09:55:42.886ERROR ...
...
從DLT: foo
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/307783.html
標籤:
