RxJava2 使用 及 原始碼閱讀
RxJava是什么?根據RxJava在GitHub上給出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java
大致意思是:
RxJava—一個可以在JVM上運行的,基于觀察者模式 實作異步操作的java庫,
RxJava的作用:
就是異步RxJava的使用,可以使“邏輯復雜的代碼”保持極強的閱讀性,
Rxjava github地址
RxAndorid的作用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了AndroidSchedulers.mainThread(),Android開發者使用程序中,可以輕松的將任務post Andorid主執行緒中,執行頁面更新操作,
RxAndroid github地址
使用方式
1、Observable
- Observable:被觀察者
- Observer:觀察者,可接收Observable發送的資料
a、Rxjava 實作執行緒切換:
//
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
//1、“異步執行緒” 執行耗時操作
//2、“執行完畢” 呼叫onNext觸發回呼,通知觀察者
e.onNext("1");
e.onComplete();
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
// 訂閱執行緒 訂閱的那一刻在訂閱執行緒中執行
}
@Override
public void onNext(String value) {
// “主執行緒”執行的方法
}
@Override
public void one rror(Throwable e) {
// "主執行緒"執行的方法
}
@Override
public void onComplete() {
// "主執行緒"執行的方法
}
});
b、Rxjava 使用運算子
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
// IO 執行緒
// 請求網路資料
e.onNext("123456");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) {
// IO 執行緒
// 網路資料決議(資料轉化)
//
// throw new RequestFailException("獲取網路請求失敗");
return 123;
}
}).doOnNext(new Consumer<Integer>() { //保存登錄結果UserInfo
@Override
public void accept(@NonNull Integer bean) throws Exception {
// IO 執行緒
// 保存網路資料
}
}).subscribeOn(Schedulers.io()) //IO執行緒
.observeOn(AndroidSchedulers.mainThread()) //主執行緒
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer bean) throws Exception {
// 更新UI
}
}, new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
// 錯誤 顯示錯誤頁面
}
});
2、Flowable
Flowable是為了應對Backpressure產生的,
Flowable是一個被觀察者,與Subscriber(觀察者)配合使用
//
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
//1、“異步執行緒” 執行耗時操作
//2、“執行完畢” 呼叫onNext觸發回呼,通知觀察者
emitter.onNext(0);
emitter.onComplete();
}
// 若消費者消費能力不足,則拋出MissingBackpressureException例外
}, BackpressureStrategy.ERROR)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// 訂閱時執行,發生在“訂閱執行緒”
// 這個方法是用來向生產者申請可以消費的事件數量
// 這里表明消費者擁有Long.MAX_VALUE的消費能力
s.request(Long.MAX_VALUE);
}
@Override
public void onNext(Integer integer) {
// “主執行緒”執行的方法
}
@Override
public void one rror(Throwable t) {
// "主執行緒"執行的方法
}
@Override
public void onComplete() {
// "主執行緒"執行的方法
}
});
a、 Backpressure(背壓)
Backpressure(背壓) 即生產者的生產速度大于消費者的消費能力引起的問題,
在RxJava中有一種情況就是被觀察者發送訊息十分迅速以至于觀察者不能及時的回應這些訊息,
例如:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
// “異步執行緒”中 生產者有無限的生產能力
while (true){
e.onNext(1);
}
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// “主執行緒”中 消費者消費能力不足,從而造成事件無限堆積,最后導致OOM
Thread.sleep(2000);
System.out.println(integer);
}
});
異步執行緒中 生產者有無限的生產能力;
主執行緒中 消費者消費能力不足,從而造成事件無限堆積,最后導致OOM,
上述的現象,有個專有的名詞來來形容,即:Backpressure(背壓)
b、Subscription.request(long n);
Subscription.request(long n) 方法是用來向生產者申請可以消費的事件數量,
- 當呼叫了
request(long n)方法后,生產者便發送對應數量的事件供消費者消費; - 如果
不顯示呼叫request就表示消費能力為0,
在異步呼叫時,RxJava中有個快取池,用來快取消費者處理不了暫時快取下來的資料,快取池的默認大小為128,即只能快取128個事件,
無論request()中傳入的數字比128大或小,快取池中在剛開始都會存入128個事件;當然如果本身并沒有這么多事件需要發送,則不會存128個事件,
BackpressureStrategy.ERROR策略下,如果生產者生產的事件大于128個,快取池便會溢位,從而拋出MissingBackpressureException例外;BackpressureStrategy.BUFFER策略:將RxJava中默認的128個事件的快取池換成一個更大的快取池,這樣,消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,但是這種方式比較消耗記憶體,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產生OOM,總之BUFFER要慎用,BackpressureStrategy.DROP策略:當消費者處理不了事件,則丟棄,消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費,其他消費不掉的事件就丟掉,BackpressureStrategy.LATEST策略: LATEST與DROP功能基本一致,消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費,其他消費不掉的事件就丟掉,唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最后一個事件,
原始碼閱讀——簡單例子 (一)
注:當前使用的原始碼版本 rxjava:2.1.9
從這段不涉及運算子和執行緒切換的簡單例子開始:
// 創建觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
@Override
public void onNext(String o) {
}
@Override
public void one rror(@NonNull Throwable e) {
Log.d(TAG, "onError data is :" + e.toString());
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 訂閱
observable.subscribe(observer);
a、ObservableOnSubscribe.java
先看一下ObservableOnSubscribe.java這個類
public interface ObservableOnSubscribe<T> {
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}
由代碼可知 ObservableOnSubscribe是一個回呼介面,回呼方法中引數為ObservableEmitter,下邊看一下ObservableEmitter 這個類,
ObservableEmitter.java
ObservableEmitter字面意思是被觀察者發射器,看一下原始碼:
public interface ObservableEmitter<T> extends Emitter<T> {
void setDisposable(@Nullable Disposable d);
void setCancellable(@Nullable Cancellable c);
boolean isDisposed();
@NonNull
ObservableEmitter<T> serialize();
@Experimental
boolean tryOnError(@NonNull Throwable t);
}
ObservableEmitter是對Emitter的擴展,而擴展的方法正是 RxJava2.0 之后引入的,提供了可中途取消等新能力,我們看 Emitter 原始碼:
public interface Emitter<T> {
void onNext(@NonNull T value);
void one rror(@NonNull Throwable error);
void onComplete();
}
Emitter字面意思是發射器,這里邊的三個方法,大家都很熟悉了,其對應了以下這段代碼:
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
回呼說完,下邊我們來看Observable.create(ObservableOnSubscribe<T> source) 這段代碼,
b、Observable.create(ObservableOnSubscribe source)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
- RxJavaPlugins 先忽略
- 我們看到傳入的
ObservableOnSubscribe被用來創建ObservableCreate,其實ObservableCreate就是Observable的一個實作類
因此 Observable.create(ObservableOnSubscribe<T> source) 這段代碼,實際是:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO執行緒中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
- 這里我們知道:當
ObservableOnSubscribe.subscribe方法被執行時,用戶通過呼叫ObservableEmitter.onNext方法,將資料發送出去(發送給觀察者)
下邊我們看一下ObservableCreate 這個類
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
ObservableOnSubscribe.subscribe方法是在ObservableCreate.subscribeActual方法中第四行中被執行了;subscribe方法中,用戶通過呼叫ObservableEmitter.onNext方法,將資料發送出去;- 而
subscribeActual方法第二行,呼叫了observer.onSubscribe(parent);方法, 訂閱發生時,在訂閱執行緒主動執行了observer的onSubscribe方法; CreateEmitter是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;CreateEmitter的作用是任務取消時,可以不再回呼其封裝的觀察者;observer的onNext方法,由CreateEmitter.onNext方法呼叫;
Observable.create(ObservableOnSubscribe<T> source); 方法最侄訓傳一個 ObservableCreate 物件,
下邊看 observable.subscribe(observer); 方法
c、observable.subscribe(observer);
observable.subscribe(observer);即 訂閱發生的那一刻,- 這里
observable.subscribe(observer);實際是ObservableCreate.subscribe(observer);
下邊查看Observable 的subscribe(observer)方法
Observable.subscribe(Observer observer)
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// Observable的subscribe方法,實際執行的是subscribeActual方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
//
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- 呼叫
observable.subscribe(observer);方法時,實際是呼叫了observable.subscribeActual(observer)方法, observable為ObservableCreate的參考,因此這里呼叫的是ObservableCreate.subscribeActual(observer)方法,
我們又回到 ObservableCreate 這個類的subscribeActual方法
ObservableCreate.java
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
// subscribeActual 方法在 訂閱發生的那一刻被呼叫 既 observable.subscribe(observer);時被呼叫
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回呼
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
subscribeActual方法在 訂閱發生的那一刻被呼叫的;在observable.subscribe(observer);時被呼叫;observer.onSubscribe(parent);訂閱發生時,在訂閱執行緒回呼observer的onSubscribe方法subscribeActual方法中,傳入的Observer會被包裝成一個CreateEmitter;若中途任務取消,通過CreateEmitter可終止對observer中方法onNext 、onError等的回呼;
subscribeActual 中第二行代碼 observer.onSubscribe(parent);
observer.onSubscribe(parent); 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法,這里回到了以下代碼
// 創建觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "onSubscribe");
}
// ... 省略onNext、onError、onComplete
};
- 這里傳入的引數為
new CreateEmitter<T>(observer),其實作了Disposable介面,若任務取消,則不回呼傳入的觀察者observer對應的onNext 、onError、onComplete等方法
subscribeActual 中第四行代碼 source.subscribe(parent);
source.subscribe(parent); 是ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));
代碼最侄訓到ObservableOnSubscribe 的 subscribe :
new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
}
- 在
subscribe中,呼叫到CreateEmitter類的onNext 、onComplete、onError方法,將資料發送CreateEmitter中的觀察者
到此,“這段不涉及運算子和執行緒切換的簡單例子” 的代碼跟蹤結束,
原始碼閱讀——執行緒切換 (二)
注:當前使用的原始碼版本 rxjava:2.1.9
從這段執行緒切換的簡單例子開始:
// 創建觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 訂閱執行緒 訂閱的那一刻在訂閱執行緒中執行
}
@Override
public void onNext(String o) {
// Android 主執行緒中執行
}
@Override
public void one rror(@NonNull Throwable e) {
// Android 主執行緒中執行
}
@Override
public void onComplete() {
// Android 主執行緒中執行
}
};
// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO執行緒中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
// 被觀察者 IO 執行緒
observable = observable.subscribeOn(Schedulers.io());
// 觀察者 Android主執行緒
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 訂閱
observable.subscribe(observer);
先來個我總結的RxJava2的整個代碼執行流程:
a、Observable.create(ObservableOnSubscribe source)
在 原始碼閱讀——簡單例子 (一) 中我們了解到了Observable.create(ObservableOnSubscribe<T> source)實際是 如下代碼:
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO執行緒中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
ObservableCreate中含有一個subscribeActual(observer)方法,用于執行傳入觀察者的observer.onSubscribe方法,和間接呼叫 觀察者的onNext、onComplete等方法;
ObservableCreate
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
// 省略部分代碼 ...
}
subscribeActual方法第二行,呼叫了傳入的觀察者的observer.onSubscribe(parent);方法; 訂閱發生時,在訂閱執行緒主動執行了observer的onSubscribe方法;subscribeActual方法第四行,呼叫了傳入的觀察者的observer.subscribe方法;subscribe方法中,用戶通過呼叫CreateEmitter.onNext方法,將資料發送出去;CreateEmitter是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;CreateEmitter的作用是任務取消時,可以不再回呼其封裝的觀察者;observer的onNext方法,由CreateEmitter.onNext方法呼叫;
下邊查看observable.subscribeOn(Schedulers.io())相關代碼
注:
ObservableEmitter是CreateEmitter的參考,是對Observer的進一步封裝,CreateEmitter在執行onNext時,如果任務取消,則不再回呼Observer的onNext方法,
b、observable.subscribeOn(Schedulers.io())
下邊我們查看Observable 類的subscribeOn(Scheduler scheduler)方法
Observable.java
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 生成一個ObservableSubscribeOn物件
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
- 繼續忽略
RxJavaPlugins - 最侄訓傳一個
ObservableSubscribeOn物件
這里Observable observable = observableCreate.subscribeOn(Schedulers.io())代碼實際是
ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
- 因此
observable.subscribeOn(Schedulers.io())回傳的是一個ObservableSubscribeOn的參考
下邊查看ObservableSubscribeOn
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
// ... 省略部分代碼
}
看一下ObservableSubscribeOn中的subscribeActual 方法
subscribeActual方法第二行代碼中,執行了傳入Observer的onSubscribe方法;subscribeActual方法第三行: 在scheduler對應的IO執行緒中,執行observableCreate的subscribe方法,傳入引數為SubscribeOnObserver,即:IO執行緒中執行observableCreate.subscribe(new SubscribeOnObserver(observer));
因此,無論ObservableSubscribeOn.subscribeActual(observer)在哪個執行緒中被呼叫observableCreate.subscribe(new SubscribeOnObserver<T>(observer))均在IO執行緒中執行,因此觀察者的e.onNext("hello"); e.onComplete(); 亦在IO執行緒中執行;
c、observable.observeOn(AndroidSchedulers.mainThread())
下邊我們查看Observable 類的observeOn(Scheduler scheduler)方法
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
//
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
這里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())實際是:
ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
因此 ,observable.observeOn(AndroidSchedulers.mainThread()) 回傳的是ObservableObserveOn 的參考,
下邊查看ObservableObserveOn
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代碼
}
看一下ObservableObserveOn中的subscribeActual 方法
subscribeActual方法第五行代碼,實際為observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));ObserveOnObserver的作用是在ObserveOnObserver的onNext方法被實行時;將observer的onNext方法post到Android主執行緒中;
d、observable.subscribe(observer)
- 我們知道
Observable的subscribe(Observer<? super T> observer)方法,實際呼叫到了Observable的subscribeActual(Observer<? super T> observer)方法; - 而這里的
observable實際是ObservableObserveOn的參考;
因此,observable.subscribe(observer)實際執行的是observableObserveOn.subscribeActual(observer)
到這里,我們 執行緒切換 (二) 的小例子變換為了以下代碼:
// 創建觀察者
Observer observer = new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
// 訂閱執行緒 訂閱的那一刻在訂閱執行緒中執行
}
@Override
public void onNext(String o) {
// Android 主執行緒中執行
}
@Override
public void one rror(@NonNull Throwable e) {
// Android 主執行緒中執行
}
@Override
public void onComplete() {
// Android 主執行緒中執行
}
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
@Override
public void subscribe(@NonNull ObservableEmitter e) throws Exception {
// IO執行緒中執行
e.onNext("hello");
e.onNext("world");
e.onComplete();
}
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);
下邊我們查看observableObserveOn.subscribeActual(observer)
ObservableObserveOn.java
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
// source 為 observableSubscribeOn
super(source);
// scheduler 為AndroidSchedulers.mainThread()
this.scheduler = scheduler;
// false
this.delayError = delayError;
// 128
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// AndroidSchedulers.mainThread() 為 HandlerScheduler,因此會走到else部分代碼
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
}
// 代碼會走到else 部分
else {
Scheduler.Worker w = scheduler.createWorker();
// source 為 observableSubscribeOn
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
// ... 省略部分代碼
}
subscribeActual方法中,AndroidSchedulers.mainThread()為HandlerScheduler,因此 if 中的判斷陳述句直接忽略,直接走到代碼的 else 部分,subscribeActual方法中,將觀察者observer封裝成了ObserveOnObserver;并且呼叫observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))- 而
observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))實際是
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“訂閱執行緒中” —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 執行subscribe ;IO執行緒 subscribe方法中,用戶主動呼叫ObserveOnObserver的onNext、onError、onComplete方法,將資料發出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
- 用戶呼叫
SubscribeOnObserver的onNext是將資料發送出去 SubscribeOnObserver.onNext呼叫了observeOnObserver.onNextobserveOnObserver.onNext通過HandlerScheduler將observer.onNext、observer.onError、observer.onComplete等方法post到Android主執行緒中執行,
e、整體流程圖如下
最后總結一下RxJava2的整個執行流程:
參考
手把手教你使用 RxJava 2.0(一)
RxJava2 原始碼決議(一)
RxJava2 原始碼決議——流程
========== THE END ==========

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/28331.html
標籤:Android
上一篇:Only fullscreen opaque activities can request orientation 原因及解決方案
