開始接觸Angular 9,沒想到前提是必須了解RxJS跟TypeScript。過程中也遇到許多難以理解的部分。
加上RxJS本身的精神就是連鎖處理,聯想到S社的手遊チェインクロニクル,簡稱チェインクロ,戲稱chain苦勞,於是有了這個標題。
本篇先談RxJS的部分。TypeScript的定義部分就不討論,加上TypeScript的定義,複雜度會直線上升。
希望看到的各位不再chain苦勞。
RxJS的基本運作方式:定義「觀察對象」,稱為Observable。
負責有任何資料的變化,使用一至多個next指令,一次傳入一個資料結構(也可以無資料),對外發出通知;所有的動作完成,發出complete。
最基本的定義動作如下:
let obj = {a : "b"};
let ob$ = Observable.create((observer) => {
observer.next(obj); //可重複,所以大部分的動作都是靠next處理
observer.next(); //也可以無資料
observer.complete(); //observer.complete()之後,call observer.next()或是observer.error()等等動作都沒意義。下層只會執行到complete()
});再定義「觀察對象」的「訂閱」的動作,稱為observable.subscribe()。
傳入一個物件,裡面可以有三個參數值:next/complete/error,都必須為function以供執行。若是只傳入一個function,或是arrow function,都是代表只處理next的動作。
也可以不傳入物件。因為任何動作都不處理,相當於只觸發整個定義好的執行流程。
傳入一個物件,裡面可以有三個參數值:next/complete/error,都必須為function以供執行。若是只傳入一個function,或是arrow function,都是代表只處理next的動作。
也可以不傳入物件。因為任何動作都不處理,相當於只觸發整個定義好的執行流程。
next動作可能會被觸發多次,complete / error都只會執行一次。
complete / error被執行之後,記憶體就會被釋放。
例:
let subOb: Subscription = ob$.subscribe({
next: _ => {
console.log("next=", _);
}
, complete: () => {
console.log("completed"); // 注意,complete動作無參數傳入
}
, error: _ => {
console.log("error=", _);
}
});
//某種條件下的停止訂閱
subOb.unSubscribe();
「訂閱」的動作,執行之後回傳的是一個Subscription物件。可以執行unsubscribe/查詢是不是已經被unsubscribe()/加上或是移除子訂閱等動作。unsubscribe()被執行之後,「訂閱」就不會被觸發任何動作。
「觀察對象」的所有定義內容,在沒有被執行「訂閱」之前不會被執行。
在執行「訂閱」之後就會被執行一次。這是RxJS的重要概念:後執行。
「觀察對象」的「訂閱」的動作,不能chain,可以執行多次。每做一次,該觀察者上面就多掛一組。各個Subscription執行的順序就是「訂閱」的順序。各個訂閱之間的資料獨立,不會互相影響。除非在定義執行function的時候,使用了外部參數存取。
因為「觀察對象」可以多次發出next(),每一次next()內的資料,會依照定義好的執行流程一直往後執行,形成了「資料流」的概念。「流」的單位就是傳入next()資料結構。
「觀察對象」的產生方式:除了上面提到最基本的Observable.create()之外,RxJS提供了許多方便撰寫的api。
分成兩類:單一觀察對象,跟多個觀察對象的合併。
單一觀察對象:通常為一個,或是一組資料。
- of():參數的傳遞方式為「物件本身」。可以傳入多個參數。
除了基本資料,物件跟可列舉物件(例如array/map)也都可以傳入。但是可列舉物件,還是當成一個物件處理。也就是對於subscribe的next動作,跟pipe內的函式來說,只會接到一次資料。
丟event類的動作進去(例如promise),會等不到event的內層回傳值(例如promise的resolve / reject)。不過包一層之後會提到的from()之後再subscribe的話就可以取得資料。
運作流程就是依照參數順序丟出資料,然後丟出complete。
若是不傳入任何參數,作用等於empty(),會直接丟出complete。(empty()已被停用) - from():參數的傳遞為「觸發式」。也是fromEvent這種網頁event觸發型的觀察對象,跟interval /timer這類時間觸發型的觀察對象的基礎。
文件上的定義是可以傳入的參數為可列舉物件或是promise。不可傳入基本資料跟一般物件,只能傳入1個參數。
傳入可列舉物件的話,會把可列舉物件的內容,列舉拆開為一個個的資料往後觸發。然後丟出complete。
其實可以丟一個字串進去,會當做字元陣列處理,就會一個個發出文字。
傳入promise,會得到promise的處理結果。resolve會繼續pipe後面的處理,處理完之後執行subscribe的next的動作。reject的話就看pipe內有沒有catchError()的流程,沒有的話就會走subscribe的error的動作。
- interval():傳入以millisecond為單位的數字,作為時間間隔產生資料。發出的資料為時間的觸發次數。第一次觸發為0,第二次為1,以此類推。
- throwError(): 可以傳入物件。subscribe接到這個,就會執行error的指定function。也可以當作pipe內使用的operator,這時不要加上小括號。
多個觀察對象的合併:把一或多個觀察對象,組合成一整個觀察對象。
所以各個動作所傳入的資料必須為「觀察對象」。
- concat():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是前一個觀察對象發出「完成」之後,後續的觀察對象的資料才會開始被取得並發送。因此要是其中有一個觀察對象是不會結束的fromEvent(例如滑鼠點擊)的話,後續的觀察對象的資料就不會被取得。
適合各個觀察對象的資料一定要照順序取得的設計。 - merge():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是只要任一個觀察對象發出資料就會往下傳,沒有先後順序問題。可以說是不管complete的設計。
- race():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是只取第一個來的資料。
- zip():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是在所有的觀察對象都發出一筆資料之後,從各個觀察對象取一筆資料,依照觀察對象傳入的順序,產生一個資料陣列往下傳。等到所有的觀察對象都發出complete之後,發出complete。
因為要所有的觀察對象都發出資料之後各取一筆,要注意會快速累積資料的觀察對象,可能會產生資料堆積。
發出complete的條件:任一觀察對象發出complete起算,直到發出了該觀察對象的筆數的資料,就會發出complete。 - forkJoin():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是在所有的觀察對象都發出「完成」之後,從各個觀察對象取得最後一筆資料,依照觀察對象傳入的順序,產生一個資料陣列往下傳。
其他的產生方式就請參考文件。
前面有提到各個產生觀察對象的方法,參數可以是物件。當然也可以把觀察對象當做資料傳入。因此在pipe內再做subscribe這樣的階層處理動作,在所難免。
後面在operators的部份會提到如何處理這樣的狀況。這部份也是RxJS的威力所在,
可以讓資料流的切換,整合輕而易舉。相對的,要設計處理流程跟方法就不是易事。
後面在operators的部份會提到如何處理這樣的狀況。這部份也是RxJS的威力所在,
可以讓資料流的切換,整合輕而易舉。相對的,要設計處理流程跟方法就不是易事。
在各個觀察對象產生之後,可以使用pipe指令依序傳入各種operators來對資料本身,甚至是觀察者做各種處理。pipe也可以接著pipe連下去。
要注意的是pipe()回傳值的TypeScript定義相當麻煩。
要注意的是pipe()回傳值的TypeScript定義相當麻煩。
觀察對象加上pipe之後的回傳值,其資料結構定義將會是Observable<各個emitter的type>。
RxJs的operators分為以下幾類:
- 直接執行
- startWith():觀察對象的訂閱發生後,在資料開始往外丟之前,先丟出此operator所傳入的一或多筆資料。
後續有接delay()之類的動作,會被影響丟出時間 - 接到資料馬上執行
- tap():傳入一個function,參數為該筆資料。
在資料來的時候會執行。
特性是不會改變原資料,因此不需回傳值(有回傳值也不會被使用) - map():傳入一個function,參數為該筆資料,回傳值為改變後的資料。
在資料來的時候會執行。
特性是會改變原資料,因此必需回傳值 - 條件式
- debounceTime():傳入以millisecond為單位的數字,作為時間間隔。運作方式是觀察對象的資料之間的時間間隔,比參數要短的話就不會被發出。例:interval(100).pipe(debounceTime(100)) 不會發出任何資料。
interval(10).pipe(debounceTime(100)) 就會每110ms發出一次資料。 - filter():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。回傳true的話資料就會往下傳。判斷式使用大括號卻沒寫return的話等於undefined(也就是false)。
- first():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。會傳出第一筆符合判斷條件的資料,並發出complete。不傳入條件式的話代表不作判斷,一定會丟出第一筆。
因為具有發出complete的特性,可以考慮用在停止不會發出complete的觀察對象。
若是收到complete之前沒有任何符合條件的資料,會丟出EmptyError。 - last():傳入判斷式,在觀察對象發出complete之後,發出最後一筆符合條件式的資料。不傳入條件式的話代表不作判斷。
會根據complete來運作,所以用在不會結束的觀察對象,就不會發出資料。
要是前面有使用startsWith這類動作,會被擋住直到complete出現。 - find():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。會傳出第一筆符合判斷條件的資料,並發出complete。判斷式為必要參數。
因為具有發出complete的特性,可以考慮用在停止不會發出complete的觀察對象。
若是收到complete之前沒有任何符合條件的資料,並不會發出EmptyError。 - skip():傳入數字,收到該數字指定筆數之後的資料才會往下傳。
- skipUntil():傳入一個觀察對象,在此觀察對象發出任何資料之後,開始往後傳出在此之後收到的所有資料。
- reduce():傳入定義function,第二筆資料之後都會執行一次。function的第一參數為初始值為第一筆資料的物件,第二參數為新進資料。
function的回傳值為下一次執行function的第一參數。
在觀察對象發出complete之後,丟出最後一次執行function的回傳值。function格式: (<經過處理的物件>, <該筆資料>){
...
return <經過處理的物件>;
} - distinct():傳入function,參數為該筆資料,回傳值為要往下傳的資料。
傳出的資料會被記錄下來,若是經過function取得要往下傳的資料,跟之前傳過的資料有相同的,就不會被傳出。只會傳出不相同的資料。 - distinctUntilChanged():傳入function,參數為該筆資料,回傳值為要往下傳的資料。
傳出的資料的最後一筆會被記錄下來,若是經過function取得要往下傳的資料,跟前一筆資料相同,就不會被傳出。只會傳出跟前一筆不相同的資料。 - buffer():傳入一個觸發用的觀察對象(本文之後統稱為「觸發者」)。此觀察對象輸出資料的時候,被buffer的觀察對象,會把當時收集到的所有資料,轉成一個陣列往下傳。
觸發者要是發出complete,被buffer的觀察對象也會發出complete。若是被buffer的觀察對象,在被觸發之前先發出complete,將不會輸出之前收集到的資料。 - bufferCount():傳入數字,在收集到固定數字的資料之後,轉成一個陣列往下傳。若是被buffer的觀察對象,在被觸發之前先發出complete,將不會輸出之前收集到的資料。
其他的operators請參考文件。
- 最後,是個人覺得RxJS最難懂的部份:觀察對象群的「展開」。
RxJS的觀察對象的資料是任意物件,那當然也可以產生「以觀察對象為資料的觀察對象」,本文之後統稱「階層觀察對象」。
在angular的開發裡面,http模組都是回傳「觀察對象」。為了加速網頁顯示,要同時取多個http api資料;或是順序取,或是取完http api的資料之後再依資料內容去執行其他的http api都是常見的狀況。加上RxJS是「先定義,後執行」(subscribe driven)的設計,執行subscribe之後,http才會動作。沒有「展開」的功能,可能會重現callback地獄。
因此,「觀察對象群」的「展開」就是必須了解的部分。
RxJS提供了以下的展開方法:- concatAll():無參數。作用是在此等待送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。直到收到complete之後,停止訂閱,繼續等待下一個送過來的「(下層)觀察對象」,來了就訂閱,以此類推,直到該層的「觀察對象」送出完成,之後進行下一步。
因為是照順序一個個做,一樣有「(下層)觀察對象」不送出complete就不會對後來的「(下層)觀察對象」執行訂閱的特性。 - mergeAll():無參數。作用是在此等待所有送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。
任何的「(下層)觀察對象」出現,就會開始訂閱並往下送資料;直到「(下層)觀察對象」送出complete之後,停止訂閱。因為會有多個「(下層)觀察對象」同時被訂閱的狀況,此時對訂閱者來說,各個「(下層)觀察對象」的資料會混在一起。
直到該層的「觀察對象」送出完成,之後進行下一步。 - switchAll():無參數。作用是在此等待送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。
「(下層)觀察對象」一出現就會開始訂閱並往下送資料;直到收到complete之後,停止訂閱。但是在觀察對象的訂閱期間,有其他的「(下層)觀察對象」出現,就會把目前訂閱的「(下層)觀察對象」解除訂閱,並訂閱新來的「(下層)觀察對象」的資料。
直到該層的「觀察對象」送出完成,之後進行下一步。
注意:根據測試,要是兩個以上的「(下層)觀察對象」同時出現(只要沒有時間差,像是在observer的create裡面連續執行兩次next(),各個觀察對象也沒有使用delay之類的動作),同時出現的「(下層)觀察對象」會同時一起被訂閱。可能會達不到switch的效果。
注意:以上的展開動作,要是接到資料類型不是「觀察對象」的資料,就會拋出TypeError。
階層觀察對象的complete條件:下層跟上層的觀察對象全部完成。
注意:前面提到的觀察對象的組合操作( 例如concat() ),組合出來的是一個「觀察對象」。不是有階層的「以觀察對象為資料的觀察對象」。要組合出有階層的「觀察對象」,必須使用of()跟from()來做。
例外是zip()跟forkJoin()居然可以使用展開operator。
別忘了這兩個operator都是組合多個「觀察對象」成為一個「觀察對象」,所以三個展開operators的運作結果都一樣:效果會是原本以陣列往下傳的資料,會被順序展開成一個個資料下傳。 - concatAll():無參數。作用是在此等待送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。直到收到complete之後,停止訂閱,繼續等待下一個送過來的「(下層)觀察對象」,來了就訂閱,以此類推,直到該層的「觀察對象」送出完成,之後進行下一步。
- 「展開」的組合operator。前面有提到,map()傳入一個function,參數為該筆資料,回傳值為改變後的資料。這改變後的資料當然也可以是「(下層)觀察對象」。為了方便寫map+all,就出現了組合operators。
- mergeMap():等同於map() + mergeAll()
- switchMap():等同於map() + switchAll()。
- concatMap():等同於map() + concatAll()。
注意:三個組合operators都是傳入一個function,參數為該筆資料,回傳值為「(下層)觀察對象」。回傳值的資料類型不是觀察對象的話,TypeScript編譯的時候就會直接錯誤。若使用不需編譯的設計,會在subscribe的時候拋出TypeError。
一些觀念:
簡單,資料結構可掌握的資料流,可以在subscribe的next裡面定義處理方式。
在資料複雜化,next收到的資料結構很難掌握的時候,通常會在pipe裡面使用tap 或是 map 取出,不需被限制在非使用next處理不可。next/complete就當作是一種資料取得進程event,或是動作完成的event。
在資料複雜化,next收到的資料結構很難掌握的時候,通常會在pipe裡面使用tap 或是 map 取出,不需被限制在非使用next處理不可。next/complete就當作是一種資料取得進程event,或是動作完成的event。
參考文件:
https://rxjs.dev/ RxJS官網
https://angular.io/guide/observables angular大部分的系統Api都是RxJS的設計,需要了解angular對於RxJS的使用狀況。
0 件のコメント:
コメントを投稿