Reactor 運算子
上篇文章我們將 Flux 和 Mono 的運算子分了 11 類,我們來繼續學習轉換類運算子的第 2 篇,
轉換類運算子
轉換類的運算子數量最多,平常程序中也是使用最頻繁的,
Flux#concatMap
將回應式流中元素順序轉換為目標型別的回應式流,之后再將這些流連接起來,該方法提供了 2 個多載方法,傳遞的第 2 個引數為內部生成回應式流的預取數量,見圖知意:

Flux.range(3, 8)
.concatMap(n -> Flux.just(n - 10, n, n + 10), 3)
.subscribe(System.out::println);
Flux#concatMapDelayError
concatMapDelayError 和 concatMap 區別在于,當內部生成回應式流發出 error 時,是否延遲回應 error ,該方法提供了 3 個多載方法,支持傳遞引數:是否延遲發出錯誤和預取數量,
Flux.range(3, 8)
.concatMapDelayError(n -> {
if (n == 4) {
return Flux.error(new NullPointerException());
}
return Flux.just(n - 10, n, n + 10);
})
.subscribe(System.out::println, System.err::println);
Flux#concatIterable
concatIterable 和 concatMap 的區別在于 內部回傳的型別不同,一個為 Iterable, 一個為 回應式流,見圖知意:

Flux.range(3, 8)
.publishOn(Schedulers.single())
.concatMapIterable(n -> {
if (n == 4) {
throw new NullPointerException();
}
return Arrays.asList(n - 10, n, n + 10);
})
.onErrorContinue((e, n) -> System.err.println("資料:" + n + ",發生錯誤:" + e))
.subscribe(System.out::println);
elapsed
收集回應式流中元素的間隔發出時間,轉換為 時間間隔 和 舊元素 組成的 Tuple2 的回應式流,見圖知意:

Flux.interval(Duration.ofMillis(300))
.take(20)
.elapsed(Schedulers.parallel())
.subscribe(System.out::println);
Thread.sleep(7000);
expand
從上層節點逐層展開方式遞回展開樹形節點,
Flux.just(16, 18, 20)
.expand(n -> {
if (n % 2 == 0) {
return Flux.just(n / 2);
} else {
return Flux.empty();
}
})
.subscribe(System.out::println);
expandDeep
從上層節點逐個展開方式遞回展開樹形節點,expand 和 expandDeep 的區別在于展開方式不同,另外它倆都提供了 capacityHint 指定遞回時初始化容器的容量,
Flux.just(16, 18, 20)
.expandDeep(n -> {
if (n % 2 == 0) {
return Flux.just(n / 2);
} else {
return Flux.empty();
}
})
.subscribe(System.out::println);
總結
本篇我們介紹了 Reactor 部分的轉換類運算子,講解示例時都是單個運算子,相信大家都能理解,
由于最近學習時間不確定,內容比較少,無論作業還是生活的困難,我們只要堅持,終將會被克服解決,今天的內容就學到這里,我們下篇繼續學習 Reactor 的運算子,
原始碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note 下 02-reactor-core-learning
模塊下 ReactorTransformOperator02Test 測驗類,
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/154814.html
標籤:Java
上一篇:Maven(二) 構建生命周期
