2020年5月18日月曜日

[Web開發] RxJS 鎖鏈戰記


開始接觸Angular 9,沒想到前提是必須了解RxJS跟TypeScript。過程中也遇到許多難以理解的部分。
加上RxJS本身的精神就是連鎖處理,聯想到S社的手遊チェインクロニクル,簡稱チェインクロ,戲稱chain苦勞,於是有了這個標題。

本篇先談RxJS的部分。TypeScript的定義部分就不討論,加上TypeScript的定義,複雜度會直線上升。
希望看到的各位不再chain苦勞。



RxJS的基本運作方式:定義「觀察對象」,稱為Observable。
負責有任何資料的變化,使用一至多個next指令,一次傳入一個資料結構(也可以無資料),對外發出通知;所有的動作完成,發出complete。
最基本的定義動作如下:
let obj = {a : "b"};
let ob$ = Observable.create((observer) => {
  observer.next(obj);  //可重複,所以大部分的動作都是靠next處理
  observer.next(); //也可以無資料
  observer.complete();
//observer.complete()之後,call observer.next()或是observer.error()等等動作都沒意義。下層只會執行到complete()
});

再定義「觀察對象」的「訂閱」的動作,稱為observable.subscribe()。
傳入一個物件,裡面可以有三個參數值:next/complete/error,都必須為function以供執行。若是只傳入一個function,或是arrow function,都是代表只處理next的動作。
也可以不傳入物件。因為任何動作都不處理,相當於只觸發整個定義好的執行流程。

next動作可能會被觸發多次,complete / error都只會執行一次。
complete / error被執行之後,記憶體就會被釋放。
例:
let subOb: Subscription = ob$.subscribe({
  next: _ => {
    console.log("next=", _);
  }
  , complete: () => {
    console.log("completed"); // 注意,complete動作無參數傳入
  }
  , error: _ => {
    console.log("error=", _);
  }
});

//某種條件下的停止訂閱
subOb.unSubscribe();


「訂閱」的動作,執行之後回傳的是一個Subscription物件。可以執行unsubscribe/查詢是不是已經被unsubscribe()/加上或是移除子訂閱等動作。unsubscribe()被執行之後,「訂閱」就不會被觸發任何動作。

「觀察對象」的所有定義內容,在沒有被執行「訂閱」之前不會被執行。
在執行「訂閱」之後就會被執行一次。這是RxJS的重要概念:後執行。

「觀察對象」的「訂閱」的動作,不能chain,可以執行多次。每做一次,該觀察者上面就多掛一組。各個Subscription執行的順序就是「訂閱」的順序。各個訂閱之間的資料獨立,不會互相影響。除非在定義執行function的時候,使用了外部參數存取。

因為「觀察對象」可以多次發出next(),每一次next()內的資料,會依照定義好的執行流程一直往後執行,形成了「資料流」的概念。「流」的單位就是傳入next()資料結構。


「觀察對象」的產生方式:除了上面提到最基本的Observable.create()之外,RxJS提供了許多方便撰寫的api。
分成兩類:單一觀察對象,跟多個觀察對象的合併。

