我有以下情況,我正在嘗試查看是否有解決方案:
- 必須并行進行兩個 Spring 服務呼叫(一個是現有的服務呼叫/邏輯,第二個是新添加的)。
- 然后結果應該由 RESTful API 合并和回傳。
然而,當涉及到服務發出的錯誤時,應該遵循以下規則:
僅當兩個服務呼叫都失敗時,API 才會失敗——這應該從主執行緒而不是
@Async池中拋出,因為它們是獨立的執行緒并且無法訪問彼此的例外(至少這是我的推理)。如果其中只有一個失敗,則通過另一個服務(異步)記錄錯誤,API 僅回傳成功服務的結果——這可以從各自的
@Async執行緒中完成。@Service public class Serv1 interface ServInf { @Async("customPool") public CompletableFuture<List<Obj>> getSomething(int id) { // The service ensures that the list is never null, but it can be empty return CompletableFuture.completedFuture(/* calling an external RESTful API */); } } @Service public class Serv2 interface ServInf { @Async("customPool") public CompletableFuture<List<Obj>> getSomething(int id) { // The service ensures that the list is never null, but it can be empty return CompletableFuture.completedFuture(/* calling another external RESTful API */); } } @RestController public class MyController { /** Typical service @Autowired's */ @GetMapping(/* ... */) public WrapperObj getById(String id) { CompletableFuture<List<String>> service1Result = service1.getSomething(id) .thenApply(result -> { if (result == null) { return null; } return result.stream().map(Obj::getName).collect(Collectors.toList()); }) .handle((result, exception) -> { if (exception != null) { // Call another asynchronous logging service which should be easy return null; } else { return result; } }); CompletableFuture<List<String>> service2Result = service2.getSomething(id) .thenApply(result -> { if (result == null) { return null; } return result.stream().map(Obj::getName).collect(Collectors.toList()); }) .handle((result, exception) -> { if (exception != null) { // Call another asynchronous logging service which should be easy return null; } else { return result; } }); // Blocking till we get the results from both services List<String> result1 = service1Result.get(); List<String> result2 = service2Result.get(); /** Where to get the exceptions thrown by the services if both fail if (result1 == null && result2 == null) { /** Signal that the API needs to fail as a whole */ throw new CustomException( /** where to get the messages? */); } /** merge and return the result */ } }
我的問題是,由于這些服務回傳某個物件的串列,即使我使用CompletableFuture.handle()并檢查是否存在例外,我也無法回傳例外本身以捕獲并讓 Spring Advice 類處理它(鏈接以回傳串列)。
我想到的一件事是AtomicReference用來捕獲例外并將它們設定在其中handle()并在期貨完成/完成后使用它們,例如
AtomicReference<Throwable> ce1 = new AtomicReference<>();
AtomicReference<Throwable> ce2 = new AtomicReference<>();
.handle((result, exception) -> {
if (exception != null) {
ce1.set(exception);
return null; // This signals that there was a failure
} else {
return result;
}
});
List<String> result1 = service1Result.get();
List<String> result2 = service2Result.get();
/** Where to get the exceptions thrown by the services if both fail
if (result1 == null && result2 == null) {
/** Signal that the API needs to fail as a whole */
throw new CustomException(/** do logic to capture ce1.get().getMessage() ce2.get().getMessage() */);
}
首先,這聽起來像是多執行緒異步呼叫中的可行解決方案嗎?
Second, this looks messy, so I was wondering if there is a more elegant way of capturing these exceptions outside of Spring async pool, and deal with it in the main thread, e.g. combine the exception information and throw it to Spring Advice exception handler.
uj5u.com熱心網友回復:
假設兩個期貨
CompletableFuture<List<String>> service1Result = …
CompletableFuture<List<String>> service2Result = …
將兩種期貨結合起來的直接方法是
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
但是如果任何一個未來失敗,這個未來就會失敗。
只有在兩個futures都失敗時才失敗并從兩個throwable構造一個新的例外,我們可以定義兩個實用方法:
private static Throwable getThrowable(CompletableFuture<?> f) {
return f.<Throwable>thenApply(value -> null)
.exceptionally(throwable -> throwable).join();
}
private static <T> T throwCustom(Throwable t1, Throwable t2) {
throw new CustomException(t1.getMessage() " and " t2.getMessage());
}
該方法getThrowable旨在用于已知將例外完成的未來。我們可以呼叫join并捕獲例外,但如上所示,我們還可以將轉換未來轉換為包含可拋出物件作為其值的非例外未來。
然后,我們可以將以上所有內容結合起來
CompletableFuture<List<String>> failOnlyWhenBothFailed = both
.thenApply(list -> both)
.exceptionally(t ->
!service1Result.isCompletedExceptionally()? service1Result:
!service2Result.isCompletedExceptionally()? service2Result:
throwCustom(getThrowable(service1Result), getThrowable(service2Result)))
.thenCompose(Function.identity());
在傳遞給 的函式中exceptionally,已知傳入的期貨已經完成,因此我們可以使用實用方法來提取可拋出物件并拋出新例外。
這樣做的好處是生成的構造是非阻塞的。
但是在您的情況下,您希望等待完成而不是回傳未來,因此我們可以簡化操作:
CompletableFuture<List<String>> both = service1Result.thenCombine(service2Result,
(list1, list2) -> Stream.concat(list1.stream(), list2.stream())
.collect(Collectors.toList()));
both.exceptionally(t -> null).join();
if(service1Result.isCompletedExceptionally()&&service2Result.isCompletedExceptionally()){
Throwable t1 = getThrowable(service1Result), t2 = getThrowable(service2Result);
throw new CustomException(t1.getMessage() " and " t2.getMessage());
}
List<String> result = (
service1Result.isCompletedExceptionally()? service2Result:
service2Result.isCompletedExceptionally()? service1Result: both
).join();
通過使用both.exceptionally(t -> null).join();,我們等待兩個作業的完成,而不會在失敗時拋出例外。在此陳述句之后,我們可以安全地使用isCompletedExceptionally()來檢查我們知道要完成的期貨。
因此,如果兩者都失敗,我們提取可拋出物件并拋出我們的自定義例外,否則,我們檢查哪些任務成功并提取其中一個或兩者的結果。
uj5u.com熱心網友回復:
CompletableFutures 處理起來相當麻煩,但這里將是一種更具功能性和反應性的 IMO 方法。
我們需要sequence來自https://stackoverflow.com/a/30026710/1225328的方法:
static<T> CompletableFuture<List<T>> sequence(List<CompletableFuture<T>> com) {
return CompletableFuture.allOf(com.toArray(new CompletableFuture<?>[0]))
.thenApply(v -> com.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList())
);
}
然后,我用Optional它來表示操作的狀態,但是Trymonad會更適合(所以如果你的代碼庫中有這樣的實用程式,請使用一個 - Java 還沒有自帶一個):
CompletableFuture<Optional<List<Object>>> future1 = service1.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
CompletableFuture<Optional<List<Object>>> future2 = service2.getSomething().thenApply(Optional::of).exceptionally(e -> {
// log e
return Optional.empty();
});
現在等待兩個期貨并在可用時處理結果:
CompletableFuture<List<Object>> mergedResults = sequence(Arrays.asList(future1, future2)).thenApply(results -> {
Optional<List<Object>> result1 = results.get(0);
Optional<List<Object>> result2 = results.get(1);
if (result1.isEmpty() && result2.isEmpty()) {
throw new CustomException(...);
}
// https://stackoverflow.com/a/18687790/1225328:
return Stream.of(
result1.map(Stream::of).orElseGet(Stream::empty),
result2.map(Stream::of).orElseGet(Stream::empty)
).collect(Collectors.toList());
});
然后你最好mergedResults直接回傳并讓框架為你處理它,這樣你就不會阻塞任何執行緒,或者你可以.get()在它上面(這將阻塞執行緒),這將拋出一個ExecutionExceptionif your CustomException(或任何其他例外)被拋出(可在 中訪問e.getCause())。
如果您已經在使用 Project Reactor(或等效項),這看起來會更簡單,但想法大致相同。
uj5u.com熱心網友回復:
僅僅因為我提出了它作為一種可能性,我想這樣的事情應該在使用 Project Reactor 的世界中作業:
首先,我們將服務修改為 return Monos,這很容易使用Mono.fromFuture(或者,如果并且一旦準備好,您可以將一項服務轉換為 Reactor 樣式):
@Service
public class Serv1 implements ServInf {
public Mono<List<Obj>> getSomething(int id) {
// The service ensures that the list is never null, but it can be empty
return Mono.fromFuture(CompletableFuture.completedFuture(/* calling an external RESTful API */));
//This Mono will either emit the result or complete with an error in case of Exception
}
}
//similar for Serv2
(反應式)端點可能如下所示(請參閱下面的編號注釋):
public Mono<WrapperObj> getById(String id) {
WrapperObj wrapper = new WrapperObj(); //1
Mono<Optional<List<Obj>>> s1Mono = serv1.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS1ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
Mono<Optional<List<Obj>>> s2Mono = serv2.getSomething(id)
.subscribeOn(Schedulers.boundedElastic()) //2
.map(Optional::ofNullable) //3
.doOnError(wrapper::setS2ErrorResult) //4
.onErrorResume(t -> Mono.just(Optional.empty())); //5
return s1Mono
.zipWith(s2Mono) //6
.map(result ->
//transforms non-error results and merges them into the wrapper object
transformResult(result.getT1().orElse(null), result.getT2().orElse(null), wrapper) //7
)
.switchIfEmpty(Mono.just(wrapper)) //8
;
}
注釋:
結果用于“累積”結果和例外
在執行緒池上呼叫服務
boundedElastic,推薦用于較長的 IO 任務。將結果包裝在
Optional. 我使用一個空的 Optional 作為錯誤完成的便利結果,因為nulls 不能很好地通過 Reactor 傳播。如果服務呼叫拋出例外,我們可以在
WrapperObj. 這類似于您對 的使用AtomicReference,但沒有創建額外的物件。However, such an exception would cause
zipWith(6) to fail, so if this happens we substitute theOptional.empty()result.zipWithcreates a tuple of both resultsWe process these results, replacing
What's left is to transform the two (non-exceptional) results:
private WrapperObj transformResult(List<Obj> s1Result, List<Obj> s2Result, WrapperObj wrapper) { //perform your result transformation and //flesh out 'wrapper' with the results //if there was an exception, the 'wrapper' contains the corresponding exception values return wrapper; }
轉載請註明出處,本文鏈接:https://www.uj5u.com/qukuanlian/437713.html
