主頁 > 移動端開發 > RxJava2 使用 及 原始碼閱讀

RxJava2 使用 及 原始碼閱讀

2020-09-13 23:36:20 移動端開發

RxJava2 使用 及 原始碼閱讀

RxJava是什么?根據RxJava在GitHub上給出的描述:
RxJava – Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java

大致意思是:
RxJava—一個可以在JVM上運行的,基于觀察者模式 實作異步操作的java庫,

RxJava的作用:
就是異步RxJava的使用,可以使“邏輯復雜的代碼”保持極強的閱讀性,

Rxjava github地址

RxAndorid的作用:
Android中RxAndorid與RxJava配合使用; RxAndorid 封裝了AndroidSchedulers.mainThread(),Android開發者使用程序中,可以輕松的將任務post Andorid主執行緒中,執行頁面更新操作,

RxAndroid github地址

使用方式

1、Observable

  • Observable:被觀察者
  • Observer:觀察者,可接收Observable發送的資料

a、Rxjava 實作執行緒切換:

//
Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        //1、“異步執行緒” 執行耗時操作
        //2、“執行完畢” 呼叫onNext觸發回呼,通知觀察者
        e.onNext("1");
        e.onComplete();
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                // 訂閱執行緒  訂閱的那一刻在訂閱執行緒中執行
            }

            @Override
            public void onNext(String value) {
                // “主執行緒”執行的方法
            }

            @Override
            public void one rror(Throwable e) {
                // "主執行緒"執行的方法
            }

            @Override
            public void onComplete() {
                // "主執行緒"執行的方法
            }
        });

b、Rxjava 使用運算子

Observable.create(new ObservableOnSubscribe<String>() {
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception {
        // IO 執行緒
        // 請求網路資料
        e.onNext("123456");
    }
}).map(new Function<String, Integer>() {
    @Override
    public Integer apply(String s) {
        // IO 執行緒
        // 網路資料決議(資料轉化)
        //
        // throw new RequestFailException("獲取網路請求失敗");
        return 123;
    }
}).doOnNext(new Consumer<Integer>() {    //保存登錄結果UserInfo
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // IO 執行緒
        // 保存網路資料

    }
}).subscribeOn(Schedulers.io())   //IO執行緒
.observeOn(AndroidSchedulers.mainThread())  //主執行緒
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(@NonNull Integer bean) throws Exception {
        // 更新UI
    }
}, new Consumer<Throwable>() {
    @Override
    public void accept(@NonNull Throwable throwable) throws Exception {
        // 錯誤 顯示錯誤頁面
    }
});

2、Flowable

Flowable是為了應對Backpressure產生的,
Flowable是一個被觀察者,與Subscriber(觀察者)配合使用

//
Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
        //1、“異步執行緒” 執行耗時操作
        //2、“執行完畢” 呼叫onNext觸發回呼,通知觀察者
        emitter.onNext(0);
        emitter.onComplete();
    }
    // 若消費者消費能力不足,則拋出MissingBackpressureException例外
}, BackpressureStrategy.ERROR)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                // 訂閱時執行,發生在“訂閱執行緒”
                // 這個方法是用來向生產者申請可以消費的事件數量
                // 這里表明消費者擁有Long.MAX_VALUE的消費能力
                s.request(Long.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                // “主執行緒”執行的方法
            }

            @Override
            public void one rror(Throwable t) {
                // "主執行緒"執行的方法
            }

            @Override
            public void onComplete() {
                // "主執行緒"執行的方法
            }
        });

a、 Backpressure(背壓)

Backpressure(背壓)生產者的生產速度大于消費者的消費能力引起的問題,

在RxJava中有一種情況就是被觀察者發送訊息十分迅速以至于觀察者不能及時的回應這些訊息

例如:

Observable.create(new ObservableOnSubscribe<Integer>() {
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception {
	    // “異步執行緒”中 生產者有無限的生產能力
        while (true){
            e.onNext(1);
        }
    }
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
    @Override
    public void accept(Integer integer) throws Exception {
	    // “主執行緒”中 消費者消費能力不足,從而造成事件無限堆積,最后導致OOM
        Thread.sleep(2000);
        System.out.println(integer);
    }
});

