tream
Stream是在Java SE 8 API添加的用于增強集合的操作介面,可以讓你以一種宣告的方式處理集合資料,將要處理的集合看作一種流的創建者,將集合內部的元素轉換為流并且在管道中傳輸, 并且可以在管道的節點上進行處理, 比如篩選,排序,聚合等,元素流在管道中經過中間操作(intermediate operation)的處理,最后由最終操作(terminal operation)得到前面處理的結果,Stream的繼承關系圖如下,且容我慢慢抽絲剝繭細細道來,
過濾,轉換,聚合,歸約
Stream.of("one", "two", "three", "four") .filter(e -> e.length() > 3) .peek(e -> System.out.println("Filtered value: " + e)) .map(String::toUpperCase) .peek(e -> System.out.println("Mapped value: " + e)) .collect(Collectors.toList());在沒有Stream之前,我們對集合資料的處理到多是外部遍歷,然后做資料的聚合用算,排序,merge等等,這屬于OO思想,在引入Java SE 8引入FP之后,FP的操作可以提高Java程式員的生產力,,基于型別推斷的lambda運算式可以 讓程式員寫出高效率、干凈、簡潔的代碼,可以避免冗余的代碼,根據給定的集合操作通過stream()方法創建初始流,配合map(),flatMap(),filter()對集合資料進行過濾,轉換,api呼叫我這里就不多說了,直接從原始碼入手,看上圖最核心的就是類為AbstractPipeline,ReferencePipeline和Sink介面.AbstractPipeline抽象類是整個Stream中流水線的高度抽象了源頭sourceStage,上游previousStage,下游nextStage,定義evaluate結束方法,而ReferencePipeline則是抽象了過濾,轉換,聚合,歸約等功能,每一個功能的添加實際上可以理解為卷心菜,菜心就是源頭,每一次加入一個功能就相當于重新長出一片葉子包住了菜心,最后一個功能集成完畢之后整顆卷心菜就長大了,而Sink介面呢負責把整個流水線串起來,然后在執行聚合,歸約時候調AbstractPipeline抽象類的evaluate結束方法,根據是否是并行執行,呼叫不同的結束邏輯,如果不是并行方法則執行terminalOp.evaluateSequential否則就執行terminalOp.evaluateParallel,非并行執行模式下則是執行的是AbstractPipeline抽象類的wrapAndCopyInto方法去呼叫copyInto,呼叫前會先執行一下wrapSink,用于剝開這個我們在流水線上產生的卷心菜,從下游向上游去遍歷AbstractPipeline,然后包裝到Sink,然后在copyInto方法內部迭代執行對應的方法,最后完成呼叫,
并行執行實際上是構建一個ForkJoinTask并執行invoke去提交到ForkJoinPool執行緒池,
BaseStream
流的基本介面,該介面制定流可以支持無序,順序,并行的,Stream實作了BaseStream介面,
-
Iterator<T> iterator();外部迭代器
-
Spliterator<T> spliterator();用于創建一個內部迭代器
-
isParallel用于判斷該stream是否是并行的
-
S sequential();標識該stream創建是順序執行的
-
S parallel();標識該stream創建是并行的,需要使用
ForkJoinPool -
S unordered();標識該stream創建是無序的
-
S onClose(Runnable closeHandler);當stream關閉的時候執行一個方法回呼去關閉流,
PipelineHelper
該抽象類主要定義了操作管道的核心方法,并且能收集到流管道內的所有資訊,如通過
TerminalOp#evaluateParallel用于執行并行流操作,通過TerminalOp#evaluateSequential執行順序流的操作,
-
abstract StreamShape getSourceShape();
-
abstract int getStreamAndOpFlags();
-
abstract<P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator);
將此時間的管道內的元素應用到提供的
Spliterator,并將結果發送到提供的接收器sink里
-
abstract<P_IN, S extends Sink<P_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator);
用于輸出回傳值的大小,
-
abstract<P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用于將從
Spliterator獲得的元素推入提供的接收器中Sink,如果已知流管道中有短路階段(包含StreamOpflag#SHORT_CURRENT),則在每個元素之后執行一下Sink#cancellationRequested(),如果回傳請求true,則執行終止,這個方法被實作之后需要遵守Sink的協議即:Sink#begin->Sink#accept->Sink->end
-
abstract <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator);
用于將從
Spliterator獲得的元素推入提供的接收器中Sink,在每個元素之后執行一下Sink#cancellationRequested(),如果回傳請求true,則執行終止,這個方法被實作之后需要遵守Sink的協議即:Sink#begin->Sink#accept->Sink->end
-
abstract<P_IN> Sink<P_IN> wrapSink(Sink<P_OUT> sink);
該方法主要用于包裝sink,從下游向上游去遍歷
AbstractPipeline,然后包裝到一個Sink內,用于然后在copyInto方法內部迭代執行對應的方法,
-
abstract Node.Builder<P_OUT> makeNodeBuilder(long exactSizeIfKnown,IntFunction<P_OUT[]> generator);
用于構造一個節點Builder,轉換為陣列去處理陣列型別和PipelineHelper定義的輸出型別一樣,
-
abstract<P_IN> Node<P_OUT> evaluate(Spliterator<P_IN> spliterator,boolean flatten,IntFunction<P_OUT[]> generator);
@Override @SuppressWarnings("unchecked") final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) { if (isParallel()) { // @@@ Optimize if op of this pipeline stage is a stateful op return evaluateToNode(this, spliterator, flatten, generator); } else { Node.Builder<E_OUT> nb = makeNodeBuilder( exactOutputSizeIfKnown(spliterator), generator); return wrapAndCopyInto(nb, spliterator).build(); } }該方法將源拆分器應用到管道內的所有元素,針對陣列處理,如果管道沒有中間(
filter,map)操作,并且源由一個節點支持(源頭),則該節點將被回傳(內部遍歷然后回傳),這減少了由有狀態操作和回傳陣列的終端操作組成的管道的復制.例如:stream.sorted().toArray();該方法對應到AbstractPipeline內部,代碼如下:
AbstractPipeline
“管道”類的抽象基類,是流介面及其原始專門化的核心實作,用來表示流管道的初始部分,封裝流源和零個或多個中間操作,對于順序流和沒有狀態中間操作的并行流、并行流,管道中資料的處理是在一次“阻塞”所有操作的程序中完成的也就是最后才去處理,對于具有狀態操作的并行流,執行被分成多個段,其中每個狀態操作標記一個段的結束,每個段被單獨評估,結果被用作下一個段的輸入,上述所有情況,都是達到終端操作才開始處理源資料,
AbstractPipeline(Supplier<? extends Spliterator<?>> source,
int sourceFlags, boolean parallel)
創建源Source stage 第一個引數指定一個Supplier介面(工廠模式,只能生成Spliterator<?>的物件,根據傳入的lambda實作,
<? extends Spliterator<?泛型的PECS原則了解一下,)
AbstractPipeline(Spliterator<?> source,
int sourceFlags, boolean parallel)
AbstractPipeline(Spliterator<?> source, int sourceFlags, boolean parallel) { this.previousStage = null; this.sourceSpliterator = source; this.sourceStage = this; this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK; // The following is an optimization of: // StreamOpFlag.combineOpFlags(sourceOrOpFlags, StreamOpFlag.INITIAL_OPS_VALUE); this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE; this.depth = 0; this.parallel = parallel; }創建源Source stage 第一個引數制定這個拆分器,和上面的構造方式一樣,直接分析一下這個方法:
創建Stream 源階段的時候
previousStage為null,this.sourceOrOpFlags = sourceFlags & StreamOpFlag.STREAM_MASK;用于設定當前階段的標識位,this.combinedFlags = (~(sourceOrOpFlags << 1)) & StreamOpFlag.INITIAL_OPS_VALUE;添加源階段的對流的操作標識,這個combinedFlags是流在整個管道內部所有操作的合集,在最后的規約操作的時候去決議出來,
-
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags)
AbstractPipeline(AbstractPipeline<?, E_IN, ?> previousStage, int opFlags) { if (previousStage.linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); previousStage.linkedOrConsumed = true; previousStage.nextStage = this; this.previousStage = previousStage; this.sourceOrOpFlags = opFlags & StreamOpFlag.OP_MASK; this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags); this.sourceStage = previousStage.sourceStage; if (opIsStateful()) sourceStage.sourceAnyStateful = true; this.depth = previousStage.depth + 1; }根據上游創建下游
Pipeline,
this.sourceStage = previousStage.sourceStage;,用于上游和下游關聯,this.combinedFlags = StreamOpFlag.combineOpFlags(opFlags, previousStage.combinedFlags);將上游的操作標識位添加到本階段的操作標識位中,depth記錄整個管道的中間運算元,
-
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp)
進行終端匯聚計算,執行最終的計算,得到結果,根據是否是并行執行,呼叫不同的結束邏輯,如果不是并行方法則執行
terminalOp.evaluateSequential否則就執行terminalOp.evaluateParallel,
-
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator)
final Node<E_OUT> evaluateToArrayNode(IntFunction<E_OUT[]> generator) { if (linkedOrConsumed) throw new IllegalStateException(MSG_STREAM_LINKED); linkedOrConsumed = true; if (isParallel() && previousStage != null && opIsStateful()) { depth = 0; return opEvaluateParallel(previousStage, previousStage.sourceSpliterator(0), generator); } else { return evaluate(sourceSpliterator(0), true, generator); } }處理流轉換陣列,
轉換陣列的時候,如果是并行流并且不是源階段,而且呼叫過
sorted||limit||skip||distinct這些有狀態的操作之后,這里是個模版方法呼叫,實際上是通過呼叫DistinctOps||SortedOps||SliceOps這些實作的opEvaluateParallel方法,提交到ForkJoin執行緒池來轉換陣列,串行執行的時候直接執行evaluate(sourceSpliterator(0), true, generator);
-
evaluate(sourceSpliterator(0), true, generator);
@Override @SuppressWarnings("unchecked") final <P_IN> Node<E_OUT> evaluate(Spliterator<P_IN> spliterator, boolean flatten, IntFunction<E_OUT[]> generator) { if (isParallel()) { // @@@ Optimize if op of this pipeline stage is a stateful op return evaluateToNode(this, spliterator, flatten, generator); } else { Node.Builder<E_OUT> nb = makeNodeBuilder( exactOutputSizeIfKnown(spliterator), generator); return wrapAndCopyInto(nb, spliterator).build(); } } @Override final <P_IN> Node<P_OUT> evaluateToNode(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<P_OUT[]> generator) { return Nodes.collect(helper, spliterator, flattenTree, generator); } // Nodes.collect方法 public static <P_IN, P_OUT> Node<P_OUT> collect(PipelineHelper<P_OUT> helper, Spliterator<P_IN> spliterator, boolean flattenTree, IntFunction<P_OUT[]> generator) { long size = helper.exactOutputSizeIfKnown(spliterator); if (size >= 0 && spliterator.hasCharacteristics(Spliterator.SUBSIZED)) { if (size >= MAX_ARRAY_SIZE) throw new IllegalArgumentException(BAD_SIZE); P_OUT[] array = generator.apply((int) size); new SizedCollectorTask.OfRef<>(spliterator, helper, array).invoke(); return node(array); } else { Node<P_OUT> node = new CollectorTask.OfRef<>(helper, generator, spliterator).invoke(); return flattenTree ? flatten(node, generator) : node; } }具體的執行方法,用于吧管道內部的輸出結果放到Node中,
如果是源是并行流的情況,以ReferencePipeline參考管道來看主要執行的是 return Nodes.collect(helper, spliterator, flattenTree, generator);,該collect方法內部根據切割器有無Spliterator.SUBSIZED確定了生成的Node的長度,主要作業是創建一個Task提交到執行緒池,然后呼叫invoke拿到結果,示例代碼Arrays.asList("2","22","222").parallelStream().skip(2).toArray(); 整個流程如下:
串行執行示例代碼Arrays.asList("2","22","222").stream().skip(2).toArray(); 整個流程如下:
-
final Spliterator<E_OUT> sourceStageSpliterator()
獲取Stream源頭設定的拆分器,如果設定有則回傳并且把源拆分器置空,如果有Supplier則呼叫get方法回傳拆分器并且把源拆分器置空,
-
public final S sequential()
設定為串行流 ,設定源的paraller屬性為false,終態方法不允許重寫
-
public final S sequential()
設定為并行流 ,設定源的paraller屬性為true,終態方法不允許重寫
-
public void close()
關閉管道的方法,在關閉的時候會把管道使用標志設定為false,拆分器設定為null,如果源的回呼關閉Job存在不為null時則invoker這個回呼Job,
-
public S onClose(Runnable closeHandler)
用于注冊關閉的回呼job,在呼叫close的時候用于去執行這個回呼job,
-
public Spliterator<E_OUT> spliterator()
和
sourceStageSpliterator方法一樣的功能,只不過不是終態方法,可以重寫用于自定義的拓展,
-
public final boolean isParallel()
用于盤帶你當前管道是否是并行流,
-
final int getStreamFlags()
獲取流的標志和Stream的包含的所有操作,
-
private Spliterator<?> sourceSpliterator(int terminalFlags) {
獲取源拆分器,和
sourceStageSpliterator方法一樣的功能,針對是并行流時候,并且是創建Stream階段的話有中間狀態,會組合流標志和操作構建拆分器,如果傳入的操作碼不等于0,那么則添加到拆分器的操作碼中,
-
final StreamShape getSourceShape()
輸出Stream源的型別,(參考 OR int OR Double OR Long)
-
final <P_IN> long exactOutputSizeIfKnown(Spliterator<P_IN> spliterator)
獲取期望的size,如果拆分器如果有SIZE標志,呼叫拆分器的getExactSizeIfKnown方法,否則回傳-1,
-
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator)
封裝整個管道的階段,包裝在Sink中,把每一個階段串聯起來,包裝在Sink內部的
downstream.
wrapAndCopyInto代碼執行流程如下:
看完三件事??
如果你覺得這篇內容對你還蠻有幫助,我想邀請你幫我三個小忙:
-
點贊,轉發,有你們的 『點贊和評論』,才是我創造的動力,
-
關注公眾號 『 java爛豬皮 』,不定期分享原創知識,
-
同時可以期待后續文章ing??
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/226046.html
標籤:Java
上一篇:記一次 Java 服務性能優化
