文章目錄
- 背景
- 相關代碼
- 現象
- 解決方法
- Reference
背景
專案中用到了kafka訊息佇列,在開發測驗程序中發現了訊息端設定的最大重試次數失效的情況,具體資訊如下:
- consumer: 3
- partition:1
- maxRetryTimes:15
- spring-kafka: 2.2.15.RELEASE
- kafka-client: 2.0.1
相關代碼
消費者config檔案
@Configuration
@EnableKafka
@Slf4j
public class KafkaConsumerConfig {
@Bean
KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<Integer, String>> demoContainerFactory() {
ConcurrentKafkaListenerContainerFactory<Integer, String>
factory = new ConcurrentKafkaListenerContainerFactory<>();
// 設定消費者工廠
factory.setConsumerFactory(demoContainerFactory());
// 消費者組中執行緒數量
factory.setConcurrency(3);
// 當使用批量監聽器時需要設定為true
factory.setBatchListener(false);
// 拉取超時時間
factory.getContainerProperties().setPollTimeout(3000);
// 最大重試次數3次
SeekToCurrentErrorHandler seekToCurrentErrorHandler = new SeekToCurrentErrorHandler((consumerRecord, e) -> {
log.error("消費訊息例外.拋棄這個訊息,{}", consumerRecord.toString(), e);
}, 3);
factory.setErrorHandler(seekToCurrentErrorHandler);
return factory;
}
消費者業務代碼
@Component
@Slf4j
public class DemoSingleConsumer {
@Autowired
private DemoHandler demoHandler;
/**
* 監聽 topic 進行單條消費
*/
@KafkaListener(topics = {KafkaConstants.TOPIC}, groupId = KafkaConstants.GROUPID,
containerFactory = "demoContainerFactory", errorHandler = "listenErrorHandler")
public void kafkaListener(ConsumerRecord<String, String> message) {
log.info("消費訊息開始 msg={}", JSONUtil.toJSONString(message.value()));
SendMessage message = JSONUtil.parseObject(message.value(), ASendMessage.class);
try {
demoHandler.process(message);
} catch (Throwable e) {
log.error("訊息消費例外,messageBody={}", JSONObject.toJSONString(message.value()), e);
}
}
現象
上述代碼的生產者啟動后,手動給對應topic生產一個訊息“clear”,由于定義的訊息體是一個json,顯然這次生產的訊息不符合彼此的協議,因此會報如下的錯,
message:clear
exception:Listener method 'public void com.demo.DemoConsumer.kafkaListener(org.apache.kafka.clients.consumer.ConsumerRecord<java.lang.String, java.lang.String>,org.apache.kafka.clients.consumer.Consumer<java.lang.String, java.lang.String>,java.lang.String,int)' threw exception; nested exception is com.alibaba.fastjson.JSONException: syntax error, expect {, actual error, pos 0, fastjson-version 1.2.72
上述報錯我們是可以理解的,但是kafka消費者一直重復上述的訊息,即毒丸問題,因為消費者反序列化是在spring poll()之前的,所以spring是沒法處理的,即這個問題會一直存在,
在用戶應用程式中無法處理毒藥的影響很大,讓我們來看看發生了什么:
- 消費者應用程式正在使用Kafka主題,
- 在某個時間點,應用程式無法對記錄進行反序列化(遇到毒丸),
- 消費者不能處理毒丸,
- 因為使用者偏移量沒有向前移動,所以阻止了主題磁區的使用,
- 消費者將一次又一次地(非常迅速地)嘗試反序列化記錄,但是永遠不會成功,因此,您的消費者應用程式將陷入一個無窮回圈,嘗試對失敗的記錄進行反序列化,
- 對于每次失敗,都會在您的日志檔案中寫入一行…糟糕!
現在我們已經知道重復消費的原因了,即消費者反序列化失敗,接下來就來解決問題,
解決方法
解決問題最好的方法就是先查看官方檔案,其次才是在各種論壇上搜索,這里先給出官網的方法,這里給出官網鏈接
When a deserializer fails to deserialize a message, Spring has no way to handle the problem because it occurs before the poll() returns. To solve this problem, version 2.2 introduced the ErrorHandlingDeserializer. This deserializer delegates to a real deserializer (key or value). If the delegate fails to deserialize the record content, the ErrorHandlingDeserializer returns a DeserializationException instead, containing the cause and raw bytes. When using a record-level MessageListener, if either the key or value contains a DeserializationException, the container’s ErrorHandler is called with the failed ConsumerRecord. When using a BatchMessageListener, the failed record is passed to the application along with the remaining records in the batch, so it is the responsibility of the application listener to check whether the key or value in a particular record is a DeserializationException.
官網給出的方法是給消費者設定 ErrorHandlingDeserializer 來處理反序列化時的例外,接下來是官網給出的配置,
... // other props
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ErrorHandlingDeserializer.class);
props.put(ErrorHandlingDeserializer.KEY_DESERIALIZER_CLASS, JsonDeserializer.class);
props.put(JsonDeserializer.KEY_DEFAULT_TYPE, "com.example.MyKey")
props.put(ErrorHandlingDeserializer.VALUE_DESERIALIZER_CLASS, JsonDeserializer.class.getName());
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.example.MyValue")
props.put(JsonDeserializer.TRUSTED_PACKAGES, "com.example")
return new DefaultKafkaConsumerFactory<>(props);
Reference
- kafkatemplate無法注入_kafka消費無限重試問題排查
- kafka專題:kafka的訊息丟失、重復消費、訊息積壓等線上問題匯總及優化
- Kafka常見的導致重復消費原因和解決方案
- Kafka auto.offset.reset值詳解
- Springboot整合Kafka-自動,手動提交偏移量
- Kafka在消費者反序列化時出現問題
- Apache Kafak如何處理訊息反序列化失敗等毒丸現象?
- Spring 整合Apache Kafka 處理事件流
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/433253.html
標籤:其他
上一篇:22年美賽c題-交易策略
下一篇:org.apache.zookeeper.KeeperException$ConnectionLossException: KeeperErrorCode = ConnectionLoss for /
