我有一個Spring Boot專案,在Confluent Kakfa主題上運行幾個Kafka使用者(@KafkaListener),有8個磁區.每個消費者的并發性設定為1.主題加載了來自檔案和檔案的大約一百萬行訊息.消費者批量使用它們來驗證,處理和更新資料庫.
消費者工廠具有以下設定 – max.poll.records = 10000,fetch.min.bytes = 100000,fetch.max.wait.ms = 1000,session.timeout.ms = 240000.
更新06/04這是消費者工廠設定.它是Spring-Kafka-1.3.1.RELEASE. Confluent Kafka經紀人是版本
@Bean
public ConsumerFactory<String, ListingMessage> consumerFactory() {
Map<String, Object> props = new HashMap<>();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
props.put(ConsumerConfig.GROUP_ID_CONFIG, CONSUMER_GROUP);
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 10000);
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 100000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 1000);
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 240000);
return new DefaultKafkaConsumerFactory<>(props, new StringDeserializer(),
new JsonDeserializer<>(ListingMessage.class));
}
@Bean(KAFKA_LISTENER_CONTAINER_FACTORY) @Autowired
public concurrentKafkaListenerContainerFactory<String, ListingMessage> listingKafkaListenerContainerFactory(
ConsumerFactory<String, ListingMessage> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, ListingMessage> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(listingConsumerFactory);
factory.setConcurrency(1);
factory.setAutoStartup(false);
factory.setBatchListener(true);
return factory;
}
注意:Container Factory的自動啟動設定為false.這是在加載大檔案時手動啟動/停止使用者.
在運行大約1小時(時間變化)之后,即使主題有許多訊息可用,消費者也會停止使用其主題中的訊息. consume方法中有一個日志陳述句,用于停止在日志中列印.
我使用“./kafka-consumer-groups”命令跟蹤消費者的狀態,并在一段時間后看到該組中沒有消費者.
$./kafka-consumer-groups --bootstrap-server localhost:9092 --describe --group group_name
此消費者失敗的日志中沒有錯誤.使用者方法包含在try-catch塊中,因此它將捕獲在處理訊息期間拋出的任何例外.
我們如何設計Spring-Kafka消費者,以便在消費者停止消費時重新啟動消費者?當消費者停止時,是否有可以記錄確切點的監聽器?這是因為將并發性設定為1?我必須將并發性設定為1的原因是,如果此消費者具有更多并發性,那么其他消費者會放慢速度.
uj5u.com熱心網友回復:
我剛剛運行了30秒max.poll.interval.ms = 30000的測驗,暫停了聽眾,30秒后恢復了;我在日志中看到了這一點……2018-06-04 18:35:59.361 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
2018-06-04 18:37:07.347 ERROR 4191 --- [ foo-0-C-1] o.s.kafka.listener.LoggingErrorHandler : Error while processing: null
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:722) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:600) ~[kafka-clients-1.0.1.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1250) ~[kafka-clients-1.0.1.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:1329) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1190) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:688) ~[spring-kafka-2.1.6.RELEASE.jar:2.1.6.RELEASE]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_131]
at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_131]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_131]
2018-06-04 18:37:07.350 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Revoking previously assigned partitions [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions revoked: [so50687794-0]
2018-06-04 18:37:07.351 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] (Re-)joining group
2018-06-04 18:37:10.400 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=consumer-1, groupId=foo] Successfully joined group with generation 15
2018-06-04 18:37:10.401 INFO 4191 --- [ foo-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-1, groupId=foo] Setting newly assigned partitions [so50687794-0]
2018-06-04 18:37:10.445 INFO 4191 --- [ foo-0-C-1] o.s.k.l.KafkaMessageListenerContainer : partitions assigned: [so50687794-0]
foo
您可以看到重新平衡后,重新添加消費者并重新傳遞相同的訊息;這是我所期待的.
我得到了相同的結果;即使是1.3.1.
uj5u.com熱心網友回復:
樓主,解決了嗎,我也出現這個問題了轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/169914.html
標籤:其他技術討論專區
上一篇:調節閥的安裝和檢修
