RxJS 入門 〜導入編〜
RxJS 入門 〜導入編〜:
RxJS(v5)の備忘録です。
導入編ということで、まず理解した方が良い基本情報に関して主に記載しております。
私も学習中のため、間違いがあればご指摘をいただけると大変助かります。
RxJS は「リアクティブ・プログラミング」というプログラミングパラダイムを採用しているため、通常の JavaScript とは問題解決の考え方やコードの書き方が異なります(JavaScript の一般的なプログラミングパラダイムは「オブジェクト指向」)。
※プログラミングパラダイムとは「何かをプログラムで実現したい場合、それをどうやって実現するかという考え方やスタイル、書き方」のこと。
そのため、RxJS を学習する際には、通常の JavaScript のコードを書く時の考え方は捨てて、新しい言語を学ぶぐらいの気持ちで臨んだ方が良いと思います。
「同期処理やイベント駆動の処理を、簡潔で一貫性を持たせ、可読性を高くコーディングできるようにすること」を目的とした JavaScript ライブラリ。
ざっくり言ってしまえば、複雑な非同期処理を効率良く実装できる JavaScript ライブラリ。
Angular などにも採用されている。
以下のようなメリットがあるから(特に非同期処理の実装において)。
RxJS のアーキテクチャに基づいたアプリケーション(コード)は、主に以下の要素で構成される(他にもあるが、ここでは最低限理解しておきたいものだけ記載)。
Observer へ通知を送るオブジェクト。
以下のような通知設定も定義されている。
Observable から通知を受けとり、通知の種類に応じた処理をするオブジェクト。
Observable を生成、変換するメソッド。
約90種類ほど存在し、それらを組みあわせて、様々な通知設定が定義された Observable を生成できる。
実際に Observable、Observer、Operators で構成された以下のコードを読み解いていく。
デモ
上記のコードはコメントにも書いてあるように、以下を行っている。
今回は Operators である
生成した Observable(
Observable の通知は
subscribe をしたら値の通知が開始され、Observer の処理が実行される。
今回生成した Observable(
というわけで、Observable、Observer、Operators を利用してコンソール出力できた。
Observable、Observer、Operators を理解していないと、他の構成要素や複雑な利用方法を理解するのは難しいため、まずはこれらを理解する。
前述の概念以外に RxJS をスムーズに学習するために覚えておきたい基礎知識は以下の通り。
RxJS の Observable には「Observable」と「Observable インスタンス」が存在する。
Observable は以下のような Observable(クラス)のことを指す。
そして、以下のようなメソッドを「Observable メソッド」と言う。
つまり、
以下のような Observable メソッドから生成された Observable(インスタンス)のことを指す。
そして、以下のような Observable インスタンスのメソッドを「Observable インスタンスメソッド」と言う。
Observable インスタンスメソッド から生成された Observable も Observable インスタンスである。
大きな違いは利用できるメソッド(Operator など)が異なる。
どちらでも利用できるものもあれば、どちらかでしか利用できないものがある。
数値や文字列などから Observable を生成する
通知する値を変換する
Observable を 1 つにマージする
書き方が少し異なるが、生成される Observable は同じなので留意しておく。
前述の通り、Observable(もしくは Observable インスタンス)メソッドは、Observable を生成する。
つまり、メソッドの戻り値は Observable のため、以下のようにメソッドチェーンで簡潔に書ける。特に理由がない限り、メソッドチェーンで書くのが一般的である。
デモ
前述の「Observable、Observer、Operators で構成されたコードを読み解く」で記載したサンプルコードの場合、Observer を以下のように定義した。
これ以外にも様々な定義の仕方がある。
また、以下のように
このように Observer は様々な定義の仕方があるので留意しておく。
RxJS を利用して何かを実装する場合、目的に沿った様々な Observable を生成する必要がある。
そのため、RxJS を有効に利用するためには多種多様な Operators を理解することが特に重要である(と思っている)。
Operators はいくつかの種類に分類できる。その中で頻繁に利用されている以下の Operators を種類毎に記載していく。
数値、文字列、オブジェクト、配列などの Observable ではないものから Observable を生成できる Operators。
引数の値を
デモ
デモ
引数のイテラブル(配列や文字列など)や Promise の結果を
デモ
デモ
Promise が resolve されると
デモ
第1引数の要素が、第2引数のイベントを発火すると Event Object を
デモ
引数で指定した時間(ms)毎に
通知する値は
デモ
引数で指定した時間(ms)後に
デモ
第 2 引数も指定すれば、第 1 引数で指定した時間後に
デモ
Observable を変換(現在の Observable に基づいて、新しい Observable を生成)する Operators。
このメソッドを実行した Observable が
デモ
固定の値を
デモ
クリック時などに固定の値を
デモ
このメソッドを実行した Observable が
デモ
このメソッドを実行した Observable が
以下の場合、
デモ
つまり、上記で生成される Observable は以下のように生成される Observable と同じ。
デモ
上記の処理は
デモ
そのため、生成した Observable から通知をさせたい場合は
デモ
ユーティリティーな Operators。
Observable に影響を与えずに、通知の途中でロギングなどの処理を実行できる。
デモ
このメソッドを実行した Observable の通知を、引数で指定した時間(ms)遅らせる。
デモ
Observable のエラーハンドリングができる Operators。
このメソッドを実行した Observable が
再開した Observable が通知を正常にできた場合、Observer に
デモ
このメソッドを実行した Observable が
再試行する回数は引数で指定した数。指定しない場合は無限に再試行する。
デモ
デモ
複数の Observer へ同一の通知を送れる Operators。
複数の Observer への通知を共有する。
1 つの Observable から複数の Observer に通知をしても、それぞれの Observer が受け取る通知は別物になる。
デモ
それぞれの Observer が受け取る通知は共通のものになる。
デモ
現在の Observable が通知する値に基づいて、特定の値だけを通知する Observable を生成する Operators。
このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合った(関数の戻り値が
デモ
条件に合う値がなければ
引数で指定した回数の
デモ
引数の Observable が
デモ
このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合わなかった(関数の戻り値が
デモ
前回
デモ
値がオブジェクトの場合、オブジェクトを比較する関数を定義すれば、前回のオブジェクトと異なるオブジェクトのみを通知できる。
デモ
複数の Observable を結合する Operators。
複数の Observable をマージする。
デモ
指定した Observable が順番に通知を開始する Observable を生成する。
以下のように、1つの目の Observable が
デモ
デモ
指定した Observable が全て
デモ
1 つ前の
デモ
そのため、以下のように通知が 1 回しかされない Observable の場合、
デモ
引数の値を最初に通知する Observable を生成する。
デモ
指定した Observable が全て
デモ
指定した Observable が全て
デモ
指定した Observable の中で、
デモ
指定した Observable が全て
デモ
このメソッドを実行した Observable が
その他の Operators(ドキュメントで明記されていないものはこちらに記載、v6 にも存在する)。
AJAX(HTTP)リクエストの結果を通知する Observable を生成する。
デモ
デモ
RxJS のメリットを確認するために、以下のサンプルを RxJS を利用する場合と利用しない場合で実装し、それぞれのコードを比較する。
※ Chrome の「Network」タブで回線を絞れば再現できます。
リクエストには
デモ
デモ
RxJS を利用しない場合と比べて以下のようになった。
学習が難しいですが、複雑な非同期処理を実装する際には有用なライブラリだと思います。
今回は導入編のため、次回はより RxJS の理解を深めるために知っておいた方が良い以下の項目などに関して投稿する予定です(多分)。
本記事では記載しませんでしたが、RxJS の Observable は マーブルダイアグラム(Marble Diagram)という図で表現されていることが多いです。
以下のサイトでは、様々な Operators を利用して生成した Observable のマーブルダイアグラムが掲載されていますので、RxJS 学習の手助けになると思います。
RxJS Marbles
とは言えども、複雑な機能の Operators から生成された Observable のマーブルダイアグラムだけ見ても理解するのは困難なので、実際にコードを書いてみて動作を確認した方が良いと思います。
以下のサイトだとマーブルダイアグラムがリアルタイムで描画されるため、わかりやすいです。また、自分で書いたコードもマーブルダイアグラムで描画できます(V6 対応なので注意)。
Rx Visualizer
所感ですが、RxJS の用語は同義語がいくつかあるため、記事によって用いられている言葉や説明の仕方がかなり異なります(他のライブラリやフレームワークの記事と比べて)。
例えば、この記事での「値を通知する」は、別の記事では「値を流す」と記載されています。どちらの表現が分かりやすいと感じるかは人それぞれだと思います。
そのため、どの記事が分かりやすいと感じるかも人それぞれだと思いますので、たくさんの記事に目を通してみることをおすすめします。
はじめに
RxJS(v5)の備忘録です。- 「RxJS の何が便利なのかよくわからない」
- 「ドキュメントやサンプルコードを読んだが意味不明だった」
- 「複雑な Operators の挙動がよくわからない」
導入編ということで、まず理解した方が良い基本情報に関して主に記載しております。
私も学習中のため、間違いがあればご指摘をいただけると大変助かります。
本記事の前提や注意点
- バージョンは 5 系を対象にしています。最新は 6 系ですが、書き方がかなり異なるので留意してください(基本的な概念は変わりません)。
- RxJS のイメージを掴んでもらうことや、理解のしやすさを重視しているため、厳密な定義とは異なる説明を記載している箇所もあります(Observable や一部 Operators の説明など)。
- RxJS はフロントエンドのライブラリの中では難しい部類に入ると思います。そのため、入門と言いつつ内容は難しめです。
RxJS 学習の心構え
RxJS は「リアクティブ・プログラミング」というプログラミングパラダイムを採用しているため、通常の JavaScript とは問題解決の考え方やコードの書き方が異なります(JavaScript の一般的なプログラミングパラダイムは「オブジェクト指向」)。※プログラミングパラダイムとは「何かをプログラムで実現したい場合、それをどうやって実現するかという考え方やスタイル、書き方」のこと。
そのため、RxJS を学習する際には、通常の JavaScript のコードを書く時の考え方は捨てて、新しい言語を学ぶぐらいの気持ちで臨んだ方が良いと思います。
RxJS とは
「同期処理やイベント駆動の処理を、簡潔で一貫性を持たせ、可読性を高くコーディングできるようにすること」を目的とした JavaScript ライブラリ。ざっくり言ってしまえば、複雑な非同期処理を効率良く実装できる JavaScript ライブラリ。
Angular などにも採用されている。
なぜ RxJS を利用するのか
以下のようなメリットがあるから(特に非同期処理の実装において)。- 簡潔なコードを書ける
- 可読性が高いコードを書ける
- 一貫性を持ったコードを書ける
- 副作用を減らせる
RxJS の概念(アーキテクチャ)
RxJS のアーキテクチャに基づいたアプリケーション(コード)は、主に以下の要素で構成される(他にもあるが、ここでは最低限理解しておきたいものだけ記載)。- Observable(オブザーバブル)
- Observer(オブザーバ)
- Operators(オペレータ)
Observable(オブザーバブル)
Observer へ通知を送るオブジェクト。以下のような通知設定も定義されている。
- 通知する値
- 通知するタイミング
- 通知の種類
-
next
: 値の通知 -
error
: エラー通知 -
complete
: 完了通知(値の通知が完了したら通知される)
Observer(オブザーバ)
Observable から通知を受けとり、通知の種類に応じた処理をするオブジェクト。
Operators(オペレータ)
Observable を生成、変換するメソッド。約90種類ほど存在し、それらを組みあわせて、様々な通知設定が定義された Observable を生成できる。
Observable、Observer、Operators で構成されたコードを読み解く
実際に Observable、Observer、Operators で構成された以下のコードを読み解いていく。// Operators(オペレータ)で Observable(オブザーバブル)を生成する const observable = Rx.Observable.of(1, 2, 3); // Observer(オブザーバ)を定義する const observer = { // `next`通知のコールバック関数 next: x => console.log(x), // `error`通知のコールバック関数 error: error => console.error(err), // `complete`通知のコールバック関数 complete: () => console.log('complete') }; // Observable(オブザーバブル)を subscribe する observable.subscribe(observer); // 以下がコンソール出力される // => 1 // => 2 // => 3 // => "complete"
1
、2
、3
、"complete"
をコンソール出力するだけの単純な処理。上記のコードはコメントにも書いてあるように、以下を行っている。
- Operators(オペレータ)で Observable(オブザーバブル)を生成する
- Observer(オブザーバ)を定義する
- Observable(オブザーバブル)を subscribe する
Operators(オペレータ)で Observable(オブザーバブル)を生成する
// Operators(オペレータ)で Observable(オブザーバブル)を生成する const observable = Rx.Observable.of(1, 2, 3);
of
を利用して Observable を生成している。生成した Observable(
Rx.Observable.of(1, 2, 3)
)が Observer へ通知する値や通知するタイミング、通知の種類は以下の通り。- 通知する値:
1
、2
、3
(それぞれを通知するため通知回数は 3 回)。 - 通知するタイミング: subscribe したら即通知する。
- 通知の種類: 値である
1
、2
、3
をnext
で通知する。値の通知が全て完了したらcomplete
を通知する。
Observer(オブザーバ)を定義する
// Observer(オブザーバ)を定義する const observer = { // `next`通知のコールバック関数、引数(`x`)に通知された値が渡される next: x => console.log(x), // `error`通知のコールバック関数、引数(`error`)に通知された値が渡される error: error => console.error(error), // `complete`通知のコールバック関数 complete: () => console.log('complete') };
next
、error
、complete
の全3種類のため、それぞれの通知に応じたコールバック関数を定義している。
Observable(オブザーバブル)を subscribe する
// Observable(オブザーバブル)を subscribe する observable.subscribe(observer);
今回生成した Observable(
Rx.Observable.of(1, 2, 3)
)は、値である1
、2
、3
をnext
で通知して、それらの通知が完了したらcomplete
を通知するため、Observer の以下の処理が実行される。-
next
通知のコールバック関数(next: x => console.log(x)
)が3回実行される -
complete
通知のコールバック関数(complete: () => console.log('complete')
)が1回実行される
1
、2
、3
、"complete"
がコンソール出力される。というわけで、Observable、Observer、Operators を利用してコンソール出力できた。
Observable、Observer、Operators を理解していないと、他の構成要素や複雑な利用方法を理解するのは難しいため、まずはこれらを理解する。
RxJS をスムーズに学習するために覚えておきたい基礎知識
前述の概念以外に RxJS をスムーズに学習するために覚えておきたい基礎知識は以下の通り。- Observable と Observable インスタンス
- メソッドチェーン
- Observer の様々な定義の仕方
Observable と Observable インスタンス
RxJS の Observable には「Observable」と「Observable インスタンス」が存在する。
Observable
Observable は以下のような Observable(クラス)のことを指す。Rx.Observable;
Rx.Observable.of;
of
のような Operators も Observable メソッドの一種である。
Observable インスタンス
以下のような Observable メソッドから生成された Observable(インスタンス)のことを指す。// `Rx.Observable.of`から生成された Observable(インスタンス) const observable = Rx.Observable.of(1);
// `Rx.Observable.of`から生成された Observable(インスタンス) const observable = Rx.Observable.of(1); // Observable インスタンスメソッド observable.map;
// `Rx.Observable.of`から生成された Observable(インスタンス) const observable = Rx.Observable.of(1); // `observable.map`から生成された Observable(インスタンス) const observable2 = observable.map(v => v * 10);
Observable と Observable インスタンスの違い
大きな違いは利用できるメソッド(Operator など)が異なる。どちらでも利用できるものもあれば、どちらかでしか利用できないものがある。
Observable でしか利用できないメソッド
数値や文字列などから Observable を生成するof
などは Observable でしか利用できない。// OK const observable = Rx.Observable.of(1); // NG const observable2 = observable.of(2);
Observable インスタンス でしか利用できないメソッド
通知する値を変換するmap
などは Observable インスタンスでしか利用できない。// NG const observable = Rx.Observable.map(v => v * 10); // OK const observable2 = Rx.Observable.of(2).map(v => v * 10);
Observable と Observable インスタンスで利用できるメソッド
Observable を 1 つにマージするmerge
などはどちらでも利用できる。// OK const observable = Rx.Observable.merge(Rx.Observable.of(1), Rx.Observable.of(2)); // OK const observable3 = Rx.Observable.of(1).merge(Rx.Observable.of(2));
メソッドチェーン
前述の通り、Observable(もしくは Observable インスタンス)メソッドは、Observable を生成する。つまり、メソッドの戻り値は Observable のため、以下のようにメソッドチェーンで簡潔に書ける。特に理由がない限り、メソッドチェーンで書くのが一般的である。
Rx.Observable.of(1, 2, 3, 4, 5) .filter(v => v % 2 === 0) .map(v => v * 10) .subscribe({ next: x => console.log(x), error: error => console.error(error), complete: () => console.log('complete') }); // => 20 // => 40 // => "complete"
Observer の様々な定義の仕方
前述の「Observable、Observer、Operators で構成されたコードを読み解く」で記載したサンプルコードの場合、Observer を以下のように定義した。// Observer(オブザーバ)を定義する const observer = { // `next`通知のコールバック関数、引数(`x`)に通知された値が渡される next: x => console.log(x), // `error`通知のコールバック関数、引数(`error`)に通知された値が渡される error: error => console.error(error), // `complete`通知のコールバック関数 complete: () => console.log('complete') };
next
通知のコールバック関数だけ定義する
next
通知以外のコールバック関数は必須でないため、以下のようにも定義できる。// Observer(オブザーバ)を定義する // この場合、`next`通知だけ受け取る。Observable が`complete`通知してもそれは受け取らない。 const observer = x => console.log(x);
subscribe
メソッドの引数にコールバック関数を指定する
また、以下のようにsubscribe
メソッドに直接定義できる。この場合、引数にオブジェクトではなくコールバック関数を指定できる。Rx.Observable.of(1, 2, 3).subscribe( x => console.log(x), error => console.error(err), () => console.log('complete') ); // ↑の Observer の定義は以下と同じ // .subscribe({ // next: x => console.log(x), // error: error => console.error(error), // complete: () => console.log('complete'), // });
多種多様な Operators を理解する
RxJS を利用して何かを実装する場合、目的に沿った様々な Observable を生成する必要がある。そのため、RxJS を有効に利用するためには多種多様な Operators を理解することが特に重要である(と思っている)。
Operators はいくつかの種類に分類できる。その中で頻繁に利用されている以下の Operators を種類毎に記載していく。
- Creation Operators
- of
- from
- fromEvent
- interval
- timer
- Transformation Operators
- map
- mapTo
- scan
- mergeMap
- switchMap
- Utility Operators
- do
- delay
- Error Handling Operators
- catch
- retry
- Multicasting Operators
- share
- Filtering Operators
- filter
- take
- takeUntil
- takeWhile
- distinctUntilChanged
- Combination Operators
- startWith
- merge
- concat
- zip
- forkJoin
- combineLatest
- withLatestFrom
- その他 Operators
- ajax
Creation Operators
数値、文字列、オブジェクト、配列などの Observable ではないものから Observable を生成できる Operators。
of
引数の値をnext
通知する Observable を生成する。
単一の値を通知する
Rx.Observable.of(1).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "complete"
複数の値を通知する
Rx.Observable.of(1, 2, 3).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 2" // => "next: 3" // => "complete"
from
引数のイテラブル(配列や文字列など)や Promise の結果をnext
かerror
通知する Observable を生成する。
配列を通知する
Rx.Observable.from([1, 2, 3]).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 2" // => "next: 3" // => "complete"
文字列を通知する
Rx.Observable.from('soarflat').subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: s" // => "next: o" // => "next: a" // => "next: r" // => "next: f" // => "next: l" // => "next: a" // => "next: t" // => "complete"
Promise の 結果を通知する
Promise が resolve されるとnext
を通知して、reject されるとerror
を通知する。const promiseResolve = () => { return new Promise(resolve => setTimeout(() => resolve('resolve!!'), 3000)); }; Rx.Observable.from(promiseResolve()).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 3秒後に以下がコンソール出力される // => "next: resolve!!" // => "complete" const promiseReject = () => { return new Promise((resolve, reject) => setTimeout(() => reject('reject!!'), 3000)); }; Rx.Observable.from(promiseReject()).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 3秒後に以下がコンソール出力される // => error: reject!!
fromEvent
第1引数の要素が、第2引数のイベントを発火すると Event Object をnext
通知する Observable を生成する。<input type="text" value="" />
const input = document.querySelectorAll('input[type="text"]'); Rx.Observable.fromEvent(input, 'input').subscribe( e => console.log(`next: ${e.target.value}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // `input`要素に文字列を入力すると、入力した文字列がコンソール出力される
interval
引数で指定した時間(ms)毎にnext
通知する Observable を生成する。通知する値は
0
から始まり、通知毎に1
が加算された値が通知される。interval
だけ利用した場合、next
通知が永遠にされるため、complete
通知はされない(別の Operators を利用すれば、complete
通知させることも可能)。Rx.Observable.interval(1000).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 1秒毎に以下がコンソール出力される // => "next: 0" // => "next: 1" // => "next: 2" // => "next: 3" // => "next: 4" // => ... // => "next: 99"
timer
引数で指定した時間(ms)後にnext
通知する Observable を生成する。Rx.Observable.timer(1000).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 1秒後に以下がコンソール出力される // => "next: 0" // => "complete"
第 2 引数も指定した場合
第 2 引数も指定すれば、第 1 引数で指定した時間後にnext
通知を開始し、その後は第 2 引数で指定した時間毎にnext
通知をする。Rx.Observable.timer(1000, 2000).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 1秒後に以下がコンソール出力される // => "next: 0" // 以降は2秒毎にコンソール出力される // => "next: 1" // => "next: 2" // => "next: 3" // => "next: 4" // => ... // => "next: 99"
Transformation Operators
Observable を変換(現在の Observable に基づいて、新しい Observable を生成)する Operators。
map
このメソッドを実行した Observable がnext
通知する値に対して任意の関数を実行し、その戻り値をnext
通知する Observable を生成する。Rx.Observable.of(1, 2, 3) // `map`を実行した Observable(`Observable.of(1, 2, 3)`)が通知する値(`1`、`2`、`3`)に対して // 値を10倍にする関数を実行。その戻り値(`10`、`20`、`30`)を`next`通知する Observable を生成する。 .map(value => value * 10) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 10" // => "next: 20" // => "next: 30" // => "complete"
mapTo
固定の値をnext
通知する Observable を生成する。Rx.Observable.of(1, 2, 3) .mapTo(10) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 10" // => "next: 10" // => "next: 10" // => "complete"
mapTo の使い所
クリック時などに固定の値をnext
通知したい時に利用する。<button id="increment">increment</button>
const increment = document.getElementById('increment'); Rx.Observable.fromEvent(increment, 'click') .mapTo(1) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // button をクリックする度に以下がコンソール出力される。 // => "next: 1"
scan
このメソッドを実行した Observable がnext
通知する値に対して任意の関数を実行し、その戻り値を累積してnext
通知する Observable を生成する。const observable = Rx.Observable.of(1, 2, 3, 4, 5); observable // `scan`を実行した Observable(`observable`)が`next`通知する値(`1`、`2`、`3`、`4`、`5`)に対して // 累積した値と通知された値を加算する関数(`(accumulator, value) => accumulator + value`)を実行し、その戻り値を累積して通知する。 // `accumulator` に累積された値が渡され(初期値が第2引数である`0`)、`value`に通知された値が渡される。 // そのため、今回の関数の場合、以下のように値を加算して累積する // 1回目の`next`通知では`accumulator`が`0`で`value`が`1`なので、`0 + 1`の戻り値である`1`を累積して`next`通知する // 2回目の`next`通知では`accumulator`が`1`で`value`が`2`なので、`1 + 2`の戻り値である`3`累積して`next`通知する // 3回目の`next`通知では`accumulator`が`3`で`value`が`3`なので、`3 + 3`の戻り値である`6`累積して`next`通知する // 4回目の`next`通知では`accumulator`が`6`で`value`が`4`なので、`6 + 4`の戻り値である`10`累積して`next`通知する // 5回目の`next`通知では`accumulator`が`10`で`value`が`5`なので、`10 + 5`の戻り値である`15`累積して`next`通知する .scan((accumulator, value) => accumulator + value, 0) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 3" // => "next: 6" // => "next: 10" // => "next: 15" // => "complete"
mergeMap
このメソッドを実行した Observable がnext
通知をしたら、任意の Observable を生成する。以下の場合、
mergeMap
を実行した Observable がnext
通知する値に対して Observable を生成する関数を実行し、その戻り値である Observable をマージした Observable を生成する。const observable = Rx.Observable.of(2000, 1000, 3000); // `mergeMap`を実行した Observable(`observable`)が通知する値(`2000`、`1000`、`3000`)に対して // Observable を生成する関数(`v => Rx.Observable.timer(v).mapTo(v)`)を実行し、 // その戻り値である以下の Observable をマージした Observable を生成する。 // Rx.Observable.timer(2000).mapTo(2000) // Rx.Observable.timer(1000).mapTo(1000) // Rx.Observable.timer(3000).mapTo(3000) const merged = observable.mergeMap(v => Rx.Observable.timer(v).mapTo(v)); merged.subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1000" // => "next: 2000" // => "next: 3000" // => "complete"
つまり、上記で生成される Observable は以下のように生成される Observable と同じ。
Rx.Observable.merge( Rx.Observable.timer(2000).mapTo(2000), Rx.Observable.timer(1000).mapTo(1000), Rx.Observable.timer(3000).mapTo(3000) ).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1000" // => "next: 2000" // => "next: 3000" // => "complete"
mergeMap
を利用した方が柔軟に Observable を生成できる。
map と何が違うのか
上記の処理はmap
でも実現できそうだが、map
を利用すると以下のようにコンソール出力が異なる。const observable = Rx.Observable.of(2000, 1000, 3000); observable // このメソッドを実行した Observable(`observable`)が通知する値(`2000`、`1000`、`3000`)に対して // Observable を生成する関数(`v => Rx.Observable.timer(v).mapTo(v)`)を実行し、 // その戻り値(Observable)を通知する Observable を生成する。 // 今回の場合、通知する戻り値は以下の3つの Observable。 // Rx.Observable.timer(2000).mapTo(2000) // Rx.Observable.timer(1000).mapTo(1000) // Rx.Observable.timer(3000).mapTo(3000) .map(v => Rx.Observable.timer(v).mapTo(v)) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: [object Object]" // [object Object]は Rx.Observable.timer(2000).mapTo(2000) // // => "next: [object Object]" // [object Object]は Rx.Observable.timer(1000).mapTo(1000) // // => "next: [object Object]" // [object Object]は Rx.Observable.timer(3000).mapTo(3000) // // => "complete"
map
を利用した場合、生成した Observable をnext
通知する Observable が生成される。そのため、生成した Observable から通知をさせたい場合は
mergeMap
を利用する。
switchMap
mergeMap
と同様で、このメソッドを実行した Observable がnext
通知をしたら、任意の Observable を生成する。mergeMap
と異なる点は、このメソッドを実行した Observable がnext
通知する度に、前に生成された Observable は破棄されて新しい Observable が生成される。const observable = Rx.Observable.interval(5000); observable // `switchMap`を実行した Observable(`observable`)が`next`通知をしたら、 // 任意の Observable(`Rx.Observable.interval(1000)`)を生成する。 .switchMap(() => Rx.Observable.interval(1000)) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 0" // => "next: 1" // => "next: 2" // => "next: 3" // このタイミングで`observable`が`next`通知をするため、前に生成された Observable(`Rx.Observable.interval(1000)`)は // 破棄されて、新しい Observable(`Rx.Observable.interval(1000)`)が生成される。 // そのため、`next`通知される値も`0`からになる。 // => "next: 0" // => "next: 1" // => "next: 2" // => "next: 3" // ...
Utility Operators
ユーティリティーな Operators。
do
Observable に影響を与えずに、通知の途中でロギングなどの処理を実行できる。Rx.Observable.of(1, 2, 3, 4, 5) // `do`内の関数(処理)は Observable に影響を与えない .do(v => console.log(`map を実行前: ${v}`)) .map(v => v * 10) .do(v => console.log(`map を実行後: ${v}`)) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "map を実行前: 1" // => "map を実行後: 10" // => "next: 10" // => "map を実行前: 2" // => "map を実行後: 20" // => "next: 20" // => "map を実行前: 3" // => "map を実行後: 30" // => "next: 30" // => "map を実行前: 4" // => "map を実行後: 40" // => "next: 40" // => "map を実行前: 5" // => "map を実行後: 50" // => "next: 50" // => "complete"
delay
このメソッドを実行した Observable の通知を、引数で指定した時間(ms)遅らせる。// 3秒後に`1`を通知する Observable const observable = Rx.Observable.of(1).delay(3000); observable.subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 3秒後に以下がコンソール出力される // => "next: 1" // => "complete"
Error Handling Operators
Observable のエラーハンドリングができる Operators。
catch
このメソッドを実行した Observable がerror
通知をした際に、Observable を生成する関数を実行し、その戻り値である Observable から通知を再開する。再開した Observable が通知を正常にできた場合、Observer に
error
通知はされない。const promiseReject = () => new Promise((resolve, reject) => reject('reject!!')); const observable = Rx.Observable.from(promiseReject()); observable // このメソッドを実行した Observable(`observable`)が`error`を通知した際に、 // Observable を生成する関数(`error => Rx.Observable.of(error)`)を実行し、 // その戻り値である Observable(`Rx.Observable.of(error)`)から通知を再開する。 .catch(error => Rx.Observable.of(error)) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 今回`catch`で生成した Observable(`Rx.Observable.of(error)`)から`next`通知が // 再開されるため、以下がコンソール出力される。 // => "next: reject!!" // => "complete" // 再開した Observable が通知を正常にできたので、Observer に`error`通知はされない。
retry
このメソッドを実行した Observable がerror
通知をした場合、Observable を再試行する(通知を最初からやり直す)。再試行する回数は引数で指定した数。指定しない場合は無限に再試行する。
// `map`を実行した Observable(`Rx.Observable.interval(1000)`)が // `2`より大きな値を`next`通知したら、`error`通知をする Observable const observable = Rx.Observable.interval(1000).map(value => { if (value > 2) { throw new Error('2より大きい!'); } else { return value; } }); observable .retry(2) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 0" // => "next: 1" // => "next: 2" // このタイミングで`observable`から`error`通知がされるため、1回目の再試行をする。 // => "next: 0" // => "next: 1" // => "next: 2" // このタイミングで`observable`から`error`通知がされるため、2回目の再試行をする。 // => "next: 0" // => "next: 1" // => "next: 2" // 再試行が2回失敗した(依然として`observable`から`error`通知がされる)ため、Observer に`error`通知をして終了。 // => "error: Error: 2より大きい!"
再試行に失敗した後に catch でエラーハンドリングする
// `map`を実行した Observable(`Rx.Observable.interval(1000)`)が // `2`より大きな値を`next`通知したら、`error`通知をする Observable。 const observable = Rx.Observable.interval(1000).map(value => { if (value > 2) { throw new Error('2より大きい!'); } else { return value; } }); observable .retry(2) .catch(() => Rx.Observable.of('catch!!')) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 0" // => "next: 1" // => "next: 2" // このタイミングで`observable`から`error`通知がされるため、1回目の再試行をする。 // => "next: 0" // => "next: 1" // => "next: 2" // このタイミングで`observable`から`error`通知がされるため、2回目の再試行をする。 // => "next: 0" // => "next: 1" // => "next: 2" // 再試行が2回失敗した(依然として`error`通知がされる)ため、 // `catch`で生成した Observable(`Rx.Observable.of('catch!!')`)から`next`通知が再開されて以下がコンソール出力される。 // => "next: catch!!" // => "complete"
Multicasting Operators
複数の Observer へ同一の通知を送れる Operators。
share
複数の Observer への通知を共有する。
share を利用しない場合
1 つの Observable から複数の Observer に通知をしても、それぞれの Observer が受け取る通知は別物になる。const observable = Rx.Observable.interval(1000).take(5); // Observer1 を登録する observable.subscribe( x => console.log(`Observer1 next: ${x}`), error => console.error(`Observer1 error: ${error}`), () => console.log('Observer1 complete') ); setTimeout(() => { // Observer2 を登録する observable.subscribe( x => console.log(`Observer2 next: ${x}`), error => console.error(`Observer2 error: ${error}`), () => console.log('Observer2 complete') ); }, 2500); // => "Observer1 next: 0" // => "Observer1 next: 1" // このタイミングで Observer2 が登録され、通知が開始する(Observer1 とは異なる通知を受け取る)。 // => "Observer1 next: 2" // => "Observer2 next: 0" // => "Observer1 next: 3" // => "Observer2 next: 1" // => "Observer1 next: 4" // => "Observer1 complete" // => "Observer2 next: 2" // => "Observer2 next: 3" // => "Observer2 next: 4" // => "Observer2 complete"
share を利用する場合
それぞれの Observer が受け取る通知は共通のものになる。const observable = Rx.Observable.interval(1000) .take(5) .share(); // Observer1 を登録する observable.subscribe( x => console.log(`Observer1 next: ${x}`), error => console.error(`Observer1 error: ${error}`), () => console.log('Observer1 complete') ); setTimeout(() => { // Observer2 を登録する observable.subscribe( x => console.log(`Observer2 next: ${x}`), error => console.error(`Observer2 error: ${error}`), () => console.log('Observer2 complete') ); }, 2500); // => "Observer1 next: 0" // => "Observer1 next: 1" // このタイミングで Observer2 が登録され、Observer1 と同じ通知を受け取る。 // => "Observer1 next: 2" // => "Observer2 next: 2" // => "Observer1 next: 3" // => "Observer2 next: 3" // => "Observer1 next: 4" // => "Observer2 next: 4" // => "Observer1 complete" // => "Observer2 complete"
Filtering Operators
現在の Observable が通知する値に基づいて、特定の値だけを通知する Observable を生成する Operators。
filter
このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合った(関数の戻り値がtrue
になった)値を通知する Observable を生成する。Rx.Observable.of(1, 2, 3, 4) // `filter`を実行した Observable(`Observable.of(1, 2, 3, 4)`)が // 通知する値(1, 2, 3, 4)に対して条件判定をする関数(`value => value % 2 === 0`)を実行し、 // 条件に合った値(2, 4)を通知する Observable を生成する。 .filter(value => value % 2 === 0) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 2" // => "next: 4" // => "complete"
条件に合う値がなければ
complete
だけ通知される。Rx.Observable.of(1, 3) .filter(value => value % 2 === 0) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "complete"
take
引数で指定した回数のnext
通知をする Observable を生成する。Rx.Observable.interval(1000) .take(3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 0" // => "next: 1" // => "next: 2" // => "complete"
takeUntil
引数の Observable がnext
を通知したら、このメソッドを実行した Observable のnext
通知を止め、complete
通知をする Observable を生成する。const timer = Rx.Observable.timer(4500); const interval = Rx.Observable.interval(1000); interval // 引数の Observable(`timter`)が `next`通知をしたら、Observable(`interval`)の // `next`通知を止め、`complete`通知をする Observable を生成する。 .takeUntil(timer) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 0" // => "next: 1" // => "next: 2" // => "next: 3" // => "complete"
takeWhile
このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合わなかった(関数の戻り値がfalse
になった)場合、このメソッドを実行した Observable のnext
通知を止め、complete
通知をする Observable を生成する。const of = Rx.Observable.of(1, 2, 3, 4, 5); of // Observable(`of`)が通知する値(1, 2, 3, 4, 5)に対して // 条件判定をする関数(`v => v <= 3`)を実行して、条件に合わなかった(関数の戻り値が`false`になった)場合、 // このメソッドを実行した Observable(`of`)の`next`通知を止め、`complete`通知をする Observable を生成する。 .takeWhile(v => v <= 3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 2" // => "next: 3" // => "complete"
distinctUntilChanged
前回next
通知された値と異なる値のみをnext
通知をする Observable を生成する。Rx.Observable.of(1, 1, 2, 4, 4, 5, 5, 5, 3) .distinctUntilChanged() .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 2" // => "next: 4" // => "next: 5" // => "next: 3" // => "complete"
比較対象がオブジェクトの場合
値がオブジェクトの場合、オブジェクトを比較する関数を定義すれば、前回のオブジェクトと異なるオブジェクトのみを通知できる。Rx.Observable.of({ name: 'soarflat' }, { name: 'soarflat' }, { name: 'dom' }, { name: 'tom' }) .distinctUntilChanged((prev, current) => prev.name === current.name) .subscribe( x => console.log(`next: ${x.name}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: soarflat" // => "next: dom" // => "next: tom" // => "complete"
Combination Operators
複数の Observable を結合する Operators。
merge
複数の Observable をマージする。// 2秒後に`1`を通知する Observable const observable = Rx.Observable.timer(2000).mapTo(1); // 1秒後に`2`を通知する Observable const observable2 = Rx.Observable.timer(1000).mapTo(2); // 3秒後に`3`を通知する Observable const observable3 = Rx.Observable.timer(3000).mapTo(3); // 複数の Observable(`observable`、`observable2`、`observable3`) をマージする。 // 今回の場合、以下の通知をする Observable が生成される。 // 1秒後に`2`を通知する // 2秒後に`1`を通知する // 3秒後に`3`を通知する const merged = Rx.Observable.merge(observable, observable2, observable3); merged.subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 2" // => "next: 1" // => "next: 3" // => "complete"
concat
指定した Observable が順番に通知を開始する Observable を生成する。以下のように、1つの目の Observable が
complete
通知をしたら、2つ目の Observable が通知を開始する。そして、3つの目の Observable がcomplete
通知をしたら、3つ目の Observable が通知を開始する。const observable = Rx.Observable.of(1, 2, 3).delay(2000); const observable2 = Rx.Observable.of(4, 5, 6).delay(1000); const observable3 = Rx.Observable.of(7, 8, 9); Rx.Observable // 指定したObservable(`observable`、`observable2`、`observable3`)が // 順番に通知を開始するObservable を生成する。 .concat(observable, observable2, observable3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1" // => "next: 2" // => "next: 3" // `observable`が`complete`通知をしたら、`observable2`が通知を開始する。 // => "next: 4" // => "next: 5" // => "next: 6" // `observable2`が`complete`通知をしたら、`observable3`が通知を開始する // => "next: 7" // => "next: 8" // => "next: 9" // => "complete"
指定した Observable がcomplete
通知をしない場合
complete
通知をしない Observable を指定した場合、それ以降の Observable は決して通知を開始しない。const observable = Rx.Observable.interval(1000); const observable2 = Rx.Observable.of(4, 5, 6); Rx.Observable.concat(observable, observable2).subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // `observable`は`complete`通知をしないため、`observable2`は決して通知を開始しない。 // => "next: 0" // => "next: 1" // => "next: 2" // => "next: 3" // ... // => "next: 67" // => "next: 68"
forkJoin
指定した Observable が全てcomplete
通知をしたら、それぞれの Observable が最後に通知した値をまとめた配列をnext
通知する Observable を生成する。const observable = Rx.Observable.timer(1000); const observable2 = Rx.Observable.of(1, 2, 3); Rx.Observable // 指定した Observable(`observable`、`observable2`)が全て`complete`通知をしたら、 // それぞれの Observable が最後に通知した値をまとめた配列を`next`通知する Observable を生成する。 // 今回、`observable` が最後に`next`通知する値は`0`であり、`observable2`が最後に`next`通知する値は`3`のため、 // この Observable が`next`通知する値は`[0, 3]` .forkJoin(observable, observable2) .subscribe( x => console.log(x), error => console.error(`error: ${error}`), () => console.log('complete') ); // => [0, 3] // => "complete"
pairwise
1 つ前のnext
通知の値と、現在のnext
通知の値を配列にまとめてnext
通知する Observable を生成する。Rx.Observable.of(1, 2, 3, 4, 5, 6) .pairwise() .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1,2" // => "next: 2,3" // => "next: 3,4" // => "next: 4,5" // => "next: 5,6" // => "complete"
pairwise を利用する上での注意点
pairwise
を実行する Observable が 2 回next
通知をしないと生成した Observable はnext
通知をしない。そのため、以下のように通知が 1 回しかされない Observable の場合、
complete
通知だけされる。Rx.Observable.of(1) .pairwise() .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "complete"
startWith
引数の値を最初に通知する Observable を生成する。Rx.Observable.of(1, 2, 3) .startWith(999) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 999" // => "next: 1" // => "next: 2" // => "next: 3" // => "complete"
zip
指定した Observable が全てnext
通知をしたら、通知された値を配列にまとめてnext
通知する Observable を生成する。const observable = Rx.Observable.of(1, 11); const observable2 = Rx.Observable.of(2, 12); const observable3 = Rx.Observable.of(3, 13); Rx.Observable // 指定した全ての Observable(`observable`、`observable2`、`observable3`)が`next`通知をしたら、 // 通知された値を配列にまとめて`next`通知する Observable(observableZipped)を生成する。 // 今回、指定した Observable が 1回目に`next`通知をする値はそれぞれ`1`、`2`、`3`であり、 // 2回目に`next`通知をする値は`11`、`12`、`13`である。 // そのため、この Observable が`next`通知する値は`[1, 2, 3]`(1回目)と`[11, 12, 13]`(2回目)である。 .zip(observable, observable2, observable3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1,2,3" // => "next: 11,12,13" // => "complete"
生成した Observable がnext
通知するタイミング
指定した Observable が全てnext
通知したタイミングで通知する。const observable = Rx.Observable.of(1); const observable2 = Rx.Observable.of(2); // 1秒後に`0`を`next`通知する Observable const observable3 = Rx.Observable.timer(1000); Rx.Observable // 1秒後に`[1, 2, 0]`を`next`通知する Observable を生成する .zip(observable, observable2, observable3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // 1秒後に以下がコンソール出力される // => "next: 1,2,0" // => "complete"
生成した Observable がnext
通知をする回数
指定した Observable の中で、next
通知が1番少ない回数だけ通知する。// 通知回数4回 const observable = Rx.Observable.of(1, 2, 3, 4); // 通知回数3回 const observable2 = Rx.Observable.of(5, 6, 7); // 通知回数無限 const observable3 = Rx.Observable.interval(1000); Rx.Observable // 今回指定した Observable の中で、`next`通知回数が1番少ないのは`observable2`である。 // そのため、今回生成した Observable が`next`通知をする回数は3回 .zip(observable, observable2, observable3) .subscribe( x => console.log(`next: ${x}`), error => console.error(`error: ${error}`), () => console.log('complete') ); // => "next: 1,5,0" // => "next: 2,6,1" // => "next: 3,7,2" // この時点で`observable2`が`complete`通知をするので、"complete"が出力される // => "complete"
combineLatest
指定した Observable が全てnext
通知をしたら、それぞれの Observable が最後にnext
通知した値を配列にまとめてnext
通知する Observable を生成する。// 1秒毎に`"A"`、`"B"`、`"C"`、`"D"`、`"E"`を通知する Observable const observable = Rx.Observable.interval(1000) .take(5) .map(index => 'ABCDE'[index]); // 1.4秒毎に`"F"`、`"G"`、`"H"`、`"I"`、`"J"`を通知する Observable const observable2 = Rx.Observable.interval(1400) .take(5) .map(index => 'FGHIJ'[index]); // 2.2秒毎に`"K"`、`"L"`、`"M"`、`"N"`、`"O"`を通知する Observable const observable3 = Rx.Observable.interval(2200) .take(5) .map(index => 'KLMNO'[index]); // 指定した Observable(`observable`、`observable2`、`observable3`)が全て`next`通知をしたら、 // それぞれの Observable が最後に`next`通知した値を配列にまとめて`next`通知する Observable を生成する。 const combined = Rx.Observable.combineLatest(observable, observable2, observable3); combined.subscribe( x => console.log(x), error => console.error(`error: ${error}`), () => console.log('complete') ); // 生成した Observable が最初に`next`通知をするタイミングは`observable3`が1回目(2.2秒後)の`next`通知をした時。 // それまでに以下の`next`通知がされている。 // 1. `observable`が`"A"`を`next`通知する。 // 2. `observable2`が`"F"`を`next`通知する。 // 3. `observable`が`"B"`を`next`通知する。 // 4. `observable3`が`"K"`を`next`通知する。 // このタイミングで指定した Observable が全て`next`通知をしたので、 // それぞれの Observable が最後に通知した値がまとめられた配列が`next`通知され、以下がコンソール出力される。 // => ["B", "F", "K"] // // これ以降は、いずれかの Observable(`observable`、`observable2`、`observable3`)が`next`通知する度に、 // それぞれの Observable が最後に通知した値を配列にまとめて`next`通知をするので、以下がコンソール出力される。 // => ["B", "G", "K"] // => ["C", "G", "K"] // => ["D", "G", "K"] // => ["D", "H", "K"] // => ["D", "H", "L"] // => ["E", "H", "L"] // => ["E", "I", "L"] // => ["E", "I", "M"] // => ["E", "J", "M"] // => ["E", "J", "N"] // => ["E", "J", "O"] // => "complete"
withLatestFrom
このメソッドを実行した Observable がnext
通知をした際に、引数の全ての Observable が一度でもnext
通知済みであれば、それぞれの Observable が最後にnext
通知した値を配列にまとめてnext
通知する Observable を生成する。// 1秒毎に`"A"`、`"B"`、`"C"`、`"D"`、`"E"`を通知する Observable const observable = Rx.Observable.interval(1000) .take(5) .map(index => 'ABCDE'[index]); // 1.4秒毎に`"F"`、`"G"`、`"H"`、`"I"`、`"J"`を通知する Observable const observable2 = Rx.Observable.interval(1400) .take(5) .map(index => 'FGHIJ'[index]); // 2.2秒毎に`"K"`、`"L"`、`"M"`、`"N"`、`"O"`を通知する Observable const observable3 = Rx.Observable.interval(2200) .take(5) .map(index => 'KLMNO'[index]); observable // このメソッドを実行した Observable(`observable`)が`next`通知をした際に、 // 引数の全ての Observable(`observable2`、`observable3`) // が一度でも`next`通知済みであれば、それぞれの Observable が最後に通知した値を配列にまとめて`next`通知する Observable を生成する。 .withLatestFrom(observable2, observable3) .subscribe( x => console.log(x), error => console.error(`error: ${error}`), () => console.log('complete') ); // 生成した Observable が最初に`next`通知をするタイミングは`observable`が3回目(3秒後)の // `next`通知をした時であり、それまでに以下の`next`通知がされている。 // 1. `observable`が`"A"`を`next`通知する。 // 2. `observable2`が`"F"`を`next`通知する。 // 3. `observable`が`"B"`を`next`通知する。 // 4. `observable2`が`"G"`を`next`通知する。 // 5. `observable3`が`"K"`を`next`通知する。 // 6. `observable`が`"C"`を`next`通知する。 // このタイミングで引数の全ての Observable(`observable2`、`observable3`)が`next`通知済みなので、 // それぞれの Observable が最後に通知した値をまとめた配列が`next`通知され、以下がコンソール出力される。 // => ["C", "G", "K"] // // これ以降は、`observable`が`next`通知をする度に、それぞれの Observable が最後に通知した値を // 配列にまとめて`next`通知をするので、以下がコンソール出力される。 // => ["D", "G", "K"] // => ["E", "H", "L"] // => "complete"
その他 Operators
その他の Operators(ドキュメントで明記されていないものはこちらに記載、v6 にも存在する)。
ajax
AJAX(HTTP)リクエストの結果を通知する Observable を生成する。
リクエスト(通信)が成功した場合
const observable = Rx.Observable.ajax('https://jsonplaceholder.typicode.com/post/1'); observable.subscribe( x => console.log(x.response), error => console.error(`error: ${error}`), () => console.log('complete') ); // => { // body: "quia et suscipit suscipit recusandae consequuntur expedita et cumreprehenderit molestiae ut ut quas totam nostrum rerum est autem sunt rem eveniet architecto", // id: 1, // title: "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", // userId: 1 // } // => "complete"
リクエスト(通信)が失敗した場合
const observable = Rx.Observable.ajax('https://jsonplaceholder.typicode.co'); observable.subscribe( x => console.log(x.response), error => console.error(`error: ${error.message}`), () => console.log('complete') ); // => "error: ajax error 0"
RxJS を利用したコードと利用しないコードを比較する
RxJS のメリットを確認するために、以下のサンプルを RxJS を利用する場合と利用しない場合で実装し、それぞれのコードを比較する。- 選択した ID の投稿を取得して描画する
- もし、投稿を取得中で別の ID を選択した場合、リクエスト(通信)を中断して新しく選択した ID の投稿だけを取得する
※ Chrome の「Network」タブで回線を絞れば再現できます。
RxJS を利用しない場合
リクエストにはXMLHttpRequest
を利用(axios の利用も検討したが、コードの長さがあまり変わらなかったので今回は利用していない)。通信中かどうかはisFetching
で管理。const select = document.querySelector('select'); const result = document.getElementById('result'); let request; let isFetching = false; const render = post => { result.innerHTML = post.title; }; const fetchPost = id => { return new Promise(resolve => { request = new XMLHttpRequest(); request.responseType = 'json'; request.open('GET', `https://jsonplaceholder.typicode.com/posts/${id}`, true); request.onload = () => { if (request.readyState === XMLHttpRequest.DONE && request.status === 200) { resolve(request.response); } }; request.send(null); }); }; select.addEventListener('change', e => { if (isFetching) { request.abort(); } else { isFetching = true; } fetchPost(e.target.value).then(res => { isFetching = false; render(res); }); });
RxJS を利用した場合
const select = document.querySelector('select'); const result = document.getElementById('result'); const id$ = Rx.Observable.fromEvent(select, 'change').map(e => e.target.value); const fetchPost = id => Rx.Observable.ajax(`https://jsonplaceholder.typicode.com/posts/${id}`).map(res => res.response); const render = post => { result.innerHTML = post.title; }; id$.switchMap(fetchPost).subscribe(render);
RxJS を利用しない場合と比べて以下のようになった。
- 簡潔になった
- 可読性が高くなった
- 副作用を減らせた(RxJS を利用しない場合、
fetchPost
などは関数外の変数request
に依存していたが、それがなくなった)
終わり
学習が難しいですが、複雑な非同期処理を実装する際には有用なライブラリだと思います。今回は導入編のため、次回はより RxJS の理解を深めるために知っておいた方が良い以下の項目などに関して投稿する予定です(多分)。
- Subject
- Scheduler
- Observable の Hot、Cold の性質
- Operator を複数組み合わせたサンプル
RxJS 学習を手助けするマーブルダイアグラム
本記事では記載しませんでしたが、RxJS の Observable は マーブルダイアグラム(Marble Diagram)という図で表現されていることが多いです。以下のサイトでは、様々な Operators を利用して生成した Observable のマーブルダイアグラムが掲載されていますので、RxJS 学習の手助けになると思います。
RxJS Marbles
とは言えども、複雑な機能の Operators から生成された Observable のマーブルダイアグラムだけ見ても理解するのは困難なので、実際にコードを書いてみて動作を確認した方が良いと思います。
以下のサイトだとマーブルダイアグラムがリアルタイムで描画されるため、わかりやすいです。また、自分で書いたコードもマーブルダイアグラムで描画できます(V6 対応なので注意)。
Rx Visualizer
参考
所感ですが、RxJS の用語は同義語がいくつかあるため、記事によって用いられている言葉や説明の仕方がかなり異なります(他のライブラリやフレームワークの記事と比べて)。例えば、この記事での「値を通知する」は、別の記事では「値を流す」と記載されています。どちらの表現が分かりやすいと感じるかは人それぞれだと思います。
そのため、どの記事が分かりやすいと感じるかも人それぞれだと思いますので、たくさんの記事に目を通してみることをおすすめします。
-
「RxJS」初心者入門 – JavaScript の非同期処理の常識を変えるライブラリ: 入門記事です。 -
RxJS を学ぼう #1 - これからはじめる人のための導入編: 入門記事です。part 5 まであります。 -
【翻訳】あなたが求めていたリアクティブプログラミング入門: 難しめの入門記事です。Operators を多用したサンプルもあります。いきなり読むと理解できないかもしれないため、基礎を理解した後に読んでみると良いと思います。 -
Learn to combine RxJs sequences with super intuitive interactive diagrams: 各 Operator の動きをアニメーションで解説してます。 -
Learn RxJS: Operators の解説と、それらを利用したサンプルコードがあります(リファレンスのようなものです)。
コメント
コメントを投稿