假設我有一個@KafkaListener類,@KafkaHandler里面有一個方法來處理任何收到的訊息并執行一些資料庫操作。
我希望對此類中提交(或回滾)資料庫更改(即手動管理資料庫事務)的方式和時間進行細粒度控制。無論 DB 事務結果如何,都可以提交消耗的訊息偏移量。
這是我所擁有的簡化版本:
@Service
@RequiredArgsConstructor
@KafkaListener(
topics = "${kafka.topic.foo}",
groupId = "${spring.kafka.consumer.group-id-foo}",
containerFactory = "kafkaListenerContainerFactoryFoo")
public class FooMessageConsumer {
// ...
private final EntityManager entityManager;
@KafkaHandler
public void handleMessage(FooMessage msg) {
// ...
handleDBOperations(msg);
// ...
}
void handleDBOperations(msg) {
try {
entityManager.getTransaction().begin();
// ...
entityManager.getTransaction().commit();
} catch (Exception e) {
log.error(e.getLocalizedMessage(), e);
entityManager.getTransaction().rollback();
}
}
}
當收到并entityManager.getTransaction().begin();呼叫訊息時,這會導致例外:
java.lang.IllegalStateException: Not allowed to create transaction on shared EntityManager - use Spring transactions or EJB CMT instead
為什么我不能在這里創建交易?
如果我洗掉EntityManager并添加@Transactional注釋到具有資料庫操作的方法(盡管這不是我想要的),那么它會導致另一個例外:
TransactionRequiredException Executing an update/delete query
它似乎完全忽略了注釋。這是否與擁有自己的事務管理的 Kafka 消費者有關?
簡而言之,我在這里做錯了什么,如何在@KafkaHandler方法中管理資料庫事務?
任何幫助表示贊賞。提前致謝。
uj5u.com熱心網友回復:
嘗試使用 Springs TransactionTemplate:https : //docs.spring.io/spring-framework/docs/3.0.0.M4/reference/html/ch10s06.html
如果您的用例很簡單,Springs 宣告式事務管理也應該讓您實作您要求的行為:https : //docs.spring.io/spring-framework/docs/3.0.0.M3/reference/html /ch11s05.html
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/395404.html
