一般在SpringBoot使用kafka,通常用
@KafkaListener注解來進行監聽消費,然而某些時候,我們不需要監聽而要以定時拉取的方式進行消費,本文主要就是簡單記錄此方式的實作方法,
//批次大小
private static Integer batchSize = 3;
//批次時間
private static Integer batchTime = 5;
@Resource
private KafkaProperties kafkaProperties;
@Test
void kafkaTest() {
//配置消費者
Map<String, Object> properties = kafkaProperties.buildConsumerProperties();
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "test-group");//指定消費組
properties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, batchSize); //指定批次消費條數
properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); //禁用自動提交
//建立消費者
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
//獲取所有partition資訊
List<PartitionInfo> partitionList = kafkaConsumer.partitionsFor("test-consumer");
Map<TopicPartition, Integer> topicPartitionMap = MapUtil.newHashMap();
partitionList.forEach(item
-> topicPartitionMap.put(new TopicPartition(item.topic(), item.partition()), item.partition()));
//訂閱topic并設定起始offset
kafkaConsumer.assign(topicPartitionMap.keySet());
topicPartitionMap.forEach(kafkaConsumer::seek);
//啟動消費執行緒(僅用作示例)
((Runnable) () -> {
Duration duration = Duration.ofSeconds(batchTime);
long batchTimeMs = batchTime * 1000L;
Map<Integer, ConsumerRecord<String, String>> recordMap = MapUtil.newHashMap();
while (true) {
try {
TimeInterval interval = DateUtil.timer();
ConsumerRecords<String, String> records = kafkaConsumer.poll(duration);
int count = records.count();
log.info("測驗消費獲取到資料 => {} 條", count);
if (count > 0) {
//處理資料
List<String> values = CollUtil.newArrayList();
records.forEach(item -> values.add(item.value()));
//記錄當前批次每個Partition最小offset
for (ConsumerRecord<String, String> item : records) {
values.add(item.value());
if (recordMap.containsKey(item.partition())) {
ConsumerRecord<String, String> original = recordMap.get(item.partition());
if (item.offset() < original.offset()) {
recordMap.put(item.partition(), item);
}
} else {
recordMap.put(item.partition(), item);
}
}
//執行業務,拋出例外
throw new RuntimeException("測驗錯誤");
//同步提交offset
kafkaConsumer.commitSync();
//正常提交后清除記錄
recordMap.clear();
}
//批次消費達到上限,不休眠直接進行下一次消費
if (batchSize == count) {
continue;
}
//計算消費耗時并休眠
long used = interval.intervalMs();
if (used < batchTimeMs) {
ThreadUtil.safeSleep(batchTimeMs - used);
}
} catch (Exception e) {
log.error("消費出錯 => {}", e.getMessage());
recordMap.forEach((k, v) -> kafkaConsumer.seek(new TopicPartition(v.topic(), v.partition()), v.offset()));
log.error(ExceptionUtil.stacktraceToString(e));
ThreadUtil.safeSleep(batchTimeMs);
}
}
}).run();
}
(備注:主要涉及依賴:spring-kafka、hutool)
文章轉載自我的個人博客:https://blog.fordes.top,歡迎訪問交流,文章如有謬誤請務必指出~
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/346861.html
標籤:Java
上一篇:第一個mybatis程式的報錯-Type interface com.xiaoma.mapper.UserMapper is not known to the MapperRegistry
下一篇:PHP面試(A-01)
