Reactor 運算子
資料在回應式流中的處理,就像流過一條裝配流水線,Reactor 既是傳送帶,又是一個個的裝配工或機器人,原材料從源頭(最初的 Publisher )流出,經過一個個的裝配線中裝配工或機器人的工位加工(operator 操作),最終被加工成成品,等待被推送到消費者( subscribe 操作),
在 Reactor 中,每個運算子對 Publisher 進行處理,然后將 Publisher 包裝為另一個新的 Publisher ,就像一個鏈條,資料源自第一個 Publisher ,然后順鏈條而下,在每個環節進行相應的處理,最終,訂閱者(Subscriber )終結這個程序,所以, 回應式編程按照鏈式方式進行開發,
注意,如同 Java Stream 的終端操作,訂閱者( Subscriber )在沒有訂閱( subscribe )到一個發布者( Publisher )之前,什么也不會發生,
如同 Java Stream 的中間操作一樣,Reactor 的 Flux 和 Mono 也為我們提供了多種運算子(遠多于 Stream ),我們將它們分類如下:
| 序號 | 型別 | 運算子 |
|---|---|---|
| 1 | 轉換 | as, cast, collect, collectList, collectMap, collectMultimap, collectSortedList, concatMap, concatMapDelayError, concatMapIterable, elapsed, expand, expandDeep, flatMap, flatMapDelayError, flatMapIterable, flatMapSequential, flatMapSequentialDelayError, groupJoin, handle, index, join, map, switchMap, switchOnFirst, then, thenEmpty, thenMany, timestamp, transform, transformDeferred |
| 2 | 篩選 | blockFirst, blockLast, distinct, distinctUntilChanged, elementAt, filter, filterWhen, ignoreElements, last, next, ofType, or, repeat, retry, single, singleOrEmpty, sort, take, takeLast, takeUntil, takeUntilOther, takeWhile |
| 3 | 組合 | concatWith, concatWithValues, mergeOrderWith, mergeWith, startWith, withLatestFrom, zipWith, zipWithIterable |
| 4 | 條件 | defaultIfEmpty, delayUntil, retryWhen, switchIfEmpty |
| 5 | 時間 | delayElements, delaySequence, delaySubscription, sample, sampleFirst, sampleTimeout, skip, skipLast, skipUntil, skipUntilOther, skipWhile, timeout |
| 6 | 統計 | count, reduce, reduceWith, scan, scanWith |
| 7 | 匹配 | all, any, hasElement, hasElements |
| 8 | 分組 | buffer, bufferTimeout, bufferUntil, bufferUntilChanged, bufferWhen, groupBy, window, windowTimeout, windowUntil, windowUntilChanged, windowWhen, windowWhile |
| 9 | 事件 | doAfterTerminate, doFinally, doFirst, doOnCancel, doOnComplete, doOnDiscard, doOnEach, doOnError, doOnNext, doOnRequest, doOnSubscribe, doOnTerminate, onBackpressureBuffer, onBackpressureDrop, onBackpressureError, onBackpressureLatest, one rrorContinue, one rrorMap, one rrorResume, one rrorReturn, one rrorStop |
| 10 | 除錯 | checkpoint, hide, log |
| 11 | 其它 | cache, dematerialize, limitRate, limitRequest, materialize, metrics, name, onTerminateDetach, parallel, publish, publishNext, publishOn, replay, share, subscribeOn, subscriberContext, subscribeWith, tag |
接下來我們來挨個學習各類的運算子,如同前面學習回應式流創建一樣,講解運算子時,如果是 Flux 或 Mono 獨有的,會在方法名前增加類名前綴,
轉換類運算子
轉換類的運算子數量最多,平常程序中也是使用最頻繁的,
as
將回應式流轉換為目標型別,既可以是非回應式物件,也可以是 Flux 或 Mono,
Flux.range(3, 8)
.as(Mono::from)
.subscribe(System.out::println);
cast
將回應式流內的元素強轉為目標型別,如果型別不匹配(非父型別別或當前型別),將拋出 ClassCastException ,見圖知意:

Flux.range(1, 3)
.cast(Number.class)
.subscribe(System.out::println);
Flux#collect
通過應用收集器,將 Flux 發出的所有元素收集到一個容器中,當此流完成時,發出收集的結果, Flux 提供了 2 個多載方法,主要區別在于應用的收集器不同,一個是 Java Stream 的 Collector, 另一個是自定義收集方法(同 Java Stream 中 collect 方法):
<R,A> Mono<R> collect(Collector<? super T,A,? extends R> collector);
<E> Mono<E> collect(Supplier<E> containerSupplier,
BiConsumer<E,? super T> collector);
見圖知意:

Flux.range(1, 5)
.collect(Collectors.toList())
.subscribe(System.out::println);
Flux#collectList
當此 Flux 完成時,將此流發出的所有元素收集到一個串列中,該串列由生成的 Mono 發出,見圖知意:

Flux.range(1, 5)
.collectList()
.subscribe(System.out::println);
Flux#collectMap
將 Flux 發出的所有元素按照鍵生成器和值生成器收集到 Map 中,之后由 Mono 發出,Flux 提供了 3 個多載方法:
<K> Mono<Map<K,T>> collectMap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,V>> collectMap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,V>> mapSupplier);
它們的主要區別在于是否提供值生成器和初始的Map,意同 Java Stream 中的 Collectors#toMap ,見圖知意:

Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectMultimap
collectMultimap 與 collectMap 的區別在于,map 中的 value 型別不同,一個是集合,一個是元素, collectMultimap 對于流中出現重復的 key 的 value,加入到了集合中,而 collectMap 做了替換,在這點上,reactor 不如 Java Stream 中的 Collectors#toMap 方法,沒有提供 key 重復時的合并函式,也提供了 3 個多載方法,
<K> Mono<Map<K,Collection<T>>> collectMultimap(Function<? super T,? extends K> keyExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor);
<K,V> Mono<Map<K,Collection<V>>> collectMultimap(Function<? super T,? extends K> keyExtractor,
Function<? super T,? extends V> valueExtractor,
Supplier<Map<K,Collection<V>>> mapSupplier)
見圖知意:

Flux.just(1, 2, 3, 4, 5, 3, 1)
.collectMultimap(n -> n, n -> n + 100)
.subscribe(System.out::println);
Flux#collectSortedList
將 Flux 發出的元素在完成時進行排序,之后由 Mono 發出,Flux 提供了 2 個多載方法:
Mono<List<T>> collectSortedList();
Mono<List<T>> collectSortedList(@Nullable Comparator<? super T> comparator);
見圖知意:

Flux.just(1, 3, 5, 3, 2, 5, 1, 4)
.collectSortedList()
.subscribe(System.out::println);
總結
本篇我們介紹了 Reactor 運算子的分類,之后介紹了部分轉換類運算子,講解示例時都是單個運算子,相信大家都能理解,
今天的內容就學到這里,我們下篇繼續學習 Reactor 的運算子,
原始碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperatorTest 測驗類,
參考
- Reactor 3 Reference Guide
- Reactor 3 中文指南
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/158263.html
標籤:Java
上一篇:Android 3種資料保存(SharedPreferences存盤 內部檔案存盤 資料庫存盤)
下一篇:Git常用命令及方法大全
