一、RXjava介紹
首先看一下Rxjava這個名字,其中java代表java語言,而Rx是什么意思呢?Rx是Reactive Extensions的簡寫,翻譯過來就是,回應式拓展,所以Rxjava的名字的含義就是,對java語言的拓展,讓其可以實作對資料的回應式編程,
那么回應的是什么呢?回應的是上游資料的變化,常規用法是,對資料源進行監聽,然后做出回應,
RxJava的整體結構是一條鏈,其中有這三個角色,
- 鏈的上游:生產者 Observable
- 鏈的下游:觀察者 Observer
- 鏈的中間:各個中介節點,既是下游的Observable,又是上游的Observer
二、Rxjava基本使用
Single.just("hfhuaizhi").subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun one rror(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
上面這段代碼是對Rxjava簡單的使用,其中
- Single 發出單個資料的被觀察者Observable,只發送一次,只有Success和Error兩種狀態,沒有next,在Rxjava2中新增
- just 被觀察者生產的資料,引數型別是一個泛型,這里傳進去的是一個String
- subscribe 觀察者Observer,這里宣告的是SingleObserver,用來對Single中產生的資料進行回應
- SingleObserver
- onSubscribe 訂閱成功后就會回呼,一般會在此方法中進行一些初始化操作,其引數型別是Disposable,可以通過呼叫d.dispose() 取消對Observable的監聽,并讓其停止發送訊息,
- onSuccess 接收資料成功后就會回呼,只會回呼一次,其引數型別和Observable中just方法傳入的資料型別一致,這里是String型別
- onError 發生錯誤時回呼,引數是Throwable,包含錯誤資訊,
運行效果
2021-12-18 13:54:12.450 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 13:54:12.451 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:hfhuaizhi
可以看到首先onSubscribe被呼叫,表明注冊了觀察者,然后接收資料成功,列印出’hfhuaizhi’, 到這里我們就了解了Rxjava最基本的用法,接下來分析一下函式的內部做了什么,
三、Rxjava原理決議
1. just方法分析
public static <@NonNull T> Single<T> just(T item) {
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
}
-
對方法引數進行判空
-
呼叫
RxJavaPlugins.onAssembly方法,其引數是一個SingleJust,構造方法傳入了item
- 其中onAssembly方法內部對傳入的引數進行一些處理,然后回傳原引數型別,所以接下來分析的程序中會忽略此方法,可以簡單認為just方法直接回傳了一個SingleJust實體,
// onSingleAssembly 引數默認是空的,所以這個方法原樣回傳了source,當設定onSingleAssembly后,
// 會先對source進行處理后再回傳
public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source) {
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
final T value;
public SingleJust(T value) {
this.value = value;
}
SingleJust將構造方法傳入的item保存在value欄位中, 由上述分析可知,Single.just方法會回傳一個SingleJust實體,所以在我們鏈式呼叫中的subscribe方法,實際上呼叫的是SingleJust的subscribe方法
public final void subscribe(@NonNull SingleObserver<? super T> observer) {
// 1. 判空
Objects.requireNonNull(observer, "observer is null");
// 2. 對引數中的observer進行處理后又回傳observer
observer = RxJavaPlugins.onSubscribe(this, observer);
// 3. 對Observer進行判空
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try {
// 4. 呼叫真實注冊方法
subscribeActual(observer);
} catch (NullPointerException ex) {
throw ex;
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
}
}
subscrib方法中主要做了注釋中所寫的四步操作,其中重要的是第4步subscribeActual,這里才是真正做事的,之前都是資料的校驗,因為我們這個類的實體是SingleJust,所以接下來看一下SingleJust的subscribeActual方法做了什么,
@Override
protected void subscribeActual(SingleObserver<? super T> observer) {
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
}
可以看到內容十分簡單
- 呼叫observer的onSubscribe方法,表明訂閱成功,引數是Disposable.disposed()回傳值
- 呼叫observer的onSuccess方法,表明資料回呼成功,引數是value,而value就是通過Single的just函式傳進來的,通過構造方法傳入SingleJust實體中,因此,這一步的操作就是簡單地將構造方法中傳入的值,通過observer的onSuccess方法回呼給我們定義的觀察者SingleObserver,
這樣就完事了,因為之前說過Single.just是最簡單的RxJava使用方式,先呼叫onSubscribe表明注冊監聽,然后又緊接著通過onSuccess回呼資料,所以不會有失敗的情況,

2. map方法分析
map是Rxjava中比較常用的用法,用來實作資料型別的轉換 比如像這樣,我們發送的資料型別是Integer,接收的資料型別是String,這樣當然是無法直接接收的,所以需要進行一下轉換,將上游資料發送的Integer轉換為String,然后由下游接收,
private fun testMap(view: View) {
Single.just(123).map(object : Function<Int, String> {
override fun apply(t: Int): String {
return "$t"
}
}).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun onError(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
}
列印結果
021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:123
just方法傳入的123是Integer型別,onSuccess處接收的資料是String型別,通過map進行轉換,其中map方法傳入的引數是一個Function<T,E>,此類有兩個泛型引數,T代表輸入資料型別,E表示輸出資料型別,這里的輸入資料型別是Integer,回傳型別是String,apply方法中回傳了String型別的輸出資料,
map(object : Function<Int, String> {
override fun apply(t: Int): String {
return "$t"
}
})
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
}
進入map方法內部,此方法判空后,回傳了SingleMap實體,其構造方法傳入了當前SingleJust實體和mapper轉換引數,并將其分別保存在source和mapper成員變數中,
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
...
}
好,map方法暫且看到這兒,我們接下來繼續分析鏈式呼叫中的subscribe方法,
subccribe傳入了一個SingleObserver,和之前分析的類似,但是區別在于呼叫的不再是SingleJust的subscribe方法,而是map方法回傳的SingleMap的subscribe方法,由之前的分析可知,此方法呼叫會在資料的判空后呼叫到SingleMap的subscribeActual方法, 由之前的分析可知,鏈式呼叫到subscribe方法會呼叫到SingleMap的subscribeActual方法
public final class SingleMap<T, R> extends Single<R> {
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper) {
this.source = source;
this.mapper = mapper;
}
@Override
protected void subscribeActual(final SingleObserver<? super R> t) {
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
}
...
}
由之前的分析可知,source就是map的上游SingleJust, 所以在single的實際subscribe方法中會呼叫其上游的subscribe方法,并傳入了一個封裝好的新的MapSingleObserver,MapSingleObserver的構造方法中第一個引數t,是下游觀察者,在我們這塊代碼中就是鏈式呼叫的時候傳入的SingleObserver,第二個引數是我們在map方法中傳入的資料型別轉換轉換器mapper, 由之前的分析可知,當source,也就是SingleJust的subscribe方法呼叫后,會依次呼叫其引數傳入的Observer的onSubscribe方法和onSuccess方法,此時引數傳入的Observer就是上面代碼塊里的MapSingleObserver
static final class MapSingleObserver<T, R> implements SingleObserver<T> {
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper) {
this.t = t;
this.mapper = mapper;
}
@Override
public void onSubscribe(Disposable d) {
t.onSubscribe(d);
}
@Override
public void onSuccess(T value) {
R v;
try {
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
one rror(e);
return;
}
t.onSuccess(v);
}
@Override
public void one rror(Throwable e) {
t.onError(e);
}
}
onSubscribe方法原封不動的呼叫了t.onSubscribe(d);而t就是在MapSingleObserver構造方法傳入的下游觀察者,也就是SingleObserver實體,這里直接呼叫了其onSubscribe方法表示注冊監聽成功, onSuccess方法中呼叫了mapper.apply(value),這個mapper就是我們在map方法中傳入的轉換函式,這里輸入了Integer資料型別,得到了String型別輸出,最后呼叫t.onSuccess回呼轉換后的資料,也就是呼叫我們subscribe方法傳入的實體的onSuccess,
map方法總結
map主要做的就是一個承上啟下,鏈式呼叫中subscribe方法呼叫后,會依次向上呼叫中間節點的subscribe方法,直到呼叫到最初始的沒有上游的Observable,最上層的Observable會在其subscribeActual方法中呼叫其下游觀察者的onSubscribe和onSuccess/onError,將資料一層一層傳下去,資料傳遞的程序中,中間節點可能會對資料進行處理后再接著向下傳,最終傳遞到最底層的Observer,整個流程如圖所示

