這是場景:它可能會隨機生成一些資料,如果是,則需要遞回檢索資料,最后我需要獲取所有生成的資料。
interface DataProvider {
List<String> randomData(String url);
}
public static void main(String[] args) {
List<String> strings = fetch(Executors.newFixedThreadPool(4), new DataProvider() {
final Random random = new Random();
@Override
public List<String> randomData(String url) {
if (random.nextBoolean()) {
System.out.println("provide some data");
return List.of(UUID.randomUUID().toString());
}
return null;
}
}, List.of("a", "b", "c"));
System.out.println("results are: ");
System.out.println(strings);
}
private static List<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return new ArrayList<>();
List<CompletableFuture<List<String>>> collect =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.collect(Collectors.toList());
List<CompletableFuture<List<String>>> list = new ArrayList<>();
collect.forEach(item -> {
CompletableFuture<List<String>> listCompletableFuture = item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es);
list.add(listCompletableFuture);
});
return list.stream().flatMap(item -> item.join().stream()).collect(Collectors.toList());
}
有時程式會凍結,有時會列印一個空集合。(provide some data已列印)。
我哪里做錯了?我一點也不熟悉CompletableFuture,所以也許整個遞回呼叫都是錯誤的。(或者代碼可以更簡單,因為CompletableFuture有很多方法)。
uj5u.com熱心網友回復:
您的代碼確實存在一些問題:
邏輯問題:結果只能是一個空串列!
的結果fetch()將是:
- 如果
items為空,則為空串列 - 遞回應用于串列中的
fetch(randomData(item))每個專案的結果items由于沒有回傳非空串列的遞回葉,并且父呼叫也不向串列中添加任何內容,因此它只能回傳一個空結果。
也許您也想randomData()在結果中包含 ?
技術問題:CompletableFuture.join()在固定執行緒池中使用
您正在使用Executors.newFixedThreadPool(4). 顧名思義,執行緒的數量是固定的,所以當所有執行緒都用盡時,新的任務就會排隊。
問題是您正在join()等待其中一些新任務。所以一方面你阻塞了一個執行緒,另一方面你在等待你剛剛放入佇列的東西。
由于它是遞回的,如果它達到 4 的深度,就會死鎖。
防止這種情況的最簡單方法是使用ForkJoinPool. 當其中一個執行緒被阻塞時,這種池將產生新執行緒。
作為旁注, aForkJoinPool使用守護執行緒,因此它們不會阻止 JVM 終止。如果您想使用固定執行緒池,您必須呼叫shutdown()它以確保它允許終止 - 或將其配置為使用守護執行緒。
其他可能的改進
您的代碼正在構建大量中間集合并阻止大量join()呼叫。CompletableFuture由于它的實作,旨在鏈接相關的計算階段CompletionStage,而不僅僅是將其用作Future可以完成的。
在不更改當前邏輯的情況下,首先您可以fetch()return a Stream,這會洗掉很多collect()/stream()呼叫:
private static Stream<String> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return Stream.empty();
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenApplyAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return list.stream().flatMap(CompletableFuture::join);
}
為了達到這個目的,這里有什么改變:
- 僅將
forEach()/替換list.add()為collect.stream(…).map(…).collect(toList()) - 行內
collect變數 - 洗掉結果,
.collect(toList()).stream()因為它是多余的 - 將回傳型別更改為
Stream<String>,主要更改是: ** 將型別更改list為List<CompletableFuture<Stream<String>>>(因為內部Stream來自第二次map(… -> fetch())呼叫 **從 return 陳述句中洗掉collect()andstream()
請注意,您需要保留中間檔案list以確保在第一次呼叫CompletableFuture::join.
這樣更好,但呼叫仍然是同步的,需要 aForkJoinPool才能作業。既然里面的邏輯fetch()大多是異步的,那為什么不讓整個方法異步呢?
這主要需要將回傳型別更改為CompletableFuture<Stream<String>>并用于allOf()創建一個將在所有中間期貨完成后完成的期貨:
private static CompletableFuture<Stream<String>> fetch(ExecutorService es, DataProvider dataProvider, List<String> items) {
if (items == null || items.isEmpty())
return CompletableFuture.completedFuture(Stream.empty());
List<CompletableFuture<Stream<String>>> list =
items.stream()
.map(item -> CompletableFuture.supplyAsync(() -> dataProvider.randomData(item), es))
.map(item -> item.thenComposeAsync(strings -> fetch(es, dataProvider, strings), es))
.collect(Collectors.toList());
return CompletableFuture.allOf(list.toArray(new CompletableFuture[0]))
.thenApply(dummy -> list.stream().flatMap(CompletableFuture::join));
}
這現在是 100% 異步的,因為join()只會在已知已完成的期貨上呼叫(感謝allOf()),因此永遠不會阻塞。
它不再需要ForkJoinPool了。事實上,它甚至可以在newFixedThreadPool(1)! 好吧,只要你shutdown()在最后呼叫它......
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/406391.html
標籤:
上一篇:中間沒有空格的文本框
