我有一個以Avro格式發送訊息的生產者和一個偵聽這些訊息的消費者。
@RetryableTopic我還通過在我的消費者中使用來處理錯誤來實作非阻塞重試。
當消費者無法反序列化訊息時(由于模式更改或其他原因),它不會將該訊息放入-retry主題中。它直接將其發送到-dlt主題。
我也希望DeserializationExceptions 被重試。原因是當這些錯誤被重試時,我可以在我的消費者中部署一個修復程式,以便重試最終可以成功。
我嘗試了該include選項,@RetryableTopic但它似乎不適用于DeserializationException.
@RetryableTopic(
attempts = "${app.consumer.retry.topic.count:5}",
backoff = @Backoff(delayExpression = "${app.consumer.retry.topic.back-off:2000}"),
fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC,
include = {DeserializationException.class} // does not work
)
這是一個錯誤@RetryableTopic還是有其他方法可以實作這一目標?
uj5u.com熱心網友回復:
正如您所描述的,由于Spring Kafka 2.8.3有一組全域致命例外,將導致記錄被DLT直接轉發到。
DLT處理這種例外的通常模式是,在部署修復程式之后,讓某種控制臺應用程式從在主題中重復。
對于您描述的模式,您可以FATAL通過提供一個DestinationTopicResolverbean 來管理這組全域例外,例如:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(Clock.systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
return ddtr;
}
請讓我知道這是否適合您。謝謝。
uj5u.com熱心網友回復:
以下是我們如何實作它:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver destinationTopicResolver(ApplicationContext context) {
DefaultDestinationTopicResolver resolver = new DefaultDestinationTopicResolver(systemUTC(), context);
resolver.setClassifications(emptyMap(), true);
return resolver;
}
這樣我們就不必一一指定要包含的每個例外。另一個解決方案是 Tomaz 建議的:
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
public DestinationTopicResolver topicResolver(ApplicationContext applicationContext) {
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(systemUTC(), applicationContext);
ddtr.removeClassification(DeserializationException.class);
ddtr.removeClassification(ClassCastException.class);
return ddtr;
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/464399.html
