如果 Kafka 服務器(暫時)關閉,我的 Spring Boot 應用程式ReactiveKafkaConsumerTemplate會不斷嘗試連接失敗,從而導致不必要的流量并弄亂日志檔案:
2021-11-10 14:45:30.265 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:32.792 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Connection to node -1 (localhost/127.0.0.1:29092) could not be established. Broker may not be available.
2021-11-10 14:45:34.845 WARN 24984 --- [onsumer-group-1] org.apache.kafka.clients.NetworkClient : [Consumer clientId=consumer-group-1, groupId=consumer-group] Bootstrap broker localhost:29092 (id: -1 rack: null) disconnected
是否可以使用斷路器之類的東西(此處或此處的靈感),以便 Spring Boot Kafka 客戶端在出現故障(或者甚至更好的幾次連續故障)的情況下減慢其連接嘗試的速度,并回傳到正常速度只有在服務器再次啟動后才能正常運行?
是否已經有現成的配置引數,或任何其他解決方案?
我知道引數 reconnect.backoff.ms,這就是我創建ReactiveKafkaConsumerTemplatebean 的方式:
@Bean
public ReactiveKafkaConsumerTemplate<String, MyEvent> kafkaConsumer(KafkaProperties properties) {
final Map<String, Object> map = new HashMap<>(properties.buildConsumerProperties());
map.put(ConsumerConfig.GROUP_ID_CONFIG, "MyGroup");
map.put(ConsumerConfig.RECONNECT_BACKOFF_MS_CONFIG, 10_000L);
final JsonDeserializer<DisplayCurrencyEvent> jsonDeserializer = new JsonDeserializer<>();
jsonDeserializer.addTrustedPackages("com.example.myapplication");
return new ReactiveKafkaConsumerTemplate<>(
ReceiverOptions
.<String, MyEvent>create(map)
.withKeyDeserializer(new ErrorHandlingDeserializer<>(new StringDeserializer()))
.withValueDeserializer(new ErrorHandlingDeserializer<>(jsonDeserializer))
.subscription(List.of("MyTopic")));
}
而且消費者仍然每 3 秒嘗試連接一次。
uj5u.com熱心網友回復:
請參閱https://kafka.apache.org/documentation/#consumerconfigs_retry.backoff.ms
在嘗試重新連接到給定主機之前等待的基本時間。這避免了在緊密回圈中重復連接到主機。此退避適用于客戶端到代理的所有連接嘗試。
和https://kafka.apache.org/documentation/#consumerconfigs_reconnect.backoff.max.ms
重新連接到反復連接失敗的代理時要等待的最長時間(以毫秒為單位)。如果提供,則每個主機的退避將在每次連續連接失敗時呈指數增加,直至達到此最大值。計算回退增量后,添加 20% 的隨機抖動以避免連接風暴。
和
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/355780.html