異步執行緒中 生產者有無限的生產能力;
主執行緒中 消費者消費能力不足,從而造成事件無限堆積,最后導致OOM,

上述的現象,有個專有的名詞來來形容,即:Backpressure(背壓)

b、Subscription.request(long n);

Subscription.request(long n) 方法是用來向生產者申請可以消費的事件數量

  • 當呼叫了request(long n)方法后,生產者便發送對應數量的事件供消費者消費;
  • 如果不顯示呼叫request就表示消費能力為0

在異步呼叫時,RxJava中有個快取池,用來快取消費者處理不了暫時快取下來的資料,快取池的默認大小為128,即只能快取128個事件,
無論request()中傳入的數字比128大或小,快取池中在剛開始都會存入128個事件;當然如果本身并沒有這么多事件需要發送,則不會存128個事件,

  • BackpressureStrategy.ERROR策略下,如果生產者生產的事件大于128個,快取池便會溢位,從而拋出MissingBackpressureException例外;
  • BackpressureStrategy.BUFFER策略:將RxJava中默認的128個事件的快取池換成一個更大的快取池,這樣,消費者通過request()即使傳入一個很大的數字,生產者也會生產事件,但是這種方式比較消耗記憶體,除非是我們比較了解消費者的消費能力,能夠把握具體情況,不會產生OOM,總之BUFFER要慎用,
  • BackpressureStrategy.DROP策略:當消費者處理不了事件,則丟棄,消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費,其他消費不掉的事件就丟掉,
  • BackpressureStrategy.LATEST策略: LATEST與DROP功能基本一致,消費者通過request()傳入其需求n,然后生產者把n個事件傳遞給消費者供其消費,其他消費不掉的事件就丟掉,唯一的區別就是LATEST總能使消費者能夠接收到生產者產生的最后一個事件,

原始碼閱讀——簡單例子 (一)

注:當前使用的原始碼版本 rxjava:2.1.9

從這段不涉及運算子和執行緒切換的簡單例子開始:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }

    @Override
    public void onNext(String o) {

    }

    @Override
    public void one rror(@NonNull Throwable e) {
        Log.d(TAG, "onError data is :" + e.toString());
    }

    @Override
    public void onComplete() {
        Log.d(TAG, "onComplete");
    }
};

// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 訂閱
observable.subscribe(observer);

a、ObservableOnSubscribe.java

先看一下ObservableOnSubscribe.java這個類

public interface ObservableOnSubscribe<T> {
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
}

由代碼可知 ObservableOnSubscribe是一個回呼介面,回呼方法中引數為ObservableEmitter,下邊看一下ObservableEmitter 這個類,

ObservableEmitter.java

ObservableEmitter字面意思是被觀察者發射器,看一下原始碼:

public interface ObservableEmitter<T> extends Emitter<T> {

    void setDisposable(@Nullable Disposable d);

    void setCancellable(@Nullable Cancellable c);

    boolean isDisposed();

    @NonNull
    ObservableEmitter<T> serialize();

    @Experimental
    boolean tryOnError(@NonNull Throwable t);
}

ObservableEmitter是對Emitter的擴展,而擴展的方法正是 RxJava2.0 之后引入的,提供了可中途取消等新能力,我們看 Emitter 原始碼:

public interface Emitter<T> {

    void onNext(@NonNull T value);

    void one rror(@NonNull Throwable error);

    void onComplete();
}

Emitter字面意思是發射器,這里邊的三個方法,大家都很熟悉了,其對應了以下這段代碼:

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}

回呼說完,下邊我們來看Observable.create(ObservableOnSubscribe<T> source) 這段代碼,

b、Observable.create(ObservableOnSubscribe source)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
  • RxJavaPlugins 先忽略
  • 我們看到傳入的ObservableOnSubscribe被用來創建ObservableCreate,其實ObservableCreate就是Observable的一個實作類

