我有使用 kafka binder 的 spring-cloud-stream 專案。應用程式以批處理模式消費訊息。我需要按特定標題過濾消費記錄。在這種情況下,我使用 BatchInterceptor:
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
BatchInterceptor<String, String> customInterceptor
) {
return (((container, destinationName, group) -> {
container.setBatchInterceptor(customInterceptor);
log.info("Container customized");
}));
}
@Bean
public BatchInterceptor<String, String> customInterceptor() {
return (consumerRecords, consumer) -> {
log.info("Origin records count: {}", consumerRecords.count());
final Set<TopicPartition> partitions = consumerRecords.partitions();
final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader
= Stream.of(partitions).flatMap(Collection::stream)
.collect(Collectors.toMap(
Function.identity(),
p -> Stream.ofNullable(consumerRecords.records(p))
.flatMap(Collection::stream)
.filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
.collect(Collectors.toList())
));
var filteredRecords = new ConsumerRecords<>(filteredByHeader);
log.info("Filtered count: {}", filteredRecords.count());
return filteredRecords;
};
}
示例代碼在這里批處理攔截器示例。
在我看到的日志中,記錄已成功過濾,但過濾后的記錄仍會進入消費者。
為什么 ButchInterceptor 不過濾記錄?如何通過啟用批處理模式的 spring-cloud-stream 中的特定標頭過濾 ConsumerRecords?您可以運行示例中的測驗來重現行為者。
uj5u.com熱心網友回復:
您正在使用不受 OSS 支持的非常舊的代碼(Boot 2.5.0)。
https://spring.io/projects/spring-boot#support
(云也是)。
我用當前版本測驗了你的攔截器,它作業正常。
啟動 2.7.5,云 2021.0.4:
@SpringBootApplication
public class So74203611Application {
private static final Logger log = LoggerFactory.getLogger(So74203611Application.class);
public static void main(String[] args) {
SpringApplication.run(So74203611Application.class, args);
}
@Bean
public ListenerContainerCustomizer<AbstractMessageListenerContainer<String, String>> customizer(
BatchInterceptor<String, String> customInterceptor) {
return (((container, destinationName, group) -> {
container.setBatchInterceptor(customInterceptor);
log.info("Container customized {}", destinationName);
}));
}
@Bean
public BatchInterceptor<String, String> customInterceptor() {
return (consumerRecords, consumer) -> {
log.info("Origin records count: {}", consumerRecords.count());
final Set<TopicPartition> partitions = consumerRecords.partitions();
final Map<TopicPartition, List<ConsumerRecord<String, String>>> filteredByHeader = Stream.of(partitions)
.flatMap(Collection::stream)
.collect(Collectors.toMap(Function.identity(),
p -> Stream.ofNullable(consumerRecords.records(p)).flatMap(Collection::stream)
.filter(r -> Objects.nonNull(r.headers().lastHeader("TEST")))
.collect(Collectors.toList())));
var filteredRecords = new ConsumerRecords<>(filteredByHeader);
log.info("Filtered count: {}", filteredRecords.count());
return filteredRecords;
};
}
@Bean
Consumer<List<String>> input() {
return str -> {
System.out.println(str);
};
}
@Bean
ApplicationRunner runner(KafkaTemplate<byte[], byte[]> template) {
return args -> {
Headers headers = new RecordHeaders();
headers.add("TEST", "foo".getBytes());
ProducerRecord<byte[], byte[]> rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "bar".getBytes(),
headers);
template.send(rec);
headers = new RecordHeaders();
rec = new ProducerRecord<>("input-in-0", 0, 0L, null, "baz".getBytes(), headers);
template.send(rec);
template.send(rec);
};
}
}
spring.cloud.stream.bindings.input-in-0.group=foo
spring.cloud.stream.bindings.input-in-0.consumer.batch-mode=true
[bar]
轉載請註明出處,本文鏈接:https://www.uj5u.com/ruanti/520505.html
標籤:爪哇春天春天卡夫卡春天云流spring-cloud-stream-binder-kafka