單一觀察對象:通常為一個,或是一組資料。
  • of():參數的傳遞方式為「物件本身」。可以傳入多個參數。
    除了基本資料,物件跟可列舉物件(例如array/map)也都可以傳入。但是可列舉物件,還是當成一個物件處理。也就是對於subscribe的next動作,跟pipe內的函式來說,只會接到一次資料。

    丟event類的動作進去(例如promise),會等不到event的內層回傳值(例如promise的resolve / reject)。不過包一層之後會提到的from()之後再subscribe的話就可以取得資料。

    運作流程就是依照參數順序丟出資料,然後丟出complete。
    若是不傳入任何參數,作用等於empty(),會直接丟出complete。(empty()已被停用)

  • from():參數的傳遞為「觸發式」。也是fromEvent這種網頁event觸發型的觀察對象,跟interval /timer這類時間觸發型的觀察對象的基礎。
    文件上的定義是可以傳入的參數為可列舉物件或是promise。不可傳入基本資料跟一般物件,只能傳入1個參數。

    傳入可列舉物件的話,會把可列舉物件的內容,列舉拆開為一個個的資料往後觸發。然後丟出complete。
    其實可以丟一個字串進去,會當做字元陣列處理,就會一個個發出文字。

    傳入promise,會得到promise的處理結果。resolve會繼續pipe後面的處理,處理完之後執行subscribe的next的動作。reject的話就看pipe內有沒有catchError()的流程,沒有的話就會走subscribe的error的動作。
  • interval():傳入以millisecond為單位的數字,作為時間間隔產生資料。發出的資料為時間的觸發次數。第一次觸發為0,第二次為1,以此類推。

  • throwError(): 可以傳入物件。subscribe接到這個,就會執行error的指定function。也可以當作pipe內使用的operator,這時不要加上小括號。

多個觀察對象的合併:把一或多個觀察對象,組合成一整個觀察對象。
所以各個動作所傳入的資料必須為「觀察對象」。
  • concat():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是前一個觀察對象發出「完成」之後,後續的觀察對象的資料才會開始被取得並發送。因此要是其中有一個觀察對象是不會結束的fromEvent(例如滑鼠點擊)的話,後續的觀察對象的資料就不會被取得。
    適合各個觀察對象的資料一定要照順序取得的設計。
  • merge():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是只要任一個觀察對象發出資料就會往下傳,沒有先後順序問題。可以說是不管complete的設計。
  • race():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是只取第一個來的資料。
  • zip():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是在所有的觀察對象都發出一筆資料之後,從各個觀察對象取一筆資料,依照觀察對象傳入的順序,產生一個資料陣列往下傳。等到所有的觀察對象都發出complete之後,發出complete。
    因為要所有的觀察對象都發出資料之後各取一筆,要注意會快速累積資料的觀察對象,可能會產生資料堆積。
    發出complete的條件:任一觀察對象發出complete起算,直到發出了該觀察對象的筆數的資料,就會發出complete。
  • forkJoin():把一或多個觀察對象,依參數的傳入順序組合成一個觀察對象。特性是在所有的觀察對象都發出「完成」之後,從各個觀察對象取得最後一筆資料,依照觀察對象傳入的順序,產生一個資料陣列往下傳。
其他的產生方式就請參考文件。

前面有提到各個產生觀察對象的方法,參數可以是物件。當然也可以把觀察對象當做資料傳入。因此在pipe內再做subscribe這樣的階層處理動作,在所難免。
後面在operators的部份會提到如何處理這樣的狀況。這部份也是RxJS的威力所在,
可以讓資料流的切換,整合輕而易舉。相對的,要設計處理流程跟方法就不是易事。

在各個觀察對象產生之後,可以使用pipe指令依序傳入各種operators來對資料本身,甚至是觀察者做各種處理。pipe也可以接著pipe連下去。
要注意的是pipe()回傳值的TypeScript定義相當麻煩。
觀察對象加上pipe之後的回傳值,其資料結構定義將會是Observable<各個emitter的type>。