因此 Observable.create(ObservableOnSubscribe<T> source) 這段代碼,實際是:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO執行緒中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
  • 這里我們知道:當 ObservableOnSubscribe.subscribe 方法被執行時,用戶通過呼叫ObservableEmitter.onNext方法,將資料發送出去(發送給觀察者)

下邊我們看一下ObservableCreate 這個類

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • ObservableOnSubscribe.subscribe 方法是在ObservableCreate.subscribeActual 方法中第四行中被執行了;subscribe方法中,用戶通過呼叫ObservableEmitter.onNext方法,將資料發送出去;
  • subscribeActual方法第二行,呼叫了observer.onSubscribe(parent);方法, 訂閱發生時,在訂閱執行緒主動執行了observeronSubscribe方法;
  • CreateEmitter 是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;
  • CreateEmitter的作用是任務取消時,可以不再回呼其封裝的觀察者;observeronNext方法,由CreateEmitter.onNext方法呼叫;

Observable.create(ObservableOnSubscribe<T> source); 方法最侄訓傳一個 ObservableCreate 物件,
下邊看 observable.subscribe(observer); 方法

c、observable.subscribe(observer);

  • observable.subscribe(observer); 即 訂閱發生的那一刻,
  • 這里 observable.subscribe(observer); 實際是ObservableCreate.subscribe(observer);

下邊查看Observablesubscribe(observer)方法

Observable.subscribe(Observer observer)

public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
        // Observable的subscribe方法,實際執行的是subscribeActual方法
        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        RxJavaPlugins.onError(e);
        //
        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}
  • 呼叫 observable.subscribe(observer); 方法時,實際是呼叫了observable.subscribeActual(observer) 方法,
  • observableObservableCreate的參考,因此這里呼叫的是ObservableCreate.subscribeActual(observer) 方法,

我們又回到 ObservableCreate 這個類的subscribeActual方法

ObservableCreate.java

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //  subscribeActual 方法在 訂閱發生的那一刻被呼叫 既 observable.subscribe(observer);時被呼叫
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
	    // 若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回呼
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        // 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • subscribeActual 方法在 訂閱發生的那一刻被呼叫的;在 observable.subscribe(observer); 時被呼叫;
  • observer.onSubscribe(parent); 訂閱發生時,在訂閱執行緒回呼observeronSubscribe方法
  • subscribeActual 方法中,傳入的Observer會被包裝成一個CreateEmitter;若中途任務取消,通過CreateEmitter 可終止對observer中方法onNext 、onError 等的回呼;

subscribeActual 中第二行代碼 observer.onSubscribe(parent);

observer.onSubscribe(parent); 訂閱發生時,執行 觀察者的onSubscribe(Disposable d) 方法,這里回到了以下代碼

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        Log.d(TAG, "onSubscribe");
    }
    // ... 省略onNext、onError、onComplete
};
  • 這里傳入的引數為 new CreateEmitter<T>(observer) ,其實作了Disposable介面,若任務取消,則不回呼傳入的觀察者observer 對應的onNext 、onError、onComplete 等方法

subscribeActual 中第四行代碼 source.subscribe(parent);

source.subscribe(parent);ObservableOnSubscribe.subscribe(new CreateEmitter<T>(observer));

代碼最侄訓到ObservableOnSubscribesubscribe :

new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
}
  • subscribe中,呼叫到 CreateEmitter 類的onNext 、onComplete、onError 方法,將資料發送CreateEmitter中的觀察者

到此,“這段不涉及運算子和執行緒切換的簡單例子” 的代碼跟蹤結束,

原始碼閱讀——執行緒切換 (二)

注:當前使用的原始碼版本 rxjava:2.1.9

從這段執行緒切換的簡單例子開始:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱執行緒  訂閱的那一刻在訂閱執行緒中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主執行緒中執行
    }

    @Override
    public void one rror(@NonNull Throwable e) {
        // Android 主執行緒中執行
    }

    @Override
    public void onComplete() {
        // Android 主執行緒中執行
    }
};

