Stream API
上篇內容我們學習了Stream的大部分終端操作,我們這篇著重了解下Stream中重要的終端操作:collect,
collect 方法
| 序號 | 支持的類 | 方法定義 | 方法說明 |
|---|---|---|---|
| 1 | Stream |
對此流的元素執行 mutable reduction操作, | |
| 2 | Stream |
<R, A> R collect(Collector<? super T, A, R> collector); | 使用 Collector對此流的元素執行 mutable reduction Collector, |
以下代碼見 StreamTerminalOperationTransformTest,
實作3引數轉換介面
序號1的方法,傳遞了3個引數,引數1為創建新結果容器的函式;引數2為累加器函式,將引數1和流內元素執行累加操作;引數3為組合器函式,并行執行時會使用該函式,
同步執行時,該方法相當于執行:
R result = supplier.get();
for (T element : this stream) {
accumulator.accept(result, element);
}
return result;
我們撰寫如下代碼,看下實際效果
// 使用collect方法實作字串連接
log.info("拼接字串為:{}",
Stream.of("I", "love", "you", "too")
.collect(StringBuilder::new, (b1, b2) -> {
log.info("累加執行:{} + {}", b1, b2);
b1.append(b2);
}, (b1, b2) -> {
log.info("組合執行:{} ++ {}", b1, b2);
b1.append(b2);
})
.toString());
以上代碼將輸出如下日志:
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + I
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:I + love
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:Ilove + you
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行:Iloveyou + too
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字串為:Iloveyoutoo
并行執行時,該方法相當于執行:
R result1 = supplier.get();
R result2 = supplier.get();
R result3 = supplier.get();
R result4 = supplier.get();
// 累加執行,此處為并發(多執行緒)執行,每行代表一個執行緒
accumulator.accept(result1, element1);
accumulator.accept(result2, element2);
accumulator.accept(result3, element3);
accumulator.accept(result4, element4);
// ...
// accumulator.accept(resultN, elementN);
// 開始組合,此處為并發(多執行緒)執行,每行代表一個執行緒
combiner.accept(result1, result2);
combiner.accept(result3, result4);
combiner.accept(result1, result3);
// combiner.accept(result1, resultN);
return result1;
將上述的代碼改為.parallel()方式呼叫,將輸出如下日志:
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + you
[ForkJoinPool.commonPool-worker-3] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + I
[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + too
[ForkJoinPool.commonPool-worker-2] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:you ++ too
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 累加執行: + love
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:I ++ love
[ForkJoinPool.commonPool-worker-1] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 組合執行:Ilove ++ youtoo
[main] INFO top.todev.note.web.flux.stream.StreamTerminalOperationTransformTest - 拼接字串為:Iloveyoutoo
注意:上述日志中出現的ForkJoinPool.commonPool-worker-N為并發(多執行緒)執行時的執行緒名,
實作Collector介面
實作Collector需要實作如下4個介面:
// 一個創建并回傳一個新的可變結果容器的函式,
Supplier<A> supplier();
// 將值折疊成可變結果容器的函式,
BiConsumer<A, T> accumulator();
// 一個接受兩個部分結果并將其合并的函式,
BinaryOperator<A> combiner();
// 執行從中間累積型別 A到最終結果型別 R的最終 R ,
Function<A, R> finisher();
// 回傳一個 Collector.Characteristics 型別的Set, 表示該收集容器的特征,
Set<Characteristics> characteristics();
collect方法執行時,他們的呼叫流程如下:
- 創建新的結果容器(supplier())
- 將新的資料元素并入結果容器(accumulator())
- 將兩個結果容器組合成一個(combiner())
- 在容器上執行可選的最終變換(finisher())
簡單來講,生成容器A,通過accumulator針對A及流元素T執行累加,(如果并行存在的話)對多個A執行組合combiner,最終執行finisher后由A轉換為R,對于使用者來說,A為中間變數,無關其實作細節,
我們實作一個計算整數流的平均數的Collector,代碼如下:
// 使用collector實作求ping均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.parallel()
.collect(new Collector<Integer, long[], Double>() {
@Override
public Supplier<long[]> supplier() {
return () -> new long[2];
}
@Override
public BiConsumer<long[], Integer> accumulator() {
return (a, t) -> {
log.info("{}累加{}", a, t);
a[0] += t;
a[1]++;
};
}
@Override
public BinaryOperator<long[]> combiner() {
return (a, b) -> {
log.info("{}組合{}", a, b);
a[0] += b[0];
a[1] += b[1];
return a;
};
}
@Override
public Function<long[], Double> finisher() {
return (a) -> a[1] == 0 ? 0 : new Long(a[0]).doubleValue() / a[1];
}
@Override
public Set<Characteristics> characteristics() {
Set<Characteristics> set = new HashSet<>();
set.add(Characteristics.CONCURRENT);
return set;
}
}
)
);
常用Collector
通過上面的示例,我們實作了一個自定義的Collector,我們發現實作一個自定義的Collector還是比較麻煩的,需要實作5個介面,
Java 開發者們已經想到了這個問題,他們額外提供了一個 of 方法,可以通過lambda的方式創建 collector,類似 collect 中傳遞幾個引數:提供者、累加器、組合器、完成器以及特征配置,此處我們就不細講了,
Java 開發者們更為貼心的為我們創建了一些常用的 Collector ,讓我們可以直接使用,這些常用的 Collector 實作放在 Collectors 類下,我們來了解下,
統計平均值 averagingXxx 的使用
Collectors 提供了 averagingDouble、averagingLong、averagingInt 3種統計平均值的 Collector 實作類,以下代碼以 averagingInt 為例,由于使用方式相似,我們就不舉例了,
// 使用collector實作求均值
log.info("[1, 2, 3, 4, 5, 6]的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.averagingInt(n -> n))
);
統計元素個數 counting 的使用
該方法和 Stream 中的 count 方法一樣,
// 使用collector獲取元素數量
log.info("[1, 2, 3, 4, 5, 6]的個數:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.counting())
);
統計總和 summingXxx 的使用
Collectors 提供了 summingDouble、summingLong、summingInt 3種統計求和值的 Collecto r實作類,同時還提供了 summarizingDouble 、 summarizingLong 、summarizingInt 3種統計物件的 Collector 實作類,以下代碼以 summingInt 為例,由于使用方式相似,我們就不舉例了,
// 使用collector獲取總和
log.info("[1, 2, 3, 4, 5, 6]的總和:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.summingInt(n -> n))
);
統計最小元素 minBy 的使用
// 使用collector獲取最小元素
log.info("[1, 2, 3, 4, 5, 6]的最小值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.minBy(Integer::min))
.get()
);
統計最大元素 maxBy 的使用
// 使用collector獲取最da元素
log.info("[1, 2, 3, 4, 5, 6]的最大值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.maxBy(Integer::max))
.get()
);
統計累加處理 reducing 的使用
reducing 和 Stream 中的 reduce 操作方法類似,我們就不詳述了,
// 使用collector實作求均值
log.info("[1, 2, 3, 4, 5, 6]的求和:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.reducing(0, Integer::sum))
);
轉換映射 mapping 的使用
mapping 支持將 第一個引數的結果再次執行轉換,即向下游傳遞,
log.info("[1, 2, 3, 4, 5, 6]每個增加20后的平均值:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.mapping(n -> n + 20, Collectors.averagingInt(n -> n)))
);
轉換連接 joining 的使用
joining 提供了 3 種多載方法,支持傳遞 分隔符、前綴、后綴等,
// 使用collector連接字串
log.info("連接字串為:{}",
Stream.of("I", "love", "you", "too")
.collect(Collectors.joining(" ", "Java, ", "!"))
);
轉換為集合 toList 的使用
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]轉換為集合:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toList())
);
轉換為Map toMap 的使用
toMap 提供了 3 種多載方法,除了指定 Key 和 Value 的生成器外,區別在于對于 Key 重復時, Value的處理方式;以及初始Map的生成器,
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]轉換為Map:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toMap(Object::toString, n -> n, Integer::sum))
);
轉換為Set toSet 的使用
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的轉換為Set:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.toSet())
);
轉換為分組 groupingBy 的使用
分組函式將流中元素按某種定義分組,也提供了 2 種多載方法,支持遞回向下游分組,
log.info("[1, 2, 3, 4, 5, 6, 5, 3, 6]的分組資料:{}",
Stream.of(1, 2, 3, 4, 5, 6, 5, 3, 6)
.collect(Collectors.groupingBy(n -> n))
);
轉換為磁區 partitioningBy 的使用
磁區函式將流中元素按條件分為2組,也提供了 2 種多載方法,支持遞回向下游分組,
log.info("[1, 2, 3, 4, 5, 6]的奇偶磁區資料:{}",
Stream.of(1, 2, 3, 4, 5, 6)
.collect(Collectors.partitioningBy(n -> n %2 == 0))
);
其他方法
Collectors 中還提供了 groupingByConcurrent 、 toCollection 、 toConcurrentMap 等幾種支持并發的 Collector 實作,用法基本和非并發的相同,我們就不詳述了,
原始碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note
以上是本期筆記的內容,我們下期見,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/165054.html
標籤:Java
下一篇:Flink 如何分流資料
