我們的基礎設施必須管理 2 個 Spring Boot 應用程式之間的資訊交換。部分資訊基于 Oracle DB。我們使用 Kafka 來通知接收者必須管理哪些資訊。
(注意:我必須概括代碼)
我們有 Kafka 生產者,配置如下:
@Configuration
@Lazy(false)
public class KafkaConfig {
//...
@Bean(name="trManagerJpaKafka")
public ChainedKafkaTransactionManager<Object, Object> chainedTm(KafkaTransactionManager<String, String> ktm,
JpaTransactionManager jpaTransactionManager) {
return new ChainedKafkaTransactionManager<>(jpaTransactionManager,ktm);
}
@Bean
public ConcurrentKafkaListenerContainerFactory<?, ?> kafkaListenerContainerFactory(KafkaOperations<?, ?> template,
ConcurrentKafkaListenerContainerFactoryConfigurer configurer,
ConsumerFactory<Object, Object> kafkaConsumerFactory, ChainedKafkaTransactionManager<Object, Object> chainedTm) {
ConcurrentKafkaListenerContainerFactory<Object, Object> factory = new ConcurrentKafkaListenerContainerFactory<>();
configurer.configure(factory, kafkaConsumerFactory);
//...
factory.setErrorHandler(new SeekToCurrentErrorHandler(recoverer,new FixedBackOff(0L, 2)));
factory.getContainerProperties().setTransactionManager(chainedTm);
return factory;
}
}
然后我們有一些服務產生 Kafka 訊息,像這樣
public class SomeService implements ISomeService {
@Override
@Transactional(transactionManager = "trManagerJpaKafka" , rollbackFor = { Exception.class })
public boolean createMessage(Long idIstituto, CreateRDM createRDM) {
...
Entity entity = new Entity();
entity.setSomething(something);
entity.setSomethingElse(somethingElse);
entity = entityRepository.save(entity);
JSONObject message = new JSONObject();
try {
message.put("id", entity.getId());
kafkaProducerService.sendMessage(message.toString());
...
}
...
}
public class SomeGenericKafkaService implements ISomeGenericKafkaService {
...
public void sendMessage(String data) {
Map<String, Object> headers = new HashMap<String, Object>();
headers.put(KafkaHeaders.TOPIC, someTopic);
headers.put(KafkaHeaders.MESSAGE_KEY, UUID.randomUUID().toString());
KafkaProducerCallback callback = new KafkaProducerCallback();
kafkaTemplate.send(new GenericMessage<String>(data, headers)).addCallback(callback);
if (callback.isError()) {
throw new KafkaException(callback.getThrowable());
}
}
...
}
在消費者 Spring Boot 應用程式中,我們有時會遇到這個問題:它消費來自 Kafka 的訊息,但有時 DB 上的記錄尚未存在,因此當我們嘗試訪問 DB 記錄時會出現例外......生產者中的事務不是原子的Jpa和卡夫卡?在 Jpa 插入之前提交 Kafka 訊息?
同時,作為緊急解決方案,為了管理從 Oracle 檢索資料時的例外,我們在接收器中進行了一些重試,例如他的:
...
Optional<Entity> optionalEntity = entityRepository.findById(id);
if (!optionalEntity.isPresent()) {
for (int i = 0; i < kafkaDbMaxRetry; i ) {
Thread.sleep(kafkaMessageDelay);
optionalEntity = entityRepository.findById(id);
if (optionalEntity.isPresent()) break;
}
}
entity = optionalEntity.get();
...
如何調整生產者并避免消費者出現這種不良代碼?
uj5u.com熱心網友回復:
Kafka 僅按順序處理來自一個磁區的事件,但是如果事件最終位于兩個不同的磁區中,則它們可能會被亂序處理。因此,您可以實作您的發送訊息邏輯以包含例如您的資料庫記錄的主鍵作為磁區鍵。
Kafka 使用 key 來指定目標磁區。默認策略是根據鍵的散列選擇磁區,或者如果鍵為空則使用回圈演算法。
您可以實作自定義org.apache.kafka.clients.producer.Partitioner將訊息映射到您需要的磁區。您類的名稱必須設定為生產者的 partitioner.class 屬性。
uj5u.com熱心網友回復:
鏈式事務管理器只提供“Best Effort 1 Phase Commit”——Kafka不能參與JTA/XA事務;不可能用這些不同的技術提供原子更新。
有關更多資訊,請參閱https://www.infoworld.com/article/2077963/distributed-transactions-in-spring--with-and-without-xa.html。
您必須使您的偵聽器具有冪等性 - 一種常見模式是將主題/磁區/偏移量與資料一起存盤,以便您可以檢查它是否已被處理。
轉載請註明出處,本文鏈接:https://www.uj5u.com/caozuo/342768.html