// 創建被觀察者
Observable observable = Observable.create(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO執行緒中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
// 被觀察者 IO 執行緒
observable = observable.subscribeOn(Schedulers.io());
// 觀察者  Android主執行緒
observable = observable.observeOn(AndroidSchedulers.mainThread());
// 訂閱
observable.subscribe(observer);

先來個我總結的RxJava2的整個代碼執行流程:
這里寫圖片描述

a、Observable.create(ObservableOnSubscribe source)

原始碼閱讀——簡單例子 (一) 中我們了解到了Observable.create(ObservableOnSubscribe<T> source)實際是 如下代碼:

//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO執行緒中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});

  • ObservableCreate 中含有一個subscribeActual(observer) 方法,用于執行傳入觀察者的observer.onSubscribe方法,和間接呼叫 觀察者的onNext、onComplete 等方法;

ObservableCreate

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    // 省略部分代碼 ...
}
  • subscribeActual方法第二行,呼叫了傳入的觀察者的observer.onSubscribe(parent);方法; 訂閱發生時,在訂閱執行緒主動執行了observeronSubscribe方法;
  • subscribeActual方法第四行,呼叫了傳入的觀察者的observer.subscribe 方法;subscribe方法中,用戶通過呼叫CreateEmitter.onNext方法,將資料發送出去;
  • CreateEmitter 是對ObservableCreate.subscribeActual(Observer<? super T> observer)方法傳入的Observer的封裝;
  • CreateEmitter的作用是任務取消時,可以不再回呼其封裝的觀察者;observeronNext方法,由CreateEmitter.onNext方法呼叫;

下邊查看observable.subscribeOn(Schedulers.io())相關代碼

注:
ObservableEmitterCreateEmitter的參考,是對Observer的進一步封裝,CreateEmitter在執行onNext時,如果任務取消,則不再回呼ObserveronNext方法,

b、observable.subscribeOn(Schedulers.io())

下邊我們查看Observable 類的subscribeOn(Scheduler scheduler)方法

Observable.java

public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    // 生成一個ObservableSubscribeOn物件
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
  • 繼續忽略RxJavaPlugins
  • 最侄訓傳一個ObservableSubscribeOn 物件

這里Observable observable = observableCreate.subscribeOn(Schedulers.io())代碼實際是

ObservableSubscribeOn observable = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
  • 因此 observable.subscribeOn(Schedulers.io()) 回傳的是一個ObservableSubscribeOn 的參考

下邊查看ObservableSubscribeOn

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    // ... 省略部分代碼
}

看一下ObservableSubscribeOn中的subscribeActual 方法

  • subscribeActual 方法第二行代碼中,執行了傳入ObserveronSubscribe 方法;
  • subscribeActual 方法第三行: 在 scheduler 對應的IO執行緒中,執行observableCreatesubscribe 方法,傳入引數為SubscribeOnObserver,即:IO執行緒中 執行observableCreate.subscribe(new SubscribeOnObserver(observer));

因此,無論ObservableSubscribeOn.subscribeActual(observer)在哪個執行緒中被呼叫observableCreate.subscribe(new SubscribeOnObserver<T>(observer))均在IO執行緒中執行,因此觀察者的e.onNext("hello"); e.onComplete(); 亦在IO執行緒中執行;

c、observable.observeOn(AndroidSchedulers.mainThread())

下邊我們查看Observable 類的observeOn(Scheduler scheduler)方法

public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}
// 
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

這里可以看到 Observable observable = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread())實際是:

ObservableObserveOn observable = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);

因此 ,observable.observeOn(AndroidSchedulers.mainThread()) 回傳的是ObservableObserveOn 的參考,

下邊查看ObservableObserveOn

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}

看一下ObservableObserveOn中的subscribeActual 方法

  • subscribeActual 方法第五行代碼,實際為observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
  • ObserveOnObserver 的作用是在ObserveOnObserveronNext方法被實行時;將observeronNext方法post到 Android主執行緒中;

d、observable.subscribe(observer)

  • 我們知道Observablesubscribe(Observer<? super T> observer)方法,實際呼叫到了ObservablesubscribeActual(Observer<? super T> observer) 方法;
  • 而這里的observable 實際是ObservableObserveOn的參考;

因此,observable.subscribe(observer)實際執行的是observableObserveOn.subscribeActual(observer)

到這里,我們 執行緒切換 (二) 的小例子變換為了以下代碼:

// 創建觀察者
Observer observer = new Observer<String>() {
    @Override
    public void onSubscribe(@NonNull Disposable d) {
        // 訂閱執行緒  訂閱的那一刻在訂閱執行緒中執行
    }

    @Override
    public void onNext(String o) {
        // Android 主執行緒中執行
    }

    @Override
    public void one rror(@NonNull Throwable e) {
        // Android 主執行緒中執行
    }

    @Override
    public void onComplete() {
        // Android 主執行緒中執行
    }
};
//
ObservableCreate observableCreate = new ObservableCreate<T>(new ObservableOnSubscribe() {
    @Override
    public void subscribe(@NonNull ObservableEmitter e) throws Exception {
        // IO執行緒中執行
        e.onNext("hello");
        e.onNext("world");
        e.onComplete();
    }
});
//
ObservableSubscribeOn observableSubscribeOn = new ObservableSubscribeOn<T>(observableCreate, Schedulers.io())
//
ObservableObserveOn observableObserveOn = new ObservableObserveOn<T>(observableSubscribeOn, AndroidSchedulers.mainThread(), false, 128);
//
observableObserveOn.subscribeActual(observer);

下邊我們查看observableObserveOn.subscribeActual(observer)

ObservableObserveOn.java

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
    final Scheduler scheduler;
    final boolean delayError;
    final int bufferSize;
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
	    // source 為 observableSubscribeOn
        super(source);
        // scheduler 為AndroidSchedulers.mainThread()
        this.scheduler = scheduler;
        // false
        this.delayError = delayError;
        // 128
        this.bufferSize = bufferSize;
    }

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
	    // AndroidSchedulers.mainThread() 為 HandlerScheduler,因此會走到else部分代碼
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        }
        // 代碼會走到else 部分
         else {
            Scheduler.Worker w = scheduler.createWorker();
            // source 為 observableSubscribeOn
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    // ... 省略部分代碼
}
  • subscribeActual 方法中,AndroidSchedulers.mainThread()HandlerScheduler ,因此 if 中的判斷陳述句直接忽略,直接走到代碼的 else 部分,
  • subscribeActual 方法中,將觀察者observer封裝成了ObserveOnObserver;并且呼叫observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))
  • observableSubscribeOn.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize))實際是
ObserveOnObserver observeOnObserver = new ObserveOnObserver<T>(observer, w, delayError, bufferSize)
// 1、“訂閱執行緒中” —— 執行onSubscribe, 實際執行的是observer的onSubscribe方法
observeOnObserver.onSubscribe(new SubscribeOnObserver<T>(observeOnObserver));
// 2、“IO程中” —— 執行subscribe ;IO執行緒 subscribe方法中,用戶主動呼叫ObserveOnObserver的onNext、onError、onComplete方法,將資料發出去
observableCreate.subscribe(new SubscribeOnObserver<T>(observeOnObserver))
  • 用戶呼叫SubscribeOnObserveronNext 是將資料發送出去
  • SubscribeOnObserver.onNext呼叫了observeOnObserver.onNext
  • observeOnObserver.onNext通過HandlerSchedulerobserver.onNext、observer.onError、observer.onComplete 等方法post到Android主執行緒中執行,

e、整體流程圖如下

最后總結一下RxJava2的整個執行流程:

這里寫圖片描述

參考

手把手教你使用 RxJava 2.0(一)
RxJava2 原始碼決議(一)
RxJava2 原始碼決議——流程

========== THE END ==========

wx_gzh.jpg

轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/28331.html

標籤:Android

上一篇:Only fullscreen opaque activities can request orientation 原因及解決方案

下一篇:Android JsBridge原始碼學習

標籤雲
其他(157675) Python(38076) JavaScript(25376) Java(17977) C(15215) 區塊鏈(8255) C#(7972) AI(7469) 爪哇(7425) MySQL(7132) html(6777) 基礎類(6313) sql(6102) 熊猫(6058) PHP(5869) 数组(5741) R(5409) Linux(5327) 反应(5209) 腳本語言(PerlPython)(5129) 非技術區(4971) Android(4554) 数据框(4311) css(4259) 节点.js(4032) C語言(3288) json(3245) 列表(3129) 扑(3119) C++語言(3117) 安卓(2998) 打字稿(2995) VBA(2789) Java相關(2746) 疑難問題(2699) 细绳(2522) 單片機工控(2479) iOS(2429) ASP.NET(2402) MongoDB(2323) 麻木的(2285) 正则表达式(2254) 字典(2211) 循环(2198) 迅速(2185) 擅长(2169) 镖(2155) 功能(1967) .NET技术(1958) Web開發(1951) python-3.x(1918) HtmlCss(1915) 弹簧靴(1913) C++(1909) xml(1889) PostgreSQL(1872) .NETCore(1853) 谷歌表格(1846) Unity3D(1843) for循环(1842)

熱門瀏覽
  • 【從零開始擼一個App】Dagger2

    Dagger2是一個IOC框架,一般用于Android平臺,第一次接觸的朋友,一定會被搞得暈頭轉向。它延續了Java平臺Spring框架代碼碎片化,注解滿天飛的傳統。嘗試將各處代碼片段串聯起來,理清思緒,真不是件容易的事。更不用說還有各版本細微的差別。 與Spring不同的是,Spring是通過反射 ......

    uj5u.com 2020-09-10 06:57:59 more
  • Flutter Weekly Issue 66

    新聞 Flutter 季度調研結果分享 教程 Flutter+FaaS一體化任務編排的思考與設計 詳解Dart中如何通過注解生成代碼 GitHub 用對了嗎?Flutter 團隊分享如何管理大型開源專案 插件 flutter-bubble-tab-indicator A Flutter librar ......

    uj5u.com 2020-09-10 06:58:52 more
  • Proguard 常用規則

    介紹 Proguard 入口,如何查看輸出,如何使用 keep 設定入口以及使用實體,如何配置壓縮,混淆,校驗等規則。

    ......

    uj5u.com 2020-09-10 06:59:00 more
  • Android 開發技術周報 Issue#292

    新聞 Android即將獲得類AirDrop功能:可向附近設備快速分享檔案 谷歌為安卓檔案管理應用引入可安全隱藏資料的Safe Folder功能 Android TV新主界面將顯示電影、電視節目和應用推薦內容 泄露的Android檔案暗示了傳說中的谷歌Pixel 5a與折疊屏新機 谷歌發布Andro ......

    uj5u.com 2020-09-10 07:00:37 more
  • AutoFitTextureView Error inflating class

    報錯: Binary XML file line #0: Binary XML file line #0: Error inflating class xxx.AutoFitTextureView 解決: <com.example.testy2.AutoFitTextureView android: ......

    uj5u.com 2020-09-10 07:00:41 more
  • 根據Uri,Cursor沒有獲取到對應的屬性

    Android: 背景:呼叫攝像頭,拍攝視頻,指定保存的地址,但是回傳的Cursor檔案,只有名稱和大小的屬性,沒有其他諸如時長,連ID屬性都沒有 使用 cursor.getInt(cursor.getColumnIndexOrThrow(MediaStore.Video.Media.DURATIO ......

    uj5u.com 2020-09-10 07:00:44 more
  • Android連載29-持久化技術

    一、持久化技術 我們平時所使用的APP產生的資料,在記憶體中都是瞬時的,會隨著斷電、關機等丟失資料,因此android系統采用了持久化技術,用于存盤這些“瞬時”資料 持久化技術包括:檔案存盤、SharedPreference存盤以及資料庫存盤,還有更復雜的SD卡記憶體儲。 二、檔案存盤 最基本存盤方式, ......

    uj5u.com 2020-09-10 07:00:47 more
  • Android Camera2Video整合到自己專案里

    背景: Android專案里呼叫攝像頭拍攝視頻,原本使用的 MediaStore.ACTION_VIDEO_CAPTURE, 后來因專案需要,改成了camera2 1.Camera2Video 官方demo有點問題,下載后,不能直接整合到專案 問題1.多次拍攝視頻崩潰 問題2.雙擊record按鈕, ......

    uj5u.com 2020-09-10 07:00:50 more
  • Android 開發技術周報 Issue#293

    新聞 谷歌為Android TV開發者提供多種新功能 Android 11將自動填表功能整合到鍵盤輸入建議中 谷歌宣布Android Auto即將支持更多的導航和數字停車應用 谷歌Pixel 5只有XL版本 搭載驍龍765G且將比Pixel 4更便宜 [圖]Wear OS將迎來重磅更新:應用啟動時間 ......

    uj5u.com 2020-09-10 07:01:38 more
  • 海豚星空掃碼投屏 Android 接收端 SDK 集成 六步驟

    掃碼投屏,開放網路,獨占設備,不需要額外下載軟體,微信掃碼,發現設備。支持標準DLNA協議,支持倍速播放。視頻,音頻,圖片投屏。好點意思。還支持自定義基于 DLNA 擴展的操作動作。好像要收費,沒體驗。 這里簡單記錄一下集成程序。 一 跟目錄的build.gradle添加私有mevan倉庫 mave ......

    uj5u.com 2020-09-10 07:01:43 more
最新发布
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:40:31 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:40:11 more
  • 歡迎頁輪播影片

    如圖,引導開始,球從上落下,同時淡入文字,然后文字開始輪播,最后一頁時停止,點擊進入首頁。 在來看看效果圖。 重力球先不講,主要歡迎輪播簡單實作 首先新建一個類 TextTranslationXGuideView,用于影片展示 文本是類似的,最后會有個圖片箭頭影片,布局很簡單,就是一個 TextVi ......

    uj5u.com 2023-04-20 08:39:36 more
  • 【FAQ】關于華為推送服務因營銷訊息頻次管控導致服務通訊類訊息

    一. 問題描述 使用華為推送服務下發IM訊息時,下發訊息請求成功且code碼為80000000,但是手機總是收不到訊息; 在華為推送自助分析(Beta)平臺查看發現,訊息發送觸發了頻控。 二. 問題原因及背景 2023年1月05日起,華為推送服務對咨詢營銷類訊息做了單個設備每日推送數量上限管理,具體 ......

    uj5u.com 2023-04-20 08:39:13 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:16:23 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:16:15 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:15:46 more
  • iOS從UI記憶體地址到讀取成員變數(oc/swift)

    開發除錯時,我們發現bug時常首先是從UI顯示發現例外,下一步才會去定位UI相關連的資料的。XCode有給我們提供一系列debug工具,但是很多人可能還沒有形成一套穩定的除錯流程,因此本文嘗試解決這個問題,順便提出一個暴論:UI顯示例外問題只需要兩個步驟就能完成定位作業的80%: 定位例外 UI 組 ......

    uj5u.com 2023-04-19 09:14:53 more
  • FIDE重磅更新!性能飛躍!體驗有禮!

    FIDE 開發者工具重構升級啦!實作500%性能提升,誠邀體驗! 一直以來不少開發者朋友在社區反饋,在使用 FIDE 工具的程序中,時常會遇到諸如加載不及時、代碼預覽/渲染性能不如意的情況,十分影響開發體驗。 作為技術團隊,我們深知一件趁手的開發工具對開發者的重要性,因此,在2023年開年,FinC ......

    uj5u.com 2023-04-19 09:14:08 more
  • 游戲內嵌社區服務開放,助力開發者提升玩家互動與留存

    華為 HMS Core 游戲內嵌社區服務提供快速訪問華為游戲中心論壇能力,支持玩家直接在游戲內瀏覽帖子和交流互動,助力開發者擴展內容生產和觸達的場景。 一、為什么要游戲內嵌社區? 二、游戲內嵌社區的典型使用場景 1、游戲內打開論壇 您可以在游戲內繪制論壇入口,為玩家提供沉浸式發帖、瀏覽、點贊、回帖、 ......

    uj5u.com 2023-04-19 09:08:34 more