使用springboot整合kafka時,消費者使用@KafkaListener監聽批量訊息,然后手動提交偏移。然后我現在有一個需求 :就是消費者觸發特定邏輯后需要kafka暫停拉取90秒。可以用nack去暫停kafka嗎?有大神用過嗎?備注:我用過KafkaListenerEndpointRegistry可以呼叫start()和stop()去手動的啟動和停止kafka消費。但是我不想手動干預,并且我發現KafkaListenerEndpointRegistry里面start()容易觸發再均衡,kafka起來很慢。 我就想讓kafak暫停一段時間,然后自動再去拉取。代碼簡單的實體如下:
@KafkaListener(topics = KafkaConstants.TOPIC_NAME, containerFactory = "batchFactory")
public void batchPullMessage(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
try {
log.info("kafka拉取的訊息數量: {}", records.size());
final CountDownLatch latch = new CountDownLatch(records.size());//初始化計數器
for (ConsumerRecord<String, String> record : records) {
log.error("訊息偏移量:{}", record.offset());
demoService.test(record.value(), latch);//異步消費
}
latch.await();//等待當前所有子執行緒執行完畢
if ("業務代碼邏輯觸發需要kafka暫停拉取90秒") {
ack.nack(records.size() - 1, 90 * 1000);
} else {
ack.acknowledge();//提交kafka的offset
}
} catch (Exception e) {
log.error("kafka消費者例外", e);
}
}
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/273040.html
標籤:Java相關
上一篇:java swing
下一篇:java 16進制轉時間問題