RxJs的operators分為以下幾類:
  • 直接執行
    • startWith():觀察對象的訂閱發生後,在資料開始往外丟之前,先丟出此operator所傳入的一或多筆資料。
      後續有接delay()之類的動作,會被影響丟出時間

  • 接到資料馬上執行
    • tap():傳入一個function,參數為該筆資料。
      在資料來的時候會執行。
      特性是不會改變原資料,因此不需回傳值(有回傳值也不會被使用)

    • map():傳入一個function,參數為該筆資料,回傳值為改變後的資料。
      在資料來的時候會執行。
      特性是會改變原資料,因此必需回傳值

  • 條件式
    • debounceTime():傳入以millisecond為單位的數字,作為時間間隔。運作方式是觀察對象的資料之間的時間間隔,比參數要短的話就不會被發出。例:interval(100).pipe(debounceTime(100)) 不會發出任何資料。
      interval(10).pipe(debounceTime(100)) 就會每110ms發出一次資料。

    • filter():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。回傳true的話資料就會往下傳。判斷式使用大括號卻沒寫return的話等於undefined(也就是false)。

    • first():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。會傳出第一筆符合判斷條件的資料,並發出complete。不傳入條件式的話代表不作判斷,一定會丟出第一筆。
      因為具有發出complete的特性,可以考慮用在停止不會發出complete的觀察對象。
      若是收到complete之前沒有任何符合條件的資料,會丟出EmptyError。

    • last():傳入判斷式,在觀察對象發出complete之後,發出最後一筆符合條件式的資料。不傳入條件式的話代表不作判斷。
      會根據complete來運作,所以用在不會結束的觀察對象,就不會發出資料。
      要是前面有使用startsWith這類動作,會被擋住直到complete出現。

    • find():傳入判斷式。判斷式的傳入參數為資料,回傳值為boolean。會傳出第一筆符合判斷條件的資料,並發出complete。判斷式為必要參數。
      因為具有發出complete的特性,可以考慮用在停止不會發出complete的觀察對象。
      若是收到complete之前沒有任何符合條件的資料,並不會發出EmptyError。

    • skip():傳入數字,收到該數字指定筆數之後的資料才會往下傳。

    • skipUntil():傳入一個觀察對象,在此觀察對象發出任何資料之後,開始往後傳出在此之後收到的所有資料。

    • reduce():傳入定義function,第二筆資料之後都會執行一次。function的第一參數為初始值為第一筆資料的物件,第二參數為新進資料。
      function的回傳值為下一次執行function的第一參數。
      在觀察對象發出complete之後,丟出最後一次執行function的回傳值。

      function格式:  (<經過處理的物件>, <該筆資料>){
        ...
        return <經過處理的物件>;
      }


    • distinct():傳入function,參數為該筆資料,回傳值為要往下傳的資料。
      傳出的資料會被記錄下來,若是經過function取得要往下傳的資料,跟之前傳過的資料有相同的,就不會被傳出。只會傳出不相同的資料。

    • distinctUntilChanged():傳入function,參數為該筆資料,回傳值為要往下傳的資料。
      傳出的資料的最後一筆會被記錄下來,若是經過function取得要往下傳的資料,跟前一筆資料相同,就不會被傳出。只會傳出跟前一筆不相同的資料。

    • buffer():傳入一個觸發用的觀察對象(本文之後統稱為「觸發者」)。此觀察對象輸出資料的時候,被buffer的觀察對象,會把當時收集到的所有資料,轉成一個陣列往下傳。
      觸發者要是發出complete,被buffer的觀察對象也會發出complete。若是被buffer的觀察對象,在被觸發之前先發出complete,將不會輸出之前收集到的資料。

    • bufferCount():傳入數字,在收集到固定數字的資料之後,轉成一個陣列往下傳。若是被buffer的觀察對象,在被觸發之前先發出complete,將不會輸出之前收集到的資料。
