
1. 前言
最近寫關于回應式編程的東西有點多,很多同學反映對Flux和Mono這兩個Reactor中的概念有點懵逼,但是目前Java回應式編程中我們對這兩個物件的接觸又最多,諸如Spring WebFlux、RSocket、R2DBC,我開始也對這兩個物件頭疼,所以今天我們就簡單來探討一下它們,
2. 回應流的特點
要搞清楚這兩個概念,必須說一下回應流規范,它是回應式編程的基石,他具有以下特點:
- 回應流必須是無阻塞的,
- 回應流必須是一個資料流,
- 它必須可以異步執行,
- 并且它也應該能夠處理背壓,
背壓是反應流中的一個重要概念,可以理解為,生產者可以感受到消費者反饋的消費壓力,并根據壓力進行動態調整生產速率,形象點可以按照下面理解:

3. Publisher
由于回應流的特點,我們不能再回傳一個簡單的POJO物件來表示結果了,必須回傳一個類似Java中的Future的概念,在有結果可用時通知消費者進行消費回應,
Reactive Stream規范中這種被定義為Publisher<T> ,Publisher<T>是一個可以提供0-N個序列元素的提供者,并根據其訂閱者Subscriber<? super T>的需求推送元素,一個Publisher<T>可以支持多個訂閱者,并可以根據訂閱者的邏輯進行推送序列元素,下面這個Excel計算就能說明一些Publisher<T>的特點,

A1-A9就可以看做Publisher<T>及其提供的元素序列,A10-A13分別是求和函式SUM(A1:A9)、平均函式AVERAGE(A1:A9)、最大值函式MAX(A1:A9)、最小值函式MIN(A1:A9),可以看作訂閱者Subscriber,假如說我們沒有A10-A13,那么A1-A9就沒有實際意義,它們并不產生計算,這也是回應式的一個重要特點:當沒有訂閱時發布者什么也不做,
而Flux和Mono都是Publisher<T>在Reactor 3實作,Publisher<T>提供了subscribe方法,允許消費者在有結果可用時進行消費,如果沒有消費者Publisher<T>不會做任何事情,他根據消費情況進行回應, Publisher<T>可能回傳零或者多個,甚至可能是無限的,為了更加清晰表示期待的結果就引入了兩個實作模型Mono和Flux,
4. Flux
Flux 是一個發出(emit)0-N個元素組成的異步序列的Publisher<T>,可以被onComplete信號或者onError信號所終止,在回應流規范中存在三種給下游消費者呼叫的方法 onNext, onComplete, 和onError,下面這張圖表示了Flux的抽象模型:

以上的的講解對于初次接觸反應式編程的依然是難以理解的,所以這里有一個循序漸進的理解程序,
有些類比并不是很妥當,但是對于你循序漸進的理解這些新概念還是有幫助的,
傳統資料處理
我們在平常是這么寫的:
public List<ClientUser> allUsers() {
return Arrays.asList(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
我們通過迭代回傳值List來get這些元素進行再處理(消費),這種方式有點類似廚師做了很多菜,吃不吃在于食客,需要食客主動去來吃就行了(pull的方式),至于喜歡吃什么不喜歡吃什么自己隨意,怎么吃也自己隨意,
流式資料處理
在Java 8中我們可以改寫為流的表示:
public Stream<ClientUser> allUsers() {
return Stream.of(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
依然是廚師做了很多菜,但是這種就更加高級了一些,提供了菜品的搭配方式(不包含具體細節),食客可以按照說明根據自己的習慣搭配著去吃,一但開始概不退換,吃完為止,過期不候,
反應式資料處理
在Reactor中我們又可以改寫為Flux表示:
public Flux<ClientUser> allUsers(){
return Flux.just(new ClientUser("felord.cn", "reactive"),
new ClientUser("Felordcn", "Reactor"));
}
這時候食客只需要訂餐就行了,做好了自然就呈上來,而且可以隨時根據食客的飯量進行調整,如果沒有食客訂餐那么廚師就什么都不用做,當然不止有這么點特性,不過對于方便我們理解來說這就夠了,
5. Mono
Mono 是一個發出(emit)0-1個元素的Publisher<T>,可以被onComplete信號或者onError信號所終止,

這里就不翻譯了,整體和Flux差不多,只不過這里只會發出0-1個元素,也就是說不是有就是沒有,象Flux一樣,我們來看看Mono的演化程序以幫助理解,
傳統資料處理
public ClientUser currentUser () {
return isAuthenticated ? new ClientUser("felord.cn", "reactive") : null;
}
直接回傳符合條件的物件或者null,
Optional的處理方式
public Optional<ClientUser> currentUser () {
return isAuthenticated ? Optional.of(new ClientUser("felord.cn", "reactive"))
: Optional.empty();
}
這個Optional我覺得就有反應式的那種味兒了,當然它并不是反應式,當我們不從回傳值Optional取其中具體的物件時,我們不清楚里面到底有沒有,但是Optional是一定客觀存在的,不會出現NPE問題,
反應式資料處理
public Mono<ClientUser> currentUser () {
return isAuthenticated ? Mono.just(new ClientUser("felord.cn", "reactive"))
: Mono.empty();
}
和Optional有點類似的機制,當然Mono不是為了解決NPE問題的,它是為了處理回應流中單個值(也可能是Void)而存在的,
6. 總結
Flux和Mono是Java反應式中的重要概念,但是很多同學包括我在開始都難以理解它們,這其實是規定了兩種流式范式,這種范式讓資料具有一些新的特性,比如基于發布訂閱的事件驅動,異步流、背壓等等,另外資料是推送(Push)給消費者的以區別于平時我們的拉(Pull)模式,同時我們可以像Stream Api一樣使用類似map、flatmap等運算子(operator)來操作它們,對Flux和Mono這兩個概念需要花一些時間去理解它們,不能操之過急,如果你對我的這種看法有不同的觀點可以留言討論,多多關注:碼農小胖哥 獲取更多干貨知識,
關注公眾號:Felordcn 獲取更多資訊
個人博客:https://felord.cn
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/136238.html
標籤:Java
上一篇:一個半月拿到阿里位元組拼多多美團Offer,最后去了阿里螞蟻金服
下一篇:java抽象類和抽象方法