圖片含義解釋
最上游的Single就是我們呼叫Single.just產生的SingleJust,其subscribe方法中會呼叫onSubscribe()和onSuccess(),向下方觀察者傳遞Integer型別的結果,中間觀察者SingleObserver由map方法創建,其接收到上游傳遞下來的資料后,將其轉換為String,然后傳遞給下方觀察者,最后下游收到的資料結果就是String型別,
3. 執行緒切換
執行緒切換可以說是RxJava中最常用的操作了,甚至很多人選擇RxJava,就是因為RxJava可以和方便地實作執行緒切換, 執行緒切換主要用到這兩個函式:
- subscribeOn
- observerOn
private fun testSubscribe(view: View) {
Single.just("hfhuaizhi").subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).subscribe(object : SingleObserver<String> {
override fun onSubscribe(d: Disposable) {
Log.e(TAG, "onSubscribe")
}
override fun onSuccess(t: String) {
Log.e(TAG, "onSuccess:$t")
}
override fun one rror(e: Throwable) {
Log.e(TAG, "onError:$e")
}
})
}
這樣寫,可以實作subscribe呼叫之前的訊息發送在io執行緒,observerOn呼叫之后的Observer回呼在android主執行緒,其中AndroidSchedulers類不在Rxjava標準庫中,需要額外引入RxAndroid依賴,
subscribeOn
public final Single<T> subscribeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
}
subscribeOn方法回傳一個SingleSubscribeOn實體,其構造方法中傳入了this(上游被觀察者)和scheduler(執行緒調度器,我們傳入的是Schedulers.io()), 由之前的分析可知,鏈式呼叫中最終subscribe方法呼叫的時候,會由下向上依次呼叫各個節點的subscribe方法,這里我們看一下SingleSubscribeOn這一執行緒切換的節點的subscribe方法做了什么,因為SingleSubscribeOn和SingleJust一樣繼承自Single,其subscribe方法也是呼叫到了subscribeActual方法
public final class SingleSubscribeOn<T> extends Single<T> {
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler) {
// 上層被觀察物件
this.source = source;
// 執行緒型別
this.scheduler = scheduler;
}
@Override
protected void subscribeActual(final SingleObserver<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
}
}
-
將observer(下游觀察者)和source(上游被觀察者)封裝進一個新的觀察者SubscribeOnObserver
-
呼叫下游觀察者的onSubscribe方法
-
呼叫scheduler的scheduleDirect方法,引數傳入剛封裝的新的觀察者SubscribeOnObserver實體
-
將parent的task變數替換為由傳入的scheduler生成的Disposable
final SequentialDisposable task;- 這個task的引數型別是Disposable,之前有提到過,在Observer的onSubscribe方法中會傳入一個Disposable,呼叫Disposable的dispose()方法后,會取消注冊并讓上游停止發送任務,這個Disposable繼承自AtomicReference 實作了Disposable介面,AtomicReference是java里的原子參考型別,可以執行緒安全地對物件參考進行修改,類似地還有AtomicInteger等,所以這里的parent.task.replace(f)就是將parent中的task這個disposable執行緒安全地替換為scheduler創建地這個新的Disposable,從而可以實作任務的取消,
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
...
}
接下來分析一下第3步主要做了什么
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
scheduleDirect方法中傳入了一個Runnable型別引數,因為SubscribeOnObserver類實作了Runnable介面,所以可以被當作Runnable傳進去,
因為我們傳入的scheduler引數是由Schedulers.io()方法創建的,而此方法默認會回傳一個IoScheduler

