Reactor
Reactor 是用于 Java 的異步非阻塞回應式編程框架,同時具備背壓控制的能力,它與 Java 8 函式式 Api 直接集成,比如 分為CompletableFuture、Stream、以及 Duration
,它提供了異步 Api 回應流 Flux (用于 [0 - N] 個元素)和 Mono (用于 [0或1] 個元素),并完全遵守和實作了回應式規范,
引入 reactor
reactor 自 3.0.4 版本之后,采用了 BOM (Bill Of Materials)的方式,使用 BOM 可以管理一組良好集成的 maven artifacts,而無需擔心不同版本組件之間的相互依賴問題,在 maven 專案中在 dependencyManagement 中 加入 reactor 的 bom 定義即可,
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-bom</artifactId>
<version>Dysprosium-SR8</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
在需要使用 reactor 的專案中,依賴對應 reactor 模塊即可,
<dependencies>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-core</artifactId>
</dependency>
</dependencies>
在我學習的程序中,reactor 的最新版本是 Dysprosium-SR8 ,它的命名來自元素周期表,按順序遞增,通過 https://github.com/reactor/reactor/releases 獲取最新版本,
reactor bom 中 包含了如下組件:
| 序號 | 模塊 | artifactId | 說明 |
|---|---|---|---|
| 1 | Reactor Core | reactor-core | 基于 Java 8 的回應式流規范實作 |
| 2 | Reactor Test | reactor-test | reactor 的測驗工具集 |
| 3 | Reactor Extra | reactor-extra | 為 Flux 額外提供的運算子 |
| 4 | Reactor Netty | reactor-netty | 基于 Netty 實作的 TCP、UDP、HTTP 的客戶端和服務端 |
| 5 | Reactor Adapter | reactor-adapter | 和其他回應式庫(如RxJava2、SWT Scheduler、 Akka Scheduler)的配接器 |
| 6 | Reactor Kafka | reactor-kafka | Apache Kafka 的回應式橋接實作 |
| 7 | Reactor Kotlin Extensions | reactor-kotlin-extensions | 在 Dysprosium 版本后額外提供的 Kotlin 擴展 |
| 8 | Reactor RabbitMQ | reactor-rabbitmq | RabbitMQ 的回應式橋接實作 |
| 9 | Reactor Pool | reactor-pool | 回應式應用程式的通用物件池 |
| 10 | Reactor Tools | reactor-tools | 一組用于改善 Project Reactor 除錯和開發經驗的工具, |
序號 [1 - 3] 為我們學習 Reactor 程序中主要涉及的模塊,序號 [4 - 9] 在我們學習 Spring WebFlux 的程序中會有所涉及,序號 [10] 是用于 Reactor 除錯的,下面會講到,
使用 gradle 的同學請自行百度,
如果需要嘗鮮 Reactor 里程碑版或開發者預覽版的同學,添加 Spring Milestones repository 的倉庫即可,
Reactor 之 初體驗
上面說了那么多,我們先來體驗下 Reactor,
在學習 Java Stream 的環節中,不知是否有同學有提出這樣的疑問:在進行中間操作或終端消費操縱時,如何獲取流中元素的序號值呢?
假如有如下單詞 [ the, quick, brown, fox, jumped, over, the, lazy, dog ] ,使用 Stream 可否實作輸出時并列印每個單詞的序號呢?
仔細想想,似乎沒有直接的辦法可以獲取,我們只能通過外部創建變數獲取并遞增來實作,
來看下 Stream 的實作:
AtomicInteger index = new AtomicInteger(1);
Arrays.stream(WORDS)
.map(word -> StrUtil.format("{}. {}", index.getAndIncrement(), word))
.forEach(System.out::println);
來看下 Reactor 的實作:
Flux.fromArray(WORDS) // ①
.zipWith(Flux.range(1, Integer.MAX_VALUE), // ②
(word, index) -> StrUtil.format("{}. {}", index, word)) // ③
.subscribe(System.out::println); // ④
先不看 Reactor 代碼的含義,感覺怎么樣,Reactor 的代碼看起來是不是更清新一點,沒有定義任何三方變數解決了這個問題,
有了 Stream 的基礎,Reactor 的代碼很容易理解了,我們稍微來解釋下 Reactor 上段的代碼:
- 序號① 的代碼 Flux 是我們之前提到的 一個能夠發出 0 到 N 個元素的回應流發布者,fromArray 是它的靜態方法,用來創建 Flux 回應流
- 序號② 的代碼 Flux 的 range 運算子和 Stream 的 range 相同,用來生成 整數 Flux 回應流;zipWith 運算子用來合并兩個 Flux,并將回應流中的元素一一對應,當其中一個回應流完成時,合并結束,之前未結束的回應流剩下的元素將被忽略
- 序號③ 的代碼 zipWith 運算子 支持傳遞一個 BiFunction 的函式式介面實作,定義如何來合并兩個資料流中的元素,本例中我們將索引和單詞連接起來
- 序號④ 的代碼 subscribe 即為訂閱方法,此處我們做了資料流中元素輸出至控制臺
Reactor 之 測驗 & 除錯
測驗
Reactor 的測驗需要依賴測驗模塊:
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<scope>test</scope>
</dependency>
撰寫測驗代碼如下:
// 創建 Flux 回應流
Flux<String> source = Flux.just("foo", "bar");
// 使用 concatWith 運算子連接 2 個回應流
Flux<String> boom = source.concatWith(Mono.error(new IllegalArgumentException("boom")));
// 創建一個 StepVerifier 構造器來包裝和校驗一個 Flux,
StepVerifier.create(boom)
.expectNext("foo") // 第一個我們期望的信號是 onNext,它的值為 foo
.expectNext("bar") // 第二個我們期望的信號是 onNext,它的值為 bar
.expectErrorMessage("boom") // 最后我們期望的是一個終止信號 one rror,例外內容應該為 boom
.verify(); // 使用 verify() 觸發測驗,
除了正常測驗外,Reactor 還提供了諸如:
- 測驗基于時間運算子相關的方法,使用 StepVerifier.withVirtualTime 來進行
- 使用 StepVerifier 的 expectAccessibleContext 和 expectNoAccessibleContext 來測驗 Context
- 用 TestPublisher 手動發出元素
- 用 PublisherProbe 檢查執行路徑
測驗方面暫時不是我們學習的重點,這塊內容,我們快速跳過,等到學習到相關場景,需要的時候,我們回過頭來再彌補,
除錯
回應式編程的除錯令人生畏,因為它不像命令式編程,我們可以從例外的堆疊資訊中看到發生錯誤代碼的位置及具體錯誤資訊,這也是回應式編程學習曲線比較陡峭的原因,
有如下代碼:
Flux.range(1, 3)
.flatMap(n -> Mono.just(n + 100))
.single()
.map(n -> n * 2)
.subscribe(System.out::println);
執行測驗時,列印錯誤日志如下:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:79)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
我們從上述的錯誤中獲取到發生了 IndexOutOfBoundsException 資料越界例外,從上往下看,應該是 MonoSingle 回應式流發出了不止一個元素,查看 Mono#singe 運算子描述,我們看到 single 有一個規定: 源必須只能發出一個元素,看來是有一個源發出了多于一個元素,從而違反了這一規定,
粗略過一下這些行,我們可以大概勾畫出一個大致的出問題的鏈:涉及 MonoSingle、FluxFlatMap、FluxRange(每一個都對應 trace 中的幾行,但總體涉及這三個類), 所以難道是 range().flatMap().single() 這樣的鏈?
但是如果在我們的應用中多處都用到這一模式,那怎么辦?通過這些還是不能確定為什么會拋除這個例外, 搜索 single 也找不到問題所在,直到最后幾行指向了我們的代碼,查看代碼和我們之前的預測的呼叫鏈一樣,
但是最終我們怎么快速確定代碼的問題在哪里呢?
方案1: 開啟除錯模式
使用 Hooks.onOperatorDebug(); 在程式初始的地方開啟除錯模式
錯誤日志如下:
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.IndexOutOfBoundsException: Source emitted more than one item
Caused by: java.lang.IndexOutOfBoundsException: Source emitted more than one item
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
Suppressed: reactor.core.publisher.FluxOnAssembly$OnAssemblyException:
Assembly trace from producer [reactor.core.publisher.MonoSingle] :
reactor.core.publisher.Flux.single(Flux.java:7851)
top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
Error has been observed at the following site(s):
|_ Flux.single ? at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:83)
|_ Mono.map ? at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:84)
Stack trace:
at reactor.core.publisher.MonoSingle$SingleSubscriber.onNext(MonoSingle.java:129)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.tryEmitScalar(FluxFlatMap.java:480)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:413)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:154)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:109)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:363)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:68)
at reactor.core.publisher.Mono.subscribe(Mono.java:4219)
at reactor.core.publisher.Mono.subscribeWith(Mono.java:4330)
at reactor.core.publisher.Mono.subscribe(Mono.java:4190)
at reactor.core.publisher.Mono.subscribe(Mono.java:4126)
at reactor.core.publisher.Mono.subscribe(Mono.java:4073)
at top.todev.note.web.flux.reactor.ReactorFirstExperienceTest.testReactorDebug(ReactorFirstExperienceTest.java:85)
我們從 Error has been observed at the following site(s) 這行錯誤起,可以看到錯誤沿著操作鏈傳播的軌跡(從錯誤點到訂閱點),我們從 Assembly trace from
producer 這行開始的錯誤中也看到了源代碼 83 行開始報錯,也確定了上一行的 flatMap 運算子發出了不止一個元素導致,
方案2: 使用 checkpoint 運算子埋點除錯
使用方案1開啟全域除錯有較高的成本即影響性能,我們可以在可能發生錯誤的代碼中加入運算子 checkpoint 來檢測本段回應式流的問題,而不影響其他資料流的執行,
checkpoint 通常用在明確的運算子的錯誤檢查,類似于埋點檢查的概念,同時該運算子支持 3個多載方法:checkpoint(); checkpoint(String description); checkpoint
(String description, boolean forceStackTrace);
description 為埋點描述,forceStackTrace 為是否列印堆疊
方案3: 啟用除錯代理
1. 在專案中引入 reactor-tools 依賴
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-tools</artifactId>
</dependency>
2. 使用 ReactorDebugAgent.init(); 初始化代理
由于該代理是在加載類時對其進行檢測,因此放置它的最佳位置是在main(String [])方法中的所有其他項之前
3. 如果是測驗類,使用如下代碼處理現有的類
注意,在測驗類中需要提前運行,比如在 @Before 中
ReactorDebugAgent.init();
ReactorDebugAgent.processExistingClasses();
總結
本篇我們了解了如何引入 Reactor ;初步體驗了 Reactor 的 Hello World 代碼;最后我們了解了如何測驗及除錯 Reactor,這些內容為我們后面學習 Reactor 的基礎,希望大家都能掌握,
今天的內容就學到這里,我們下篇開始 Reactor 的基礎和特性學習,
原始碼詳見:https://github.com/crystalxmumu/spring-web-flux-study-note{:target="_blank"} 下 02-reactor-core-learning 模塊,
參考
- Reactor 3 Reference Guide
- Reactor 3 中文指南
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/161019.html
標籤:Java
上一篇:LintCode 9.Fizz Buzz 問題(JAVA實作,一個if都不用)
下一篇:Java的兩把鎖淺析