其他的operators請參考文件。
  • 最後,是個人覺得RxJS最難懂的部份:觀察對象群的「展開」。
    RxJS的觀察對象的資料是任意物件,那當然也可以產生「以觀察對象為資料的觀察對象」,本文之後統稱「階層觀察對象」。

    在angular的開發裡面,http模組都是回傳「觀察對象」。為了加速網頁顯示,要同時取多個http api資料;或是順序取,或是取完http api的資料之後再依資料內容去執行其他的http api都是常見的狀況。加上RxJS是「先定義,後執行」(subscribe driven)的設計,執行subscribe之後,http才會動作。沒有「展開」的功能,可能會重現callback地獄。
    因此,「觀察對象群」的「展開」就是必須了解的部分。

    RxJS提供了以下的展開方法:
    • concatAll():無參數。作用是在此等待送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。直到收到complete之後,停止訂閱,繼續等待下一個送過來的「(下層)觀察對象」,來了就訂閱,以此類推,直到該層的「觀察對象」送出完成,之後進行下一步。
      因為是照順序一個個做,一樣有「(下層)觀察對象」不送出complete就不會對後來的「(下層)觀察對象」執行訂閱的特性。

    • mergeAll():無參數。作用是在此等待所有送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。
      任何的「(下層)觀察對象」出現,就會開始訂閱並往下送資料;直到「(下層)觀察對象」送出complete之後,停止訂閱。因為會有多個「(下層)觀察對象」同時被訂閱的狀況,此時對訂閱者來說,各個「(下層)觀察對象」的資料會混在一起。
      直到該層的「觀察對象」送出完成,之後進行下一步。

    • switchAll():無參數。作用是在此等待送過來的「(下層)觀察對象」,然後對送過來的「(下層)觀察對象」開始訂閱,收集資料並往下傳。
      「(下層)觀察對象」一出現就會開始訂閱並往下送資料;直到收到complete之後,停止訂閱。但是在觀察對象的訂閱期間,有其他的「(下層)觀察對象」出現,就會把目前訂閱的「(下層)觀察對象」解除訂閱,並訂閱新來的「(下層)觀察對象」的資料。
      直到該層的「觀察對象」送出完成,之後進行下一步。

      注意:根據測試,要是兩個以上的「(下層)觀察對象」同時出現(只要沒有時間差,像是在observer的create裡面連續執行兩次next(),各個觀察對象也沒有使用delay之類的動作),同時出現的「(下層)觀察對象」會同時一起被訂閱。可能會達不到switch的效果。

    注意:以上的展開動作,要是接到資料類型不是「觀察對象」的資料,就會拋出TypeError。

    階層觀察對象的complete條件:下層跟上層的觀察對象全部完成。

    注意:前面提到的觀察對象的組合操作( 例如concat() ),組合出來的是一個「觀察對象」。不是有階層的「以觀察對象為資料的觀察對象」。要組合出有階層的「觀察對象」,必須使用of()跟from()來做。

    例外是zip()跟forkJoin()居然可以使用展開operator。
    別忘了這兩個operator都是組合多個「觀察對象」成為一個「觀察對象」,所以三個展開operators的運作結果都一樣:效果會是原本以陣列往下傳的資料,會被順序展開成一個個資料下傳。

  • 「展開」的組合operator。前面有提到,map()傳入一個function,參數為該筆資料,回傳值為改變後的資料。這改變後的資料當然也可以是「(下層)觀察對象」。為了方便寫map+all,就出現了組合operators。
    • mergeMap():等同於map() + mergeAll()
    • switchMap():等同於map() + switchAll()。
    • concatMap():等同於map() + concatAll()。
注意:三個組合operators都是傳入一個function,參數為該筆資料,回傳值為「(下層)觀察對象」。回傳值的資料類型不是觀察對象的話,TypeScript編譯的時候就會直接錯誤。若使用不需編譯的設計,會在subscribe的時候拋出TypeError。


一些觀念:
簡單,資料結構可掌握的資料流,可以在subscribe的next裡面定義處理方式。
在資料複雜化,next收到的資料結構很難掌握的時候,通常會在pipe裡面使用tap 或是 map 取出,不需被限制在非使用next處理不可。next/complete就當作是一種資料取得進程event,或是動作完成的event。




參考文件:
https://rxjs.dev/ RxJS官網
https://angular.io/guide/observables angular大部分的系統Api都是RxJS的設計,需要了解angular對於RxJS的使用狀況。


0 件のコメント:

コメントを投稿