我有一個方法
@Service
public class MyService {
public Mono<Integer> processData() {
... // very long reactive operation
}
}
在正常的程式流程中,我通過 Kafka 事件異步呼叫此方法。
出于測驗目的,我需要將該方法公開為 Web 服務,但該方法應公開為異步:僅回傳 HTTP 代碼 200 OK(“請求已接受”)并在后臺繼續處理資料。
Mono#subscribe()僅從控制器方法呼叫和回傳可以嗎(= 它沒有任何不需要的副作用)?
@RestController
@RequiredArgsConstructor
public class MyController {
private final MyService service;
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
}
或者最好這樣做(在這里我對 IntelliJ 的警告感到困惑,也許與https://youtrack.jetbrains.com/issue/IDEA-276018相同?):
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe(); // IntelliJ complains "Inappropriate 'subscribe' call" but I think it's a false alarm in my case(?)
return Mono.empty();
}
或者其他一些解決方案?
uj5u.com熱心網友回復:
僅呼叫 Mono#subscribe() 并從控制器方法回傳是否可以(= 沒有任何不需要的副作用)?
有副作用,但你可能會接受它們:
- 這真的是一發不可收拾——這意味著雖然你永遠不會收到成功的通知(大多數人意識到這一點),但你也永遠不會收到失敗的通知(很少有人意識到這一點。)
- 如果該程序因某種原因掛起,該發布者將永遠無法完成,您將無從得知。由于您訂閱了有界彈性執行緒池,因此它也會無限期地占用這些有限執行緒之一。
第一點你可能沒問題,或者你可能想在反應鏈中進一步記錄一些錯誤記錄作為副作用,所以如果出現問題,你至少有一個內部通知。
對于第二點 - 我建議在你的方法呼叫上設定一個(慷慨的)超時,這樣如果它沒有在設定的時間內完成,它至少會被取消,并且不再浪費資源。如果您正在運行異步任務,那么這不是一個大問題,因為它只會消耗一點記憶體。如果您在彈性調度程式上包裝阻塞呼叫,那么情況會更糟,因為您無限期地在該執行緒池中占用了一個執行緒。
我還會質疑為什么你需要在這里使用有界彈性調度程式 - 它用于包裝阻塞呼叫,這似乎不是這個用例的基礎。(要明確的是,如果您的服務被阻塞,那么您絕對應該將其包裝在彈性調度程式上 - 但如果不是,則沒有理由這樣做。)
最后,這個例子:
public Mono<Void> processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
return Mono.empty();
}
...是不該做什么的一個很好的例子,因為您正在創建一種“冒名頂替反應方法” - 有人可能非常合理地訂閱該回傳的發布者,認為它會在底層發布者完成時完成,這顯然不是'這里發生了什么。void在這種情況下,使用回傳型別并因此不回傳任何內容是正確的做法。
uj5u.com熱心網友回復:
您使用以下代碼的選項實際上是可以的:
@GetMapping
public void processData() {
service.processData()
.subscribeOn(Schedulers.boundedElastic())
.subscribe();
}
這實際上是您在一個@Scheduled方法中所做的事情,該方法僅不回傳任何內容,并且您顯式訂閱MonoorFlux以便發出元素。
轉載請註明出處,本文鏈接:https://www.uj5u.com/net/365411.html
標籤:爪哇 异步 弹簧-webflux 项目反应堆
上一篇:js承諾等到解決繼續“主要”