這個Scheduler的注釋寫著,會創建并快取一個執行緒池,所以我們知道了scheduleDirect方法會將傳入的Runnable放入一個執行緒池里執行,從而實作任務的異步執行,所以接下來我們去看一下SubscribeOnObserver的run方法里做了什么,
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source) {
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
}
@Override
public void run() {
source.subscribe(this);
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this, d);
}
@Override
public void onSuccess(T value) {
downstream.onSuccess(value);
}
...
}
SubscribeOnObserver的run方法中會呼叫source.subscribe,并傳入自己(自己也是一個Observer),由之前分析我們知道source就是我們監聽的上游,這里呼叫了SingleJust的subscribe,由之前的分析我們知道subscribe會呼叫到subscribeActual,這里做任務的真正執行,因此就這樣實作了讓上游任務在異步執行緒中的執行,上游任務執行過后,會將資料向下傳遞,傳遞到當前SubscribeOnObserver節點的時候會呼叫其onSuccess方法,其呼叫downstream,也就是下游觀察者的onSuccess方法,將資料繼續向下傳遞,此時資料傳遞的執行緒也是run方法執行的執行緒,因為此時并沒有再次對執行緒進行切換,
observerOn
public final Single<T> observeOn(@NonNull Scheduler scheduler) {
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
}
observeOn函式回傳了一個SingleObserveOn,也是需要傳入this(上游被觀察者),和scheduler(執行緒調度器型別,此時我們傳入的是AndroidSchedulers.mainThread()),由之前分析可知我們此時應該去看SingleObserveOn的subscribeActual方法呼叫
protected void subscribeActual(final SingleObserver<? super T> observer) {
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
}
此方法中呼叫了其上游的subscribe方法,和之前分析的資料流轉程序一致,需要依次呼叫到最根節點的subscribe,引數傳入的是封裝后的觀察者ObserveOnSingleObserver,其構造方法中傳入了下游觀察者和執行緒調度型別,接下來我們看一下當ObserveOnSingleObserver收到上游傳下來的資料后進行了怎樣的操作,
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable {
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler) {
this.downstream = actual;
this.scheduler = scheduler;
}
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.setOnce(this, d)) {
downstream.onSubscribe(this);
}
}
@Override
public void onSuccess(T value) {
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
}
@Override
public void run() {
Throwable ex = error;
if (ex != null) {
downstream.onError(ex);
} else {
downstream.onSuccess(value);
}
}
...
}
可以看到在onSuccess方法中呼叫了scheduler.scheduleDirect(this),并穿了個this,而且自身實作了runnable介面,由之前分析可知,run方法會在某一時刻被呼叫,傳入的scheduler是AndroidSchedulers.mainThread()其回傳的是HandlerScheduler,其內部封裝了個Handler,將Runnable 弄到主執行緒去執行,最終結果就是ObserveOnSingleObserver的run方法在主執行緒中被呼叫, 其run方法呼叫了下游觀察者downstream的onSuccess/onError, 由此分析可知,observerOn方法控制此節點后的被觀察者收到資料時所在的執行緒,無法影響其上游節點,
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/387941.html
標籤:其他
