我正在使用 spring-kafka 2.8.0 并且我正在嘗試為批處理 kafka consumer實作非阻塞重試。這是我的配置和消費者:
@Configuration
public class KafkaConfig {
@Bean
public KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, GenericRecord>>
batchListenerFactory(ConsumerFactory<Object, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, GenericRecord> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
factory.setBatchListener(true);
return factory;
}
}
@Component
public class MyConsumer {
@KafkaListener(
topics = "my-topic",
containerFactory = "batchListenerFactory"
)
@RetryableTopic(
backoff = @Backoff(delay = 1000, multiplier = 2.0),
attempts = "4",
topicSuffixingStrategy = SUFFIX_WITH_INDEX_VALUE,
autoCreateTopics = "false"
)
public void consume(List<ConsumerRecord<String, GenericRecord>> messages) {
// do some stuff
}
}
但是在 sturtup 我得到以下例外:
java.lang.IllegalArgumentException: The provided class BatchMessagingMessageListenerAdapter is not assignable from AcknowledgingConsumerAwareMessageListener
我的問題是:
有沒有辦法將批量消費者與
@RetryableTopic?有沒有其他方法可以為批量消費者實作非阻塞重試?是否可以
RetryTemplate用于此目的?
uj5u.com熱心網友回復:
@RetryableTopic 批處理偵聽器不支持。
在RecoveringBatchErrorHandler(DefaultErrorHandler2.8及更高版本)支持發送一個失敗的記錄批內一紙空文話題,與聽眾的幫助投擲BatchListenerFailedException失敗指示哪些記錄。
然后,您必須在該主題上實作自己的偵聽器。
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/409040.html
標籤:
