一、事件分發流程:
常規創建Observable觀察者:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("");
emitter.onComplete();
}
}).flatMap(new Function<String, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull String s) throws Exception {
System.out.println("我們這里需要重寫的方法");
return Observable.just("");
}
}).observeOn(Schedulers.io())
.subscribeOn(Schedulers.newThread())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
System.out.println("方法一:onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
System.out.println("方法二:onNext");
}
@Override
public void one rror(@NonNull Throwable e) {
System.out.println("方法三:onError");
}
@Override
public void onComplete() {
System.out.println("方法四:onComplete");
}
});
}
從Observable.create()點進去,通過一個靜態方法,追蹤到
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
}
這里構造方法中保存的source就是外部傳進來的ObservableOnSubscribe,存盤下來后,當外部呼叫subscribe時,會執行到Observable的subscribeActual方法,而ObservableCreate實作了該方法,
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
這里做了三件事情:
1、創建分發器CreateEmitter物件,
2、回呼生命周期給觀察者,
3、執行subscribe方法,并傳入CreateEmitter物件,
解釋一:創建CreateEmitter用來把物件句柄傳遞給外部使用,同時把觀察者和被觀察者關聯起來,這里的CreateEmitter就是被觀察者,而Observer物件則屬于觀察者
解釋二:這里的observer.onSubscribe(parent),則是執行了 “System.out.println("方法一:onSubscribe")”,在整個處理流開始執行之前,把生命周期回呼給了訂閱者
解釋三:通過source.subscribe(parent),把分發器傳遞給被觀察著用來發送事件,這里的subscribe,就是呼叫了外部傳入的ObservableOnSubscribe物件,通知被觀察者開始發送事件,這里會執行到”System.out.println("開始發送事件")“,我們可以通過subscribe中傳遞過來的emitter物件進行事件的分發處理
當我們呼叫emitter.onNext("")時,可以理解為被觀察者發送了一個事件出來,因為CreateEmitter實作了ObservableEmitter,因此這里會回呼到CreateEmitter的onNext方法,代碼如下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
// 會回呼到這里
@Override
public void onNext(T t) {
if (t == null) {
one rror(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
在onNext內部,CreateEmitter會呼叫觀察者物件的onNext方法,把事件(也就是資料)傳遞給觀察者,也就是會執行到前面demo中的System.out.println("方法二:onNext");方法,同時我們也可以從代碼中看到,當判空時,會執行到onError()方法,這里同樣會呼叫觀察者的onError()方法,把事件傳遞出去
二、訂閱與取消
在CreateEmitter的每次回呼中,我們都可以看到isDisposed()的判斷,這個判斷是用來檢測觀察者是否已取消訂閱,如果觀察者取消訂閱的話,那么就不會把執行結果通知到觀察者,這里的典型使用場景:在activity中請求網路資料,當資料請求回來后重繪界面,但是如果資料請求回來前用戶手動關閉了activity的話,當資料請求回來后,因為activity內部的view已經被銷毀,會出現控制元件報空指標的問題,在這里我們可以通過dispose()方法取消事件的回呼傳遞,這里涉及到一個類:DisposableHelper
public enum DisposableHelper implements Disposable {
/**
* The singleton instance representing a terminal, disposed state, don't leak it.
*/
DISPOSED
;
/**
* Checks if the given Disposable is the common {@link #DISPOSED} enum value.
* @param d the disposable to check
* @return true if d is {@link #DISPOSED}
*/
public static boolean isDisposed(Disposable d) {
return d == DISPOSED;
}
}
DisposableHelper是一個列舉類,有一個DISPOSED列舉型別,用來記錄觀察者當前的狀態,當執訂閱者執行,代碼如下:
@Override
public void dispose() {
DisposableHelper.dispose(this);
}
其內部實作為:
public static boolean dispose(AtomicReference<Disposable> field) {
Disposable current = field.get();
Disposable d = DISPOSED;
if (current != d) {
current = field.getAndSet(d);
if (current != d) {
if (current != null) {
current.dispose();
}
return true;
}
}
return false;
}
通過這里把觀察者的狀態設定為DISPOSED,用來標志觀察者已取消訂閱,在被觀察者每次執行生命周期時,通過對觀察者的狀態判斷,用以確定是否需要把資料回呼給被觀察者
三、執行緒調度:
這里通過observeOn舉例說明:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
// 省略非關鍵代碼
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
ObservableObserveOn實作了AbstractObservableWithUpstream,進而實作了Observable
這里可以追蹤到ObservableObserveOn的原始碼中,可以看到:
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
// 省略非關鍵代碼
}
它的構造器呼叫了super(),而super內部則保存了前一個觀察者物件:
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
}
當我們呼叫subcribe()時,內部會執行到ObservableObserveOn如下方法:
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 這里的if用來判斷是否調度為當前執行緒,如果是,則不需要調度器來切換新執行緒
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
通過 scheduler.createWorker()來切換到指定的執行緒,此時外部呼叫onNext()時,會執行到這里,在指定執行緒進行處理:
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
// 省略非關鍵代碼
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
schedule是一個抽象方法,我們選擇一個具體實作NewThreadWorker,這個NewThreadWorker是用來切換新執行緒的,我們定位到它的具體實作,看下內部是怎么處理的:
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
// 省略非關鍵代碼
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
// 省略非關鍵代碼
}
return sr;
}
ObserveOnObserver繼承了Runnable,這個executor.submit((Callable<Object>)sr);就是執行緒池中用來提交任務的方法,執行緒池內部會呼叫runnable的run方法,從而呼叫到:
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
接著:
void drainNormal() {
// 省略非關鍵代碼
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
// 省略非關鍵代碼
a.onNext(v);
missed = addAndGet(-missed);
if (missed == 0) {
break;
}
}
}
實作生命周期的回呼,類似的,其它生命周期回呼也是同樣的流程
補充一下執行緒調度的各引數說明:
| 引數型別 | 解釋 | 使用場景 |
| Schedulers.immediate() | 當前執行緒 = 不指定執行緒 | 默認 |
| AndroidSchedulers.mainThread() | Android主執行緒 | 操作UI |
| Schedulers.newThread() | 常規新執行緒 | 耗時等操作 |
| Schedulers.io() | io操作執行緒 | 網路請求、讀寫檔案等io密集型操作 |
| Schedulers.computation() | CPU計算操作執行緒 | 大量計算操作 |
四、流式轉換:
當我們呼叫flatMap()時,經過重重方法多載,最侄訓走到這里
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
// 省略非關鍵代碼
return RxJavaPlugins.onAssembly(new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize));
}
我們點擊去ObservableFlatMap,可以看到,它最終也是實作了Observable類,那么前面分析過,當外部呼叫subscribe()時,會到這里:
@Override
public void subscribeActual(Observer<? super U> t) {
// 省略非關鍵代碼
source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
}
我們從MergeObserver類追進去,可以看到,當外部呼叫onNext()時,會走到這里:
@Override
public void onNext(T t) {
// 省略非關鍵代碼
try {
p = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper returned a null ObservableSource");
} catch (Throwable e) {
// 省略非關鍵代碼
}
}
重點在mapper.apply(t)這一行,點擊去可以看到這是一個抽象方法,那么它的具體實作在哪里呢?一步一步的回溯,發現這個mapper就是我們外部傳入的new Function<String, ObservableSource<String>>(),那么這個apply就是外部我們需要重寫的方法,也就是這個列印這個log的位置:System.out.println("我們這里需要重寫的方法");至此,整個回路追蹤可以完整串聯起來
解釋:其它省略的方法內容,因為flatMap可以執行多個,省略的代碼就是回圈執行的分發流程,我們只追尋主要流程,這些分支流程沒有詳細區分
轉載請註明出處,本文鏈接:https://www.uj5u.com/yidong/387948.html
標籤:其他
