java8已經在日常開發編碼中非常普遍了,掌握運用好它可以在開發中運用幾行精簡代碼就可以完成所需功能,
今天將介紹CompletableFuture的在生產環境如何使用實踐,CompletableFuture類作為Java 8 Concurrency API改進而引入,熟悉的同學應該了解在Java 9 也有對CompletableFuture有一些改進,橘子之后再進入講解,
閱讀這篇文章需要知道的前置知識點有,函式式編程,執行緒池原理等,還不熟悉的同學可以看看之前的文章,話不多說,開始吧,
為了更好的表達,我們結合例子講解,假設今天小橘收到TL任務,要求完成實時拉取資料的功能,完成后告知拉取完成,假設拉取資料需要從A,B,C三個服務中獲取,拉取完成推送需要呼叫D服務,
需求變更1:拉取資料需要從E服務獲取,但是會依賴從A服務獲取的結果,
需求變更2:從A服務一次能拉去一萬+資料,但是E服務的性能支撐不了大呼叫,在Provider端有限流兜底,
需求變更3:拉取資料程序中需要保證資料完整性,不能出現統計錯誤,
為什么使用CompletableFuture
橘友們說了,這個可以用jdk5.0提供的Future
OK,簡單實作這個功能沒有問題,但是有什么缺陷,需要怎么可以改進嘛?
我們通過原始碼注釋可以看到Future類回傳的結果需要阻塞等待get方法回傳結果,它提供了isDone()方法檢測異步計算是否已經結束,get()方法等待異步操作結束,以及獲取計算的結果,等到所有Future任務完成,通知執行緒獲取結果并合并,
從性能上,需要等待 Future 集合中的所有任務都完成(此需求沒問題,接著往下看), 從健壯性上,Futrue介面沒有方法去進行計算組合或者處理可能出現的錯誤,從功能擴展上,Future介面無法進行多個異步計算之間相互獨立,同時第二個又依賴于第一個的結果,而今天的主角CompletableFuture都可以滿足上述功能,具有大約50種不同的構成,結合,執行異步計算步驟和處理錯誤,(全部學習完所有方法是不現實的,掌握靈魂和核心方法即可依法炮制)
CompletableFuture API 使用
API太多,簡單列舉,讀者自行學習即可,本文重點不在介紹api
/**
任務 A 執行完執行 B,執行 B 不需要依賴 A 的結果同時 B 不回傳結果,
*/
CompletableFuture.supplyAsync(() -> "resultA").thenRun(() -> {});
/**
任務 A 執行完執行 B,B 執行依賴 A 結果同時 B 不回傳結果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenAccept(resultA -> {});
/**
任務 A 執行完執行 B,B 執行依賴 A 結果同時 B 回傳結果
*/
CompletableFuture.supplyAsync(() -> "resultA").thenApply(resultA -> resultA + " resultB");
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "orange")
.thenCompose(s -> CompletableFuture.supplyAsync(() -> s + " csong"));
//true
assertEquals("orangecsong", completableFuture.get());
你的疑問:該thenCompose方法不和thenApply一樣實作結果合并計算嘛?
剛學習時候確實有點迷惑,其實他們的內部形式是不一樣的,它們與Java 8中可用的Stream和Optional類的map和flatMap方法是有著類似的設計思路在里面的,都是接收一個CompletableFuture并將其應用于計算結果,但thenCompose(flatMap)方法接收一個函式,該函式回傳相同型別的另一個CompletableFuture物件,
CompletableFuture<String> completableFuture
= CompletableFuture.supplyAsync(() -> "orange")
.thenCombine(CompletableFuture.supplyAsync(
() -> " chizongzi"), (s1, s2) -> s1 + s2));
assertEquals("orange chizongzi", completableFuture.get());
thenCombine方法旨在當你想要使用多個計算結果時,而后續的處理同時需要依賴回傳值,第一個計算結果回傳 "orange",第二個計算結果回傳 "chizongzi",對結果進行拼接,那么結果就是"orange chizongzi" 啦,你可能會問如果結果無需處理呢?thenAcceptBoth將可以實作你的功能,那么它和thenApply的區別又是啥呢?
thenCompose()方法是使用前一個Future作為引數,它會直接使結果變新的Future,而不是我們在thenApply()中到的嵌套Future,而是用來連接兩個CompletableFuture,是生成一個新的CompletableFuture,因此,如果想要繼續嵌套鏈接CompletableFuture 方法,那么最好使用thenCompose(),
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs){...}
當我們需要并行執行多個任務時,我們通常希望等待所有它們執行,然后處理它們的組合結果,CompletableFuture提供了allOf靜態方法允許等待所有的完成任務,但是它回傳型別是CompletableFuture ,局限性在于它不會回傳所有任務的綜合結果,相反,你必須手動從Futures獲取結果,那么怎么解決呢,CompletableFuture提供了join()可以解決,這里小橘用Stream實作同樣可以的,
String multiFutures= Stream.of(future1, future2, future3)
.map(CompletableFuture::join)
.collect(Collectors.joining(" "));
assertEquals("Today is sun", multiFutures);
那么 CompletableFuture 針對例外是如何處理的呢?
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn);
public <U> CompletionStage<U> handle(BiFunction<? super T, Throwable, ? extends U> fn);
CompletableFuture.supplyAsync(() -> "resultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
//如果resultA,resultB,resultC在獲取中有例外
CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
throw new RuntimeException();
}).exceptionally(ex -> "errorResultA")
.thenApply(resultA -> resultA + " resultB")
.thenApply(resultB -> resultB + " resultC")
上面的代碼中,任務 A 拋出例外,然后通過exceptionally() 方法處理了例外,并回傳新的結果,這個新的結果將傳遞給任務 B,如果inovke future.join方法結果將會輸出 "errorResultA resultB result C"
上述方法基本就是底層函式式api的使用,聰明的橘友們實踐起來吧!
CompletableFuture 例子
Talk is cheap , show me code,自從上篇 你還在擔心rpc介面超時嗎 文章末尾講述大批量呼叫,其中是順序invoke呼叫,其實我們分析,異步呼叫利用CompletableFuture需要怎么實作呢?
/**
* @Description:
* @author: orangeCs
* @create: 2020-06-25
*/
public class AsyncInvokeUtil {
private AsyncInvokeUtil() {}
/**
* @param paramList 源資料 (需處理資料載體)
* @param buildParam 中轉函式 (獲取的結果做一層trans,來滿足呼叫服務條件)
* @param transParam 中轉函式 (獲取的結果做一層trans,來滿足呼叫服務條件)
* @param processFunction 中轉處理函式
* @param size 分批大小
* @param executorService 暴露外部自定義實作執行緒池(demo沒判空,可以做成非必傳)
* @param <R>
* @param <T>
* @param <P>
* @param <k>
* @return
* @throws ExecutionException
* @throws InterruptedException
*/
public static <R, T, P, k> List<R> partitionAsyncInvokeWithRes(List<T> paramList,
Function<List<T>, P> buildParam,
Function<P, List<k>> transParam,
Function<List<k>,List<R>> processFunction,
Integer size,
ExecutorService executorService) throws ExecutionException, InterruptedException {
List<CompletableFuture<List<R>>> completableFutures = Lists.partition(paramList, size).stream()
.map(buildParam)
.map(transParam)
.map(eachList -> CompletableFuture.supplyAsync(() ->
processFunction.apply(eachList), executorService))
.collect(Collectors.toList());
//get
CompletableFuture<Void> finishCompletableFuture = CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0]));
finishCompletableFuture.get();
return completableFutures.stream().map(CompletableFuture::join)
.filter(Objects::nonNull).reduce(new ArrayList<>(), (resList1, resList2) -> {
resList1.addAll(resList2);
return resList1;
});
}
}
僅僅這一篇文章是不夠的,任何知識都是長期積累,反復思考才能變成自己的東西,在浮躁的社會,我們年輕人切勿浮躁,今天介紹到這里了,喜歡博主的朋友點個關注哦,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/151072.html
標籤:Java
上一篇:獲取鍵盤輸入常用的兩種方法
