原文鏈接: https://segmentfault.com/a/1190000009924164
RxJS 是回應式編程 (reactive programming) 強大的工具,今天我們將深入介紹 Observables 和 Observers 的內容,以及介紹如何創建自己的運算子 (operators),
如果你之前已經使用過 RxJS,并希望了解 Observable 及 Operators (運算子) 的內部作業原理,那么這篇文章非常適合你,
什么是 Observable
Observable 就是一個擁有以下特性的函式:
- 它接收一個
observer物件作為引數,該物件中包含next、error和complete方法 - 它回傳一個函式,用于在銷毀 Observable 時,執行清理操作
在我們實作的示例中,我們將定義一個簡單的 unsubscribe 函式來實作取消訂閱的功能,然而在 RxJS 中,回傳的是 Subcription 物件,該物件中包含一個 unsubscribe 方法,
一個 Observable 物件設定觀察者 (observer),并將它與生產者關聯起來,該生產者可能是 DOM 元素產生的 click 或 input 事件,也可能是更復雜的事件,如 HTTP,
為了更好地理解 Observable,我們來自定義 Observable,首先,我們先來看一個訂閱的例子:
const node = document.querySelector('input[type=text]');
const input$ = Rx.Observable.fromEvent(node, 'input');
input$.subscribe({
next: (event) => console.log(`You just typed ${event.target.value}!`),
error: (err) => console.log(`Oops... ${err}`),
complete: () => console.log(`Complete!`)
});
該示例中,Rx.Observable.formEvent() 方法接收一個 input 元素和事件名作為引數,然后回傳一個 $input Observable 物件,接下來我們使用 subscribe() 方法來定于該 Observable 物件,當觸發 input 事件后,對應的值將會傳遞給 observer 物件,
什么是 Observer
Observer (觀察者) 非常簡單,在上面的示例中,觀察者是一個普通的物件,該物件會作為 subscribe() 方法的引數,此外 subscribe(next, error, complete) 也是一個有效的語法,但在本文中我們將討論物件字面量的形式,
當 Observable 物件產生新值的時候,我們可以通過呼叫 next() 方法來通知對應的觀察者,若出現例外,則會呼叫觀察者的 error() 方法,當我們訂閱 Observable 物件后,只要有新的值,都會通知對應的觀察者,但在以下兩種情況下,新的值不會再通知對應的觀察者:
- 已呼叫 observer 物件的
complete()方法 - 消費者對資料不再感興趣,執行取消訂閱操作
此外在執行最終的 subscribe() 訂閱操作前,我們傳遞的值可以經過一系列的鏈式處理操作,執行對應操作的東西叫運算子,每個運算子執行完后會回傳一個新的 Observable 物件,然后繼續我們的處理流程,
什么是 Operator
正如上面所說的,Observable 物件能夠執行鏈式操作,具體如下所示:
const input$ = Rx.Observable.fromEvent(node, 'input')
.map(event => event.target.value)
.filter(value => value.length >= 2)
.subscribe(value => {
// use the `value`
});
上面代碼的執行流程如下:
- 假設用戶在輸入框中輸入字符
a - Observable 物件回應對應的
input事件,然后把值傳遞給 observer map()運算子回傳一個新的 Observable 物件filter()運算子執行過濾操作,然后又回傳一個新的 Observable 物件- 最后我們通過呼叫
subscribe()方法,來獲取最終的值
簡而言之,Operator 就是一個函式,它接收一個 Observable 物件,然后回傳一個新的 Observable 物件,當我們訂閱新回傳的 Observable 物件時,它內部會自動訂閱前一個 Observable 物件,
自定義 Observable
Observable 建構式
function Observable(subscribe) {
this.subscribe = subscribe;
}
每個 subscribe 回呼函式被賦值給 this.subscribe 屬性,該回呼函式將會被我們或其它 Observable 物件呼叫,
Observer 示例
在我們深入介紹前,我們先來看一個簡單的示例,之前我們已經創建完 Observable 函式,現在我們可以呼叫我們的觀察者 (observer),然后傳遞數值 1,然后訂閱它:
const one$ = new Observable((observer) => {
observer.next(1);
observer.complete();
});
one$.subscribe({
next: (value) => console.log(value) // 1
});
即我們訂閱我們創建的 Observable 實體,然后通過 subscribe() 方法呼叫通過建構式設定的回呼函式,
Observable.fromEvent
下面就是我們需要的基礎結構,即在 Observable 物件上需要新增一個靜態方法 fromEvent :
Observable.fromEvent = (element, name) => { };
接下來我們將參考 RxJS 為我們提供的方法來實作自定義的 fromEvent() 方法:
const node = document.querySelector('input');
const input$ = Observable.fromEvent(node, 'input');
按照上面的使用方式,我們的 fromEvent() 方法需要接收兩個引數,同時需要回傳一個新的 Observable 物件,具體如下:
Observable.fromEvent = (element, name) => {
return new Observable((observer) => {
});
};
接下來我們來實作事件監聽功能:
Observable.fromEvent = (element, name) => {
return new Observable((observer) => {
element.addEventListener(name, (event) => {}, false);
});
};
那么我們的 observer 引數來自哪里? 其實 observer 物件就是包含 next、error 和 complete 方法的物件字面量,
需要注意的是,我們的 observer 引數不會被傳遞,直到 subscribe() 方法被呼叫,這意味著 addEventListener() 方法不會被呼叫,除非你訂閱該 Observable 物件,
當我們呼叫 subscribe() 方法,之前設定的 this.subscribe 回呼函式會被呼叫,對應的引數是我們定義的 observer 物件字面量,接下來將使用新的值,作為 next() 方法的引數,呼叫該方法,
很好,那接下來我們要做什么?之前版本我們只是設定了監聽,但沒有呼叫 observer 物件的 next() 方法,接下來讓我們來修復這個問題:
Observable.fromEvent = (element, name) => {
return new Observable((observer) => {
element.addEventListener(name, (event) => {
observer.next(event);
}, false);
});
};
如你所知,當銷毀 Observables 物件時,需要呼叫一個函式用來執行清理操作,針對目前的場景,在銷毀時我們需要移除事件監聽:
Observable.fromEvent = (element, name) => {
return new Observable((observer) => {
const callback = (event) => observer.next(event);
element.addEventListener(name, callback, false);
return () => element.removeEventListener(name, callback, false);
});
};
我們沒有呼叫 complete() 方法,因為該 Observable 物件處理的 DOM 相關的事件,在時間維度上它們可能是無終止的,
現在讓我們來驗證一下最終實作的功能:
const node = document.querySelector('input');
const p = document.querySelector('p');
function Observable(subscribe) {
this.subscribe = subscribe;
}
Observable.fromEvent = (element, name) => {
return new Observable((observer) => {
const callback = (event) => observer.next(event);
element.addEventListener(name, callback, false);
return () => element.removeEventListener(name, callback, false);
});
};
const input$ = Observable.fromEvent(node, 'input');
const unsubscribe = input$.subscribe({
next: (event) => {
p.innerHTML = event.target.value;
}
});
// automatically unsub after 5ssetTimeout(unsubscribe, 5000);
自定義運算子
創建我們自己的運算子應該會更容易一些,現在我們了解 Observable 和 Observable 背后的概念,我們將在 Observable 的原型物件上添加一個方法:
Observable.prototype.map = function (mapFn) { };
該方法的功能與 JavaScript 中的 Array.prototype.map 方法類似:
const input$ = Observable.fromEvent(node, 'input')
.map(event => event.target.value);
所以我們需要應用回呼函式并呼叫它,這用于獲取我們所需要的資料,在我們這樣做之前,我們需要流中的最新值,這里是巧妙的部分,在 map() 運算子中,我們需要訪問 Observable 實體,因為 map 方法在原型上,我們可以通過以下方式訪問 Observable 實體:
Observable.prototype.map = function (mapFn) {
const input = this;
};
接下來我們在回傳的 Observable 物件中執行 input 物件的訂閱操作:
Observable.prototype.map = function(mapFn) {
const input = this;
return new Observable((observer) => {
return input.subscribe();
});
};
我們回傳了 input.subscribe() 方法執行的結果,因為當我們執行取消訂閱操作時,將會依次呼叫每個 Observable 物件取消訂閱的方法,
最后我們來完善一下 map 運算子的內部代碼:
Observable.prototype.map = function (mapFn) {
const input = this;
return new Observable((observer) => {
return input.subscribe({
next: (value) => observer.next(mapFn(value)),
error: (err) => observer.error(err),
complete: () => observer.complete()
});
});
};
現在我們已經可以執行鏈式操作了:
const input$ = Observable.fromEvent(node, 'input')
.map(event => event.target.value);
input$.subscribe({
next: (value) => {
p.innerHTML = value;
}
});
我有話說
Observable 與 Promise 有什么區別?
Observable(可觀察物件)是基于推送(Push)運行時執行(lazy)的多值集合,
Untitled
- Promise
- 回傳單個值
- 不可取消的
- Observable
- 隨著時間的推移發出多個值
- 可以取消的
- 支持 map、filter、reduce 等運算子
- 延遲執行,當訂閱的時候才會開始執行
什么是 SafeObserver ?
上面的示例中,我們使用一個包含了 next、error、complete 方法的普通 JavaScript 物件來定義觀察者,一個普通的 JavaScript 物件只是一個開始,在 RxJS 5 里面,為開發者提供了一些保障機制,來保證一個更安全的觀察者,以下是一些比較重要的原則:
- 傳入的
Observer物件可以不實作所有規定的方法 (next、error、complete 方法) - 在
complete或者error觸發之后再呼叫next方法是沒用的 - 呼叫
unsubscribe方法后,任何方法都不能再被呼叫了 complete和error觸發后,unsubscribe也會自動呼叫- 當
next、complete和error出現例外時,unsubscribe也會自動呼叫以保證資源不會浪費 next、complete和error是可選的,按需處理即可,不必全部處理
為了完成上述目標,我們得把傳入的匿名 Observer 物件封裝在一個 SafeObserver 里以提供上述保障,
若想進一步了解詳細資訊,請參考 Observable詳解 文章中 "自定義 Observable" 章節的內容,
參考資源
- rxjs-observables-observers-operators
轉載請註明出處,本文鏈接:https://www.uj5u.com/qita/254050.html
標籤:其他
