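What is the better way to send multiple DTO types with a Kafka template?
Approach 1: Use Object as the value type of the ProducerFactory, so that a single Kafka template can send objects of several types.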
@Bean
public ProducerFactory<String, Object> producerFactory() {
    Map<String, Object> config = new HashMap<>();
    // Producer settings are keyed by ProducerConfig, not ConsumerConfig.
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // consumerFactory() is defined elsewhere in the configuration class.
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

// One template whose value type is Object, so it accepts any DTO.
@Bean
public KafkaTemplate<String, Object> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
Approach 2: Add a separate ProducerFactory and Kafka template configuration for each object type I want to send.
@Bean
public ProducerFactory<String, Student> studentProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    // Producer settings are keyed by ProducerConfig, not ConsumerConfig.
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ProducerFactory<String, Person> personProducerFactory() {
    Map<String, Object> config = new HashMap<>();
    config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
    config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(config);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String> concurrentKafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
    // consumerFactory() is defined elsewhere in the configuration class.
    factory.setConsumerFactory(consumerFactory());
    return factory;
}

// Each template is wired to the factory for its own value type.
// The two bean methods need distinct names; identical names would not compile.
@Bean
public KafkaTemplate<String, Student> studentKafkaTemplate() {
    return new KafkaTemplate<>(studentProducerFactory());
}

@Bean
public KafkaTemplate<String, Person> personKafkaTemplate() {
    return new KafkaTemplate<>(personProducerFactory());
}
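Answer:
If you want to use JsonSerializer, send Jackson's JsonNode rather than a plain Java object, which does not serialize properly.
Otherwise, either approach works, but the Kafka Javadoc recommends using only one Producer instance if possible (the KafkaProducer Javadoc notes that the producer is thread safe and that sharing a single instance across threads is generally faster than having multiple instances).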
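To illustrate why either approach can work, here is a minimal, dependency-free sketch. It does not use Spring Kafka: SketchTemplate, the two DTO classes, and the toy serialize method are all hypothetical stand-ins invented for this example. The point it tries to show is that the generic value type only restricts what callers may pass at compile time, while the serializer works on the runtime object, so one Object-typed template and two typed templates sharing the same serializer produce the same payloads.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Dependency-free model. SketchTemplate is a hypothetical stand-in, NOT Spring's KafkaTemplate.
final class SingleTemplateSketch {

    // Hypothetical DTOs standing in for the Student and Person classes in the question.
    static final class Student {
        final String name;
        Student(String name) { this.name = name; }
        @Override public String toString() { return "{\"type\":\"Student\",\"name\":\"" + name + "\"}"; }
    }

    static final class Person {
        final String name;
        Person(String name) { this.name = name; }
        @Override public String toString() { return "{\"type\":\"Person\",\"name\":\"" + name + "\"}"; }
    }

    // Stand-in for a template: V only limits what callers may pass at compile time.
    static final class SketchTemplate<V> {
        private final Function<Object, byte[]> serializer;
        private final List<String> sent = new ArrayList<>();

        SketchTemplate(Function<Object, byte[]> serializer) { this.serializer = serializer; }

        // Records "topic <- payload" instead of talking to a broker.
        void send(String topic, V value) {
            byte[] payload = serializer.apply(value);
            sent.add(topic + " <- " + new String(payload, StandardCharsets.UTF_8));
        }

        List<String> sent() { return sent; }
    }

    // Toy serializer: it sees the runtime object, never the generic parameter.
    static byte[] serialize(Object value) {
        return value.toString().getBytes(StandardCharsets.UTF_8);
    }

    // Approach 1: one Object-typed template carries both DTO types.
    static List<String> sendWithSharedTemplate() {
        SketchTemplate<Object> template = new SketchTemplate<>(SingleTemplateSketch::serialize);
        template.send("students", new Student("Ann"));
        template.send("people", new Person("Bob"));
        return template.sent();
    }

    // Approach 2: one template per DTO type, all using the same serializer.
    static List<String> sendWithTypedTemplates() {
        SketchTemplate<Student> students = new SketchTemplate<>(SingleTemplateSketch::serialize);
        SketchTemplate<Person> people = new SketchTemplate<>(SingleTemplateSketch::serialize);
        students.send("students", new Student("Ann"));
        people.send("people", new Person("Bob"));
        List<String> all = new ArrayList<>(students.sent());
        all.addAll(people.sent());
        return all;
    }

    public static void main(String[] args) {
        System.out.println(sendWithSharedTemplate());
        System.out.println(sendWithTypedTemplates());
    }
}
```

In this model the typed templates buy compile-time checking (students.send("students", new Person("Bob")) would not compile) at the cost of extra instances, which is the trade-off the answer points at when it suggests preferring a single producer.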
