內容導航
目錄- 內容導航
- RxJS是什么
- RxJS的主要成員
- Observable (可觀察物件)
- 創建 Observable
- 訂閱 Observables
- 執行 Observables
- 清理 Observable 執行
- Observer (觀察者)
- Subscription (訂閱)
- Subject (主體)
- 多播的 Observables
- BehaviorSubject
- ReplaySubject
- AsyncSubject
- Scheduler (調度器)
- 調度器型別
- Pipeable(運算子)
- 常用的運算子
- 創建運算子
- 連接創建運算子
- 轉換運算子
- 過濾運算子
- 組合運算子
- 多播運算子
- 錯誤處理運算子
- 工具運算子
- 條件和布爾運算子
- 數學和聚合運算子
- Observable (可觀察物件)
RxJS是什么
RxJS 是一個庫,它通過使用 observable 序列來撰寫異步和基于事件的程式,它提供了一個核心型別 Observable,附屬型別 (Observer、 Schedulers、 Subjects) 和受 [Array#extras] 啟發的運算子 (map、filter、reduce、every, 等等),這些陣列運算子可以把異步事件作為集合來處理,
可以把 RxJS 當做是用來處理事件的 Lodash ,
ReactiveX 結合了 觀察者模式、迭代器模式 和 使用集合的函式式編程,以滿足以一種理想方式來管理事件序列所需要的一切,
RxJS的主要成員
- Observable (可觀察物件): 表示一個概念,這個概念是一個可呼叫的未來值或事件的集合,
- Observer (觀察者): 一個回呼函式的集合,它知道如何去監聽由 Observable 提供的值,
- Subscription (訂閱): 表示 Observable 的執行,主要用于取消 Observable 的執行,
- Operators (運算子): 采用函式式編程風格的純函式 (pure function),使用像
map、filter、concat、flatMap等這樣的運算子來處理集合, - Subject (主體): 相當于 EventEmitter,并且是將值或事件多路推送給多個 Observer 的唯一方式,
- Schedulers (調度器): 用來控制并發并且是中央集權的調度員,允許我們在發生計算時進行協調,例如
setTimeout或requestAnimationFrame或其他,
Observable (可觀察物件)
RxJS 是基于觀察者模式和迭代器模式以函式式編程思維來實作的,RxJS 中含有兩個基本概念:Observables 與 Observer,Observables 作為被觀察者,是一個值或事件的流集合;而 Observer 則作為觀察者,根據 Observables 進行處理,Observables 是多個值的惰性推送集合,
- of():用于創建簡單的Observable,該Observable只發出給定的引數,在發送完這些引數后發出完成通知.
- from():從一個陣列、類陣列物件、promise、迭代器物件或者類Observable物件創建一個Observable.
- fromEvent(),:把event轉換成Observable.
- range():在指定起始值回傳指定數量數字.
- interval():基于給定時間間隔發出數字序列,回傳一個發出無限自增的序列整數,可以選擇固定的時間間隔進行發送,
- timer():創建一個Observable,該Observable在初始延時之后開始發送并且在每個時間周期后發出自增的數字
創建 Observable
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const Observable1 = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
setTimeout(() => {
subscriber.next(4);
subscriber.complete();
}, 1000);
} catch (err) {
subscriber.error(err); //傳遞一個錯誤物件,如果捕捉到例外的話,
}
});
const Observable2 = from([
{ name: 'Dave', age: 34, salary: 2000 },
{ name: 'Nick', age: 37, salary: 32000 },
{ name: 'Howie', age: 40, salary: 26000 },
{ name: 'Brian', age: 40, salary: 30000 },
{ name: 'Kevin', age: 47, salary: 24000 },
]);
const Observable3 = of("Dave","Nick");//把所有引陣列合到陣列,逐個提供給消費者
const Observable4 = range(1,10);
const Observable5 = interval(3000);//從零開始每3000毫秒自增并提供給消費者
const Observable6 = timer(3000,1000);//等待3000毫秒后,從零開始每1000毫秒自增并提供給消費者
訂閱 Observables
因為 Observable 執行可能會是無限的,并且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行,因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或記憶體資源,
當呼叫了 observable.subscribe ,觀察者會被附加到新創建的 Observable 執行中,這個呼叫還回傳一個物件,即 Subscription (訂閱):
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable1 = range(1,10);
observable1.subscribe(
num => {
console.log(num);
},
err => console.log(err),
() => console.log("Streaming is over.")
);
執行 Observables
Observable 執行可以傳遞三種型別的值:
- "Next" 通知: 發送一個值,比如數字、字串、物件,等等,
- "Error" 通知: 發送一個 JavaScript 錯誤 或 例外,
- "Complete" 通知: 不再發送任何值,
"Next" 通知是最重要,也是最常見的型別:它們表示傳遞給觀察者的實際資料,"Error" 和 "Complete" 通知可能只會在 Observable 執行期間發生一次,并且只會執行其中的一個,
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
import { map } from 'rxjs/operators';
const observable = new Observable(subscriber => {
try{
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
subscriber.next(4); // 因為違反規約,所以不會發送
} catch (err) {
subscriber.error(err); //傳遞一個錯誤物件,如果捕捉到例外的話,
}
});
清理 Observable 執行
因為 Observable 執行可能會是無限的,并且觀察者通常希望能在有限的時間內中止執行,所以我們需要一個 API 來取消執行,因為每個執行都是其對應觀察者專屬的,一旦觀察者完成接收值,它必須要一種方法來停止執行,以避免浪費計算能力或記憶體資源
當你訂閱了 Observable,你會得到一個 Subscription ,它表示進行中的執行,只要呼叫
unsubscribe()方法就可以取消執行,
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
const observable = new Observable(subscriber => {
let intervalID = setInterval(() => {
subscriber.next('hi');
}, 1000);
// 提供取消和清理 interval 資源的方法
return function unsubscribe() {
clearInterval(intervalID);
};
});
let subscription = observable.subscribe(x => console.log(x));
subscription.unsubscribe();
Observer (觀察者)
觀察者是由 Observable 發送的值的消費者,觀察者只是一組回呼函式的集合,每個回呼函式對應一種 Observable 發送的通知型別:next、error 和 complete ,下面的示例是一個典型的觀察者物件:
觀察者只是有三個回呼函式的物件,每個回呼函式對應一種 Observable 發送的通知型別,
observable.subscribe(
next: x => console.log('Observer got a next value: ' + x),
error: err => console.error('Observer got an error: ' + err),
complete: () => console.log('Observer got a complete notification')
);
Subscription (訂閱)
Subscription 是表示可清理資源的物件,通常是 Observable 的執行,Subscription 有一個重要的方法,即 unsubscribe,它不需要任何引數,只是用來清理由 Subscription 占用的資源,在上一個版本的 RxJS 中,Subscription 叫做 "Disposable" (可清理物件),
Subscription 基本上只有一個
unsubscribe()函式,這個函式用來釋放資源或去取消 Observable 執行,
import { Observable,of,from,fromEvent,range,interval } from 'rxjs';
var observable1 = interval(1000);
var subscription1 = observable1.subscribe(x => console.log(x));
// 稍后:
// 這會取消正在進行中的 Observable 執行
// Observable 執行是通過使用觀察者呼叫 subscribe 方法啟動的
subscription1.unsubscribe();
var observable2 = interval(400);
var observable3 = interval(300);
var subscription2 = observable2.subscribe(x => console.log('first: ' + x));
var childSubscription = observable3.subscribe(x => console.log('second: ' + x));
subscription2.add(childSubscription);
setTimeout(() => {
// subscription 和 childSubscription 都會取消訂閱
subscription2.unsubscribe();
}, 1000);
Subject (主體)
RxJS Subject 是一種特殊型別的 Observable,它允許將值多播給多個觀察者,所以 Subject 是多播的,而普通的 Observables 是單播的(每個已訂閱的觀察者都擁有 Observable 的獨立執行),
Subject 像是 Observable,但是可以多播給多個觀察者,Subject 還像是 EventEmitters,維護著多個監聽器的注冊表,
還有一些特殊型別的 Subject:
BehaviorSubject、ReplaySubject和AsyncSubject,
每個 Subject 都是 Observable , - 對于 Subject,你可以提供一個觀察者并使用 subscribe 方法,就可以開始正常接收值,從觀察者的角度而言,它無法判斷 Observable 執行是來自普通的 Observable 還是 Subject ,
在 Subject 的內部,subscribe 不會呼叫發送值的新執行,它只是將給定的觀察者注冊到觀察者串列中,類似于其他庫或語言中的 addListener 的作業方式,
每個 Subject 都是觀察者, - Subject 是一個有如下方法的物件: next(v)、error(e) 和 complete() ,要給 Subject 提供新值,只要呼叫 next(theValue),它會將值多播給已注冊監聽該 Subject 的觀察者們,
import { Subject,from } from 'rxjs';
//我們為 Subject 添加了兩個觀察者,然后給 Subject 提供一些值
var subject1 = new Subject();
subject1.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject1.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject1.next(1);
subject1.next(2);
//因為 Subject 是觀察者,這也就在意味著你可以把 Subject 作為引數傳給任何 Observable 的 subscribe 方法
var subject2 =new Subject();
subject2.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject2.subscribe({
next: (v) => console.log('observerB: ' + v)
});
var observable = from([1, 2, 3]);
observable.subscribe(subject2); // 你可以提供一個 Subject 進行訂閱
多播的 Observables
“多播 Observable” 通過 Subject 來發送通知,這個 Subject 可能有多個訂閱者,然而普通的 “單播 Observable” 只發送通知給單個觀察者,
多播 Observable 在底層是通過使用 Subject 使得多個觀察者可以看見同一個 Observable 執行,
在底層,這就是 multicast 運算子的作業原理:觀察者訂閱一個基礎的 Subject,然后 Subject 訂閱源 Observable ,
import { Subject } from 'rxjs/internal/Subject';
import { take, multicast } from 'rxjs/operators';
const source = timer(1000, 2500).pipe(take(5));
const subject = new Subject();
subject.subscribe({
next: (v) => console.log('observerC: ' + v)
});
subject.subscribe({
next: (v) => console.log('observerD: ' + v)
});
const multicasted = source.pipe(multicast(subject));
multicasted.subscribe({
next: (v) => console.log('observerA: ' + v)
});
multicasted.subscribe({
next: (v) => console.log('observerB: ' + v)
});
source.subscribe(subject);
BehaviorSubject
Subject 的其中一個變體就是 BehaviorSubject,它有一個“當前值”的概念,它保存了發送給消費者的最新值,并且當有新的觀察者訂閱時,會立即從 BehaviorSubject 那接收到“當前值”,
BehaviorSubjects 適合用來表示“隨時間推移的值”,舉例來說,生日的流是一個 Subject,但年齡的流應該是一個 BehaviorSubject ,
import { BehaviorSubject } from 'rxjs';
//BehaviorSubject 使用值0進行初始化,當第一個觀察者訂閱時會得到0,第二個觀察者訂閱時會得到值2,盡管它是在值2發送之后訂閱的,
const subject = new BehaviorSubject(0); // 0是初始值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(3);
ReplaySubject
ReplaySubject 類似于 BehaviorSubject,它可以發送舊值給新的訂閱者,但它還可以記錄 Observable 執行的一部分,
ReplaySubject記錄 Observable 執行中的多個值并將其回放給新的訂閱者,除了緩沖數量,你還可以指定 window time (以毫秒為單位)來確定多久之前的值可以記錄,
import { ReplaySubject } from 'rxjs';
const subject = new ReplaySubject(3); // 為新的訂閱者緩沖最后3個值
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
//我們快取數量100,但 window time 引數只設定了120毫秒
const subject = new ReplaySubject(100, 120 /* windowTime */);
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
let i = 1;
setInterval(() => subject.next(i++), 200);
setTimeout(() => {
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
}, 1000);
AsyncSubject
AsyncSubject 是另一個 Subject 變體,只有當 Observable 執行完成時(執行 complete()),它才會將執行的最后一個值發送給觀察者,
AsyncSubject 和
last()運算子類似,因為它也是等待complete通知,以發送一個單個值,
import { AsyncSubject } from 'rxjs';
const subject = new AsyncSubject();
subject.subscribe({
next: (v) => console.log('observerA: ' + v)
});
subject.next(1);
subject.next(2);
subject.next(3);
subject.next(4);
subject.subscribe({
next: (v) => console.log('observerB: ' + v)
});
subject.next(5);
subject.complete();
Scheduler (調度器)
調度器控制著何時啟動 subscription 和何時發送通知,它由三部分組成:
- 調度器是一種資料結構, 它知道如何根據優先級或其他標準來存盤任務和將任務進行排序,
- 調度器是執行背景關系, 它表示在何時何地執行任務(舉例來說,立即的,或另一種回呼函式機制(比如 setTimeout 或 process.nextTick),或影片幀),
- 調度器有一個(虛擬的)時鐘, 調度器功能通過它的 getter 方法
now()提供了“時間”的概念,在具體調度器上安排的任務將嚴格遵循該時鐘所表示的時間,
調度器可以讓你規定 Observable 在什么樣的執行背景關系中發送通知給它的觀察者,
import { asyncScheduler, Observable } from 'rxjs';
//我們使用普通的 Observable ,它同步地發出值`1`、`2`、`3`,并使用運算子 `observeOn` 來指定 `async` 調度器發送這些值,
const observable = new Observable(subscriber => {
subscriber.next(1);
subscriber.next(2);
subscriber.next(3);
subscriber.complete();
})
.pipe(
observeOn(asyncScheduler)
);
console.log('just before subscribe');
observable.subscribe({
next: x => console.log('got value ' + x),
error: err => console.error('something wrong occurred: ' + err),
complete: () => console.log('done'),
});
console.log('just after subscribe');
//你會發現"just after subscribe"在"got value..."之前就出現了
//just before subscribe
//just after subscribe
//got value 1
//got value 2
//got value 3
//done
調度器型別
async 調度器是 RxJS 提供的內置調度器中的一個,可以通過使用 Scheduler 物件的靜態屬性創建并回傳其中的每種型別的調度器,
| 調度器 | 目的 |
|---|---|
null |
不傳遞任何調度器的話,會以同步遞回的方式發送通知,用于定時操作或尾遞回操作, |
queueScheduler |
當前事件幀中的佇列調度(蹦床調度器),用于迭代操作, |
asapScheduler |
微任務的佇列調度,它使用可用的最快速的傳輸機制,比如 Node.js 的 process.nextTick() 或 Web Worker 的 MessageChannel 或 setTimeout 或其他,用于異步轉換, |
asyncScheduler |
使用 setInterval 的調度,用于基于時間的運算子, |
animationFrameScheduler |
計劃將在下一次瀏覽器內容重新繪制之前發生的任務, 可用于創建流暢的瀏覽器影片, |
Pipeable(運算子)
運算子就是函式,管道運算子本質上是一個純函式,它將一個Observable作為輸入并生成另一個Observable作為輸出,訂閱輸出Observable也將訂閱輸入Observable, 運算子有兩種:
管道運算子是一個將Observable作為其輸入并回傳另一個Observable的函式,這是一個純粹的操作:以前的Observable保持不變,
-
管道運算子是可以使用語法
observableInstance.pipe(operator())傳遞給Observable的型別, 這些包括filter()和mergeMap(), 呼叫時,它們不會更改現有的Observable實體, 相反,它們回傳一個新的Observable,其訂閱邏輯基于第一個Observable, -
創建運算子是另一種運算子,可以稱為獨立函式來創建新的Observable,例如:
of(1,2,3)創建一個observable ,該物件將依次發射1、2和3,創建運算子將在后面的部分中詳細討論,
obs.pipe(
op1(),
op2(),
op3(),
op3(),
)
常用的運算子
finalize<T>(callback: () => void): MonoTypeOperatorFunction<T>:
回傳原始Observable,但在Observable完成或發生錯誤終止時將呼叫指定的函式,
創建運算子
ajaxbindCallbackbindNodeCallbackdeferemptyfromfromEventfromEventPatterngenerateintervalofrangethrowErrortimeriif
連接創建運算子
These are Observable creation operators that also have join functionality -- emitting values of multiple source Observables.
combineLatestconcatforkJoinmergepartitionracezip
轉換運算子
bufferbufferCountbufferTimebufferTogglebufferWhenconcatMapconcatMapToexhaustexhaustMapexpandgroupBymapmapTomergeMapmergeMapTomergeScanpairwisepartitionpluckscanswitchMapswitchMapTowindowwindowCountwindowTimewindowTogglewindowWhen
過濾運算子
auditauditTimedebouncedebounceTimedistinctdistinctKeydistinctUntilChangeddistinctUntilKeyChangedelementAtfilterfirstignoreElementslastsamplesampleTimesingleskipskipLastskipUntilskipWhiletaketakeLasttakeUntiltakeWhilethrottlethrottleTime
組合運算子
Also see the Join Creation Operators section above.
combineAllconcatAllexhaustmergeAllstartWithwithLatestFrom
多播運算子
multicastpublishpublishBehaviorpublishLastpublishReplayshare
錯誤處理運算子
catchErrorretryretryWhen
工具運算子
tapdelaydelayWhendematerializematerializeobserveOnsubscribeOntimeIntervaltimestamptimeouttimeoutWithtoArray
條件和布爾運算子
defaultIfEmptyeveryfindfindIndexisEmpty
數學和聚合運算子
countmaxminreduce
轉載請註明出處,本文鏈接:https://www.uj5u.com/qiye/225527.html
標籤:JavaScript
