我遇到了 Spring Data 未保存物體的問題。應用程式邏輯如下:
- 另一個應用程式正在偵聽負載非常重的 Kafka 主題(每秒數十條訊息),并將訊息插入到具有“新”狀態的資料庫中的表中。
- @Scheduled 方法加載具有“NEW”狀態的物體串列,這些物體被一個接一個地轉移到 FixedThreadPool(20 個執行緒),它們的狀態設定為“PROCESSING”,并且在同一個表上呼叫 saveAll 方法。
@Scheduled(fixedDelay = 10_000)
public void scheduled() {
Pageable pageable = PageRequest.of(0, pageSize);
List<EntityClass> entities = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
while (!entities.isEmpty()) {
entities.forEach(entity -> {
entity.setStatus(PROCESSING_STATUS);
process(entity);
});
entityService.saveAll(entities);
loggers = entityService.getAllByStatusOrderByIdDesc(NEW_STATUS, pageable);
}
private void process(Entity entity) {
threadPool.execute(
() -> processor.execute(
TaskBuilder.entity(entity).build()
)
);
}
- 在 FixedThreadPool 的執行緒中,一些業務邏輯由 TaskProcessor 通過幾個類發生,導致每個物體的狀態更改為“OK”或“ERROR”。
- 業務邏輯的最后一步是將具有結果狀態的物體更新到它被匯集出來的同一個表,這就是我遇到問題的地方:部分物體不會持久存在并永遠保持“處理”狀態。從最后一步的最后一部分開始:
...
log.info("[SaveStep] [execute] status before {}", entity.getStatus());
Entity entity = entityService.save(entity);
log.info("[SaveStep] [execute] status after {}", entity.getStatus());
兩種日志方法都顯示正確的狀態(“OK”或“ERROR”),但資料庫行一直包含“PROCESSING”狀態。我試圖用多種變體重做一個 entityService.save()(entityRepository 是一個常規的 JPARepository):
- 常規 repo.save() 不起作用:
public Entity save(Entity entity) {
return repository.save(logger);
}
- 獲取、重繪 和保存不起作用:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
saved.setStatus(entity.getStatus());
return repository.save(saved);
}
- 我認為 Hikari 池大小可能是一個問題,但是將其增加到 maximum-pool-size=20 甚至 30 都沒有問題。
- 如果我用記錄器包圍選項 2,我在兩個記錄器行中都有正確的狀態,但在資料庫中沒有:
@Transactional
public Entity save(Entity entity) {
Entity saved = repository.getFirstById(entity.getId());
log.info("[Service] [save] status before {}", saved.getStatus());
saved.setStatus(entity.getStatus());
log.info("[Service] [save] status after {}", saved.getStatus());
return repository.save(saved);
}
- 我試過原生查詢,但沒有成功:
@Modifying
@Transactional
@Query(value = "UPDATE entity_table_name set status = :status where id = :id", nativeQuery = true)
void updateStatus(@Param("id") Long id, @Param("status") String status);
- 我試過用 repo.saveAndFlush() 替換 repo.save() 但沒有效果。
目前我沒有想法嘗試。主要的侮辱是大多數物體最終被資料庫正確消化,只有大約 20% 被 save() 忽略而沒有任何拋出例外。
uj5u.com熱心網友回復:
您在將訊息提交到池后將訊息設定為“正在處理”,這意味著池可能會先到達它,然后可能會發生“處理”更新。不確定這是否是您的問題,但您正在做的事情有可能。在提交到池之前嘗試設定并保存/提交“處理”更新。
uj5u.com熱心網友回復:
感謝泰勒,因為他絕對正確地指出了我的問題。物體串列已被并行處理并以正確的狀態保存得足夠快,甚至在調度程式點擊他的 saveAll('PROCESSING') 行之前,這會導致狀態重寫。
轉載請註明出處,本文鏈接:https://www.uj5u.com/shujuku/467442.html
