RxJS 入門 〜導入編〜

RxJS 入門 〜導入編〜:


はじめに

RxJS(v5)の備忘録です。

  • 「RxJS の何が便利なのかよくわからない」
  • 「ドキュメントやサンプルコードを読んだが意味不明だった」
  • 「複雑な Operators の挙動がよくわからない」
といった人達向けの記事です。

導入編ということで、まず理解した方が良い基本情報に関して主に記載しております。

私も学習中のため、間違いがあればご指摘をいただけると大変助かります。


本記事の前提や注意点

  • バージョンは 5 系を対象にしています。最新は 6 系ですが、書き方がかなり異なるので留意してください(基本的な概念は変わりません)。
  • RxJS のイメージを掴んでもらうことや、理解のしやすさを重視しているため、厳密な定義とは異なる説明を記載している箇所もあります(Observable や一部 Operators の説明など)。
  • RxJS はフロントエンドのライブラリの中では難しい部類に入ると思います。そのため、入門と言いつつ内容は難しめです。


RxJS 学習の心構え

RxJS は「リアクティブ・プログラミング」というプログラミングパラダイムを採用しているため、通常の JavaScript とは問題解決の考え方やコードの書き方が異なります(JavaScript の一般的なプログラミングパラダイムは「オブジェクト指向」)。

※プログラミングパラダイムとは「何かをプログラムで実現したい場合、それをどうやって実現するかという考え方やスタイル、書き方」のこと。

そのため、RxJS を学習する際には、通常の JavaScript のコードを書く時の考え方は捨てて、新しい言語を学ぶぐらいの気持ちで臨んだ方が良いと思います


RxJS とは

「同期処理やイベント駆動の処理を、簡潔で一貫性を持たせ、可読性を高くコーディングできるようにすること」を目的とした JavaScript ライブラリ。

ざっくり言ってしまえば、複雑な非同期処理を効率良く実装できる JavaScript ライブラリ。

Angular などにも採用されている。


なぜ RxJS を利用するのか

以下のようなメリットがあるから(特に非同期処理の実装において)。

  • 簡潔なコードを書ける
  • 可読性が高いコードを書ける
  • 一貫性を持ったコードを書ける
  • 副作用を減らせる
これらのメリットがいくつか実感できるサンプルコードは後述する(基礎を理解してからでないと RxJS を利用したコードが読めないため)。


RxJS の概念(アーキテクチャ)

RxJS のアーキテクチャに基づいたアプリケーション(コード)は、主に以下の要素で構成される(他にもあるが、ここでは最低限理解しておきたいものだけ記載)。

  • Observable(オブザーバブル)
  • Observer(オブザーバ)
  • Operators(オペレータ)
それぞれの構成要素がどのように作用するのかは以下のイメージ。



rxjs-image (2) (1).jpg



Observable(オブザーバブル)

Observer へ通知を送るオブジェクト。

以下のような通知設定も定義されている。

  • 通知する値
  • 通知するタイミング
  • 通知の種類
通知の種類は以下の全3種類。


  • next: 値の通知

  • error: エラー通知

  • complete: 完了通知(値の通知が完了したら通知される)
また、Observable は通知を受け取る Observer を登録でき、登録をすると通知を開始する。Observer を登録することを「Observable を subscribe する」とも言う。


Observer(オブザーバ)

Observable から通知を受けとり、通知の種類に応じた処理をするオブジェクト。


Operators(オペレータ)

Observable を生成、変換するメソッド。

約90種類ほど存在し、それらを組みあわせて、様々な通知設定が定義された Observable を生成できる。


Observable、Observer、Operators で構成されたコードを読み解く

実際に Observable、Observer、Operators で構成された以下のコードを読み解いていく。

// Operators(オペレータ)で Observable(オブザーバブル)を生成する 
const observable = Rx.Observable.of(1, 2, 3); 
 
// Observer(オブザーバ)を定義する 
const observer = { 
  // `next`通知のコールバック関数 
  next: x => console.log(x), 
  // `error`通知のコールバック関数 
  error: error => console.error(err), 
  // `complete`通知のコールバック関数 
  complete: () => console.log('complete') 
}; 
 
// Observable(オブザーバブル)を subscribe する 
observable.subscribe(observer); 
// 以下がコンソール出力される 
// => 1 
// => 2 
// => 3 
// => "complete" 
デモ

123"complete"をコンソール出力するだけの単純な処理。

上記のコードはコメントにも書いてあるように、以下を行っている。

  • Operators(オペレータ)で Observable(オブザーバブル)を生成する
  • Observer(オブザーバ)を定義する
  • Observable(オブザーバブル)を subscribe する

Operators(オペレータ)で Observable(オブザーバブル)を生成する
// Operators(オペレータ)で Observable(オブザーバブル)を生成する 
const observable = Rx.Observable.of(1, 2, 3); 
今回は Operators であるofを利用して Observable を生成している。

生成した Observable(Rx.Observable.of(1, 2, 3))が Observer へ通知する値や通知するタイミング、通知の種類は以下の通り。

  • 通知する値: 123(それぞれを通知するため通知回数は 3 回)。
  • 通知するタイミング: subscribe したら即通知する。
  • 通知の種類: 値である123nextで通知する。値の通知が全て完了したらcompleteを通知する。

Observer(オブザーバ)を定義する
// Observer(オブザーバ)を定義する 
const observer = { 
  // `next`通知のコールバック関数、引数(`x`)に通知された値が渡される 
  next: x => console.log(x), 
  // `error`通知のコールバック関数、引数(`error`)に通知された値が渡される 
  error: error => console.error(error), 
  // `complete`通知のコールバック関数 
  complete: () => console.log('complete') 
}; 
Observable の通知はnexterrorcompleteの全3種類のため、それぞれの通知に応じたコールバック関数を定義している。


Observable(オブザーバブル)を subscribe する
// Observable(オブザーバブル)を subscribe する 
observable.subscribe(observer); 
subscribe をしたら値の通知が開始され、Observer の処理が実行される。

今回生成した Observable(Rx.Observable.of(1, 2, 3))は、値である123nextで通知して、それらの通知が完了したらcompleteを通知するため、Observer の以下の処理が実行される。


  • next通知のコールバック関数(next: x => console.log(x))が3回実行される

  • complete通知のコールバック関数(complete: () => console.log('complete'))が1回実行される
そのため、結果として123"complete"がコンソール出力される。

というわけで、Observable、Observer、Operators を利用してコンソール出力できた。

Observable、Observer、Operators を理解していないと、他の構成要素や複雑な利用方法を理解するのは難しいため、まずはこれらを理解する。


RxJS をスムーズに学習するために覚えておきたい基礎知識

前述の概念以外に RxJS をスムーズに学習するために覚えておきたい基礎知識は以下の通り。

  • Observable と Observable インスタンス
  • メソッドチェーン
  • Observer の様々な定義の仕方


Observable と Observable インスタンス

RxJS の Observable には「Observable」と「Observable インスタンス」が存在する。


Observable

Observable は以下のような Observable(クラス)のことを指す。

Rx.Observable; 
そして、以下のようなメソッドを「Observable メソッド」と言う。

Rx.Observable.of; 
つまり、ofのような Operators も Observable メソッドの一種である。


Observable インスタンス

以下のような Observable メソッドから生成された Observable(インスタンス)のことを指す。

// `Rx.Observable.of`から生成された Observable(インスタンス) 
const observable = Rx.Observable.of(1); 
そして、以下のような Observable インスタンスのメソッドを「Observable インスタンスメソッド」と言う。

// `Rx.Observable.of`から生成された Observable(インスタンス) 
const observable = Rx.Observable.of(1); 
 
// Observable インスタンスメソッド 
observable.map; 
Observable インスタンスメソッド から生成された Observable も Observable インスタンスである。

// `Rx.Observable.of`から生成された Observable(インスタンス) 
const observable = Rx.Observable.of(1); 
 
// `observable.map`から生成された Observable(インスタンス) 
const observable2 = observable.map(v => v * 10); 


Observable と Observable インスタンスの違い

大きな違いは利用できるメソッド(Operator など)が異なる

どちらでも利用できるものもあれば、どちらかでしか利用できないものがある。


Observable でしか利用できないメソッド
数値や文字列などから Observable を生成するofなどは Observable でしか利用できない。

// OK 
const observable = Rx.Observable.of(1); 
 
// NG 
const observable2 = observable.of(2); 

Observable インスタンス でしか利用できないメソッド
通知する値を変換するmapなどは Observable インスタンスでしか利用できない。

// NG 
const observable = Rx.Observable.map(v => v * 10); 
 
// OK 
const observable2 = Rx.Observable.of(2).map(v => v * 10); 

Observable と Observable インスタンスで利用できるメソッド
Observable を 1 つにマージするmergeなどはどちらでも利用できる。

// OK 
const observable = Rx.Observable.merge(Rx.Observable.of(1), Rx.Observable.of(2)); 
 
// OK 
const observable3 = Rx.Observable.of(1).merge(Rx.Observable.of(2)); 
書き方が少し異なるが、生成される Observable は同じなので留意しておく。


メソッドチェーン

前述の通り、Observable(もしくは Observable インスタンス)メソッドは、Observable を生成する。

つまり、メソッドの戻り値は Observable のため、以下のようにメソッドチェーンで簡潔に書ける。特に理由がない限り、メソッドチェーンで書くのが一般的である。

Rx.Observable.of(1, 2, 3, 4, 5) 
  .filter(v => v % 2 === 0) 
  .map(v => v * 10) 
  .subscribe({ 
    next: x => console.log(x), 
    error: error => console.error(error), 
    complete: () => console.log('complete') 
  }); 
// => 20 
// => 40 
// => "complete" 
デモ


Observer の様々な定義の仕方

前述の「Observable、Observer、Operators で構成されたコードを読み解く」で記載したサンプルコードの場合、Observer を以下のように定義した。

// Observer(オブザーバ)を定義する 
const observer = { 
  // `next`通知のコールバック関数、引数(`x`)に通知された値が渡される 
  next: x => console.log(x), 
  // `error`通知のコールバック関数、引数(`error`)に通知された値が渡される 
  error: error => console.error(error), 
  // `complete`通知のコールバック関数 
  complete: () => console.log('complete') 
}; 
これ以外にも様々な定義の仕方がある。


next通知のコールバック関数だけ定義する

next通知以外のコールバック関数は必須でないため、以下のようにも定義できる。

// Observer(オブザーバ)を定義する 
// この場合、`next`通知だけ受け取る。Observable が`complete`通知してもそれは受け取らない。 
const observer = x => console.log(x); 


subscribeメソッドの引数にコールバック関数を指定する

また、以下のようにsubscribeメソッドに直接定義できる。この場合、引数にオブジェクトではなくコールバック関数を指定できる。

Rx.Observable.of(1, 2, 3).subscribe( 
  x => console.log(x), 
  error => console.error(err), 
  () => console.log('complete') 
); 
// ↑の Observer の定義は以下と同じ 
// .subscribe({ 
//   next: x => console.log(x), 
//   error: error => console.error(error), 
//   complete: () => console.log('complete'), 
// }); 
このように Observer は様々な定義の仕方があるので留意しておく。


多種多様な Operators を理解する

RxJS を利用して何かを実装する場合、目的に沿った様々な Observable を生成する必要がある。

そのため、RxJS を有効に利用するためには多種多様な Operators を理解することが特に重要である(と思っている)。

Operators はいくつかの種類に分類できる。その中で頻繁に利用されている以下の Operators を種類毎に記載していく。

  • Creation Operators

    • of
    • from
    • fromEvent
    • interval
    • timer
  • Transformation Operators

    • map
    • mapTo
    • scan
    • mergeMap
    • switchMap
  • Utility Operators

    • do
    • delay
  • Error Handling Operators

    • catch
    • retry
  • Multicasting Operators

    • share
  • Filtering Operators

    • filter
    • take
    • takeUntil
    • takeWhile
    • distinctUntilChanged
  • Combination Operators

    • startWith
    • merge
    • concat
    • zip
    • forkJoin
    • combineLatest
    • withLatestFrom
  • その他 Operators

    • ajax
※ Operators の戻り値は Observable のため、ざっくり言ってしまえば全ての Operators は Observable を生成するメソッドです。そのため、後述する「Creation Operators」以外の Operators でも Observable は生成されるので、混乱しないように注意してください。


Creation Operators

数値、文字列、オブジェクト、配列などの Observable ではないものから Observable を生成できる Operators。


of

引数の値をnext通知する Observable を生成する。


単一の値を通知する
Rx.Observable.of(1).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 1" 
// => "complete" 
デモ


複数の値を通知する
Rx.Observable.of(1, 2, 3).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "complete" 
デモ


from

引数のイテラブル(配列や文字列など)や Promise の結果をnexterror通知する Observable を生成する。


配列を通知する
Rx.Observable.from([1, 2, 3]).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "complete" 
デモ


文字列を通知する
Rx.Observable.from('soarflat').subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: s" 
// => "next: o" 
// => "next: a" 
// => "next: r" 
// => "next: f" 
// => "next: l" 
// => "next: a" 
// => "next: t" 
// => "complete" 
デモ


Promise の 結果を通知する
Promise が resolve されるとnextを通知して、reject されるとerrorを通知する。

const promiseResolve = () => { 
  return new Promise(resolve => setTimeout(() => resolve('resolve!!'), 3000)); 
}; 
 
Rx.Observable.from(promiseResolve()).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 3秒後に以下がコンソール出力される 
// => "next: resolve!!" 
// => "complete" 
 
const promiseReject = () => { 
  return new Promise((resolve, reject) => setTimeout(() => reject('reject!!'), 3000)); 
}; 
 
Rx.Observable.from(promiseReject()).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 3秒後に以下がコンソール出力される 
// => error: reject!! 
デモ


fromEvent

第1引数の要素が、第2引数のイベントを発火すると Event Object をnext通知する Observable を生成する。

<input type="text" value="" /> 
const input = document.querySelectorAll('input[type="text"]'); 
 
Rx.Observable.fromEvent(input, 'input').subscribe( 
  e => console.log(`next: ${e.target.value}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// `input`要素に文字列を入力すると、入力した文字列がコンソール出力される 
デモ


interval

引数で指定した時間(ms)毎にnext通知する Observable を生成する。

通知する値は0から始まり、通知毎に1が加算された値が通知される。

intervalだけ利用した場合、next通知が永遠にされるため、complete通知はされない(別の Operators を利用すれば、complete通知させることも可能)。

Rx.Observable.interval(1000).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 1秒毎に以下がコンソール出力される 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "next: 4" 
// => ... 
// => "next: 99" 
デモ


timer

引数で指定した時間(ms)後にnext通知する Observable を生成する。

Rx.Observable.timer(1000).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 1秒後に以下がコンソール出力される 
// => "next: 0" 
// => "complete" 
デモ


第 2 引数も指定した場合
第 2 引数も指定すれば、第 1 引数で指定した時間後にnext通知を開始し、その後は第 2 引数で指定した時間毎にnext通知をする。

Rx.Observable.timer(1000, 2000).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 1秒後に以下がコンソール出力される 
// => "next: 0" 
// 以降は2秒毎にコンソール出力される 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "next: 4" 
// => ... 
// => "next: 99" 
デモ


Transformation Operators

Observable を変換(現在の Observable に基づいて、新しい Observable を生成)する Operators。


map

このメソッドを実行した Observable がnext通知する値に対して任意の関数を実行し、その戻り値をnext通知する Observable を生成する。

Rx.Observable.of(1, 2, 3) 
  // `map`を実行した Observable(`Observable.of(1, 2, 3)`)が通知する値(`1`、`2`、`3`)に対して 
  // 値を10倍にする関数を実行。その戻り値(`10`、`20`、`30`)を`next`通知する Observable を生成する。 
  .map(value => value * 10) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 10" 
// => "next: 20" 
// => "next: 30" 
// => "complete" 
デモ


mapTo

固定の値をnext通知する Observable を生成する。

Rx.Observable.of(1, 2, 3) 
  .mapTo(10) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 10" 
// => "next: 10" 
// => "next: 10" 
// => "complete" 
デモ


mapTo の使い所
クリック時などに固定の値をnext通知したい時に利用する。

<button id="increment">increment</button> 
const increment = document.getElementById('increment'); 
 
Rx.Observable.fromEvent(increment, 'click') 
  .mapTo(1) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// button をクリックする度に以下がコンソール出力される。 
// => "next: 1" 
デモ


scan

このメソッドを実行した Observable がnext通知する値に対して任意の関数を実行し、その戻り値を累積してnext通知する Observable を生成する。

const observable = Rx.Observable.of(1, 2, 3, 4, 5); 
 
observable 
  // `scan`を実行した Observable(`observable`)が`next`通知する値(`1`、`2`、`3`、`4`、`5`)に対して 
  // 累積した値と通知された値を加算する関数(`(accumulator, value) => accumulator + value`)を実行し、その戻り値を累積して通知する。 
  // `accumulator` に累積された値が渡され(初期値が第2引数である`0`)、`value`に通知された値が渡される。 
  // そのため、今回の関数の場合、以下のように値を加算して累積する 
  // 1回目の`next`通知では`accumulator`が`0`で`value`が`1`なので、`0 + 1`の戻り値である`1`を累積して`next`通知する 
  // 2回目の`next`通知では`accumulator`が`1`で`value`が`2`なので、`1 + 2`の戻り値である`3`累積して`next`通知する 
  // 3回目の`next`通知では`accumulator`が`3`で`value`が`3`なので、`3 + 3`の戻り値である`6`累積して`next`通知する 
  // 4回目の`next`通知では`accumulator`が`6`で`value`が`4`なので、`6 + 4`の戻り値である`10`累積して`next`通知する 
  // 5回目の`next`通知では`accumulator`が`10`で`value`が`5`なので、`10 + 5`の戻り値である`15`累積して`next`通知する 
  .scan((accumulator, value) => accumulator + value, 0) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1" 
// => "next: 3" 
// => "next: 6" 
// => "next: 10" 
// => "next: 15" 
// => "complete" 
デモ


mergeMap

このメソッドを実行した Observable がnext通知をしたら、任意の Observable を生成する。

以下の場合、mergeMapを実行した Observable がnext通知する値に対して Observable を生成する関数を実行し、その戻り値である Observable をマージした Observable を生成する。

const observable = Rx.Observable.of(2000, 1000, 3000); 
 
// `mergeMap`を実行した Observable(`observable`)が通知する値(`2000`、`1000`、`3000`)に対して 
// Observable を生成する関数(`v => Rx.Observable.timer(v).mapTo(v)`)を実行し、 
// その戻り値である以下の Observable をマージした Observable を生成する。 
// Rx.Observable.timer(2000).mapTo(2000) 
// Rx.Observable.timer(1000).mapTo(1000) 
// Rx.Observable.timer(3000).mapTo(3000) 
const merged = observable.mergeMap(v => Rx.Observable.timer(v).mapTo(v)); 
 
merged.subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 1000" 
// => "next: 2000" 
// => "next: 3000" 
// => "complete" 
デモ

つまり、上記で生成される Observable は以下のように生成される Observable と同じ。

Rx.Observable.merge( 
  Rx.Observable.timer(2000).mapTo(2000), 
  Rx.Observable.timer(1000).mapTo(1000), 
  Rx.Observable.timer(3000).mapTo(3000) 
).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 1000" 
// => "next: 2000" 
// => "next: 3000" 
// => "complete" 
デモ

mergeMapを利用した方が柔軟に Observable を生成できる。


map と何が違うのか
上記の処理はmapでも実現できそうだが、mapを利用すると以下のようにコンソール出力が異なる。

const observable = Rx.Observable.of(2000, 1000, 3000); 
 
observable 
  // このメソッドを実行した Observable(`observable`)が通知する値(`2000`、`1000`、`3000`)に対して 
  // Observable を生成する関数(`v => Rx.Observable.timer(v).mapTo(v)`)を実行し、 
  // その戻り値(Observable)を通知する Observable を生成する。 
  // 今回の場合、通知する戻り値は以下の3つの Observable。 
  // Rx.Observable.timer(2000).mapTo(2000) 
  // Rx.Observable.timer(1000).mapTo(1000) 
  // Rx.Observable.timer(3000).mapTo(3000) 
  .map(v => Rx.Observable.timer(v).mapTo(v)) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: [object Object]" 
// [object Object]は Rx.Observable.timer(2000).mapTo(2000) 
// 
// => "next: [object Object]" 
// [object Object]は Rx.Observable.timer(1000).mapTo(1000) 
// 
// => "next: [object Object]" 
// [object Object]は Rx.Observable.timer(3000).mapTo(3000) 
// 
// => "complete" 
デモ

mapを利用した場合、生成した Observable をnext通知する Observable が生成される。

そのため、生成した Observable から通知をさせたい場合はmergeMapを利用する。


switchMap

mergeMapと同様で、このメソッドを実行した Observable がnext通知をしたら、任意の Observable を生成する。

mergeMapと異なる点は、このメソッドを実行した Observable がnext通知する度に、前に生成された Observable は破棄されて新しい Observable が生成される。

const observable = Rx.Observable.interval(5000); 
 
observable 
  // `switchMap`を実行した Observable(`observable`)が`next`通知をしたら、 
  // 任意の Observable(`Rx.Observable.interval(1000)`)を生成する。 
  .switchMap(() => Rx.Observable.interval(1000)) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// このタイミングで`observable`が`next`通知をするため、前に生成された Observable(`Rx.Observable.interval(1000)`)は 
// 破棄されて、新しい Observable(`Rx.Observable.interval(1000)`)が生成される。 
// そのため、`next`通知される値も`0`からになる。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// ... 
デモ


Utility Operators

ユーティリティーな Operators。


do

Observable に影響を与えずに、通知の途中でロギングなどの処理を実行できる。

Rx.Observable.of(1, 2, 3, 4, 5) 
  // `do`内の関数(処理)は Observable に影響を与えない 
  .do(v => console.log(`map を実行前: ${v}`)) 
  .map(v => v * 10) 
  .do(v => console.log(`map を実行後: ${v}`)) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "map を実行前: 1" 
// => "map を実行後: 10" 
// => "next: 10" 
// => "map を実行前: 2" 
// => "map を実行後: 20" 
// => "next: 20" 
// => "map を実行前: 3" 
// => "map を実行後: 30" 
// => "next: 30" 
// => "map を実行前: 4" 
// => "map を実行後: 40" 
// => "next: 40" 
// => "map を実行前: 5" 
// => "map を実行後: 50" 
// => "next: 50" 
// => "complete" 
デモ


delay

このメソッドを実行した Observable の通知を、引数で指定した時間(ms)遅らせる。

// 3秒後に`1`を通知する Observable 
const observable = Rx.Observable.of(1).delay(3000); 
 
observable.subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 3秒後に以下がコンソール出力される 
// => "next: 1" 
// => "complete" 
デモ


Error Handling Operators

Observable のエラーハンドリングができる Operators。


catch

このメソッドを実行した Observable がerror通知をした際に、Observable を生成する関数を実行し、その戻り値である Observable から通知を再開する。

再開した Observable が通知を正常にできた場合、Observer にerror通知はされない。

const promiseReject = () => new Promise((resolve, reject) => reject('reject!!')); 
const observable = Rx.Observable.from(promiseReject()); 
 
observable 
  // このメソッドを実行した Observable(`observable`)が`error`を通知した際に、 
  // Observable を生成する関数(`error => Rx.Observable.of(error)`)を実行し、 
  // その戻り値である Observable(`Rx.Observable.of(error)`)から通知を再開する。 
  .catch(error => Rx.Observable.of(error)) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// 今回`catch`で生成した Observable(`Rx.Observable.of(error)`)から`next`通知が 
// 再開されるため、以下がコンソール出力される。 
// => "next: reject!!" 
// => "complete" 
// 再開した Observable が通知を正常にできたので、Observer に`error`通知はされない。 
デモ


retry

このメソッドを実行した Observable がerror通知をした場合、Observable を再試行する(通知を最初からやり直す)。

再試行する回数は引数で指定した数。指定しない場合は無限に再試行する。

// `map`を実行した Observable(`Rx.Observable.interval(1000)`)が 
// `2`より大きな値を`next`通知したら、`error`通知をする Observable 
const observable = Rx.Observable.interval(1000).map(value => { 
  if (value > 2) { 
    throw new Error('2より大きい!'); 
  } else { 
    return value; 
  } 
}); 
 
observable 
  .retry(2) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// このタイミングで`observable`から`error`通知がされるため、1回目の再試行をする。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// このタイミングで`observable`から`error`通知がされるため、2回目の再試行をする。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// 再試行が2回失敗した(依然として`observable`から`error`通知がされる)ため、Observer に`error`通知をして終了。 
// => "error: Error: 2より大きい!" 
デモ


再試行に失敗した後に catch でエラーハンドリングする
// `map`を実行した Observable(`Rx.Observable.interval(1000)`)が 
// `2`より大きな値を`next`通知したら、`error`通知をする Observable。 
const observable = Rx.Observable.interval(1000).map(value => { 
  if (value > 2) { 
    throw new Error('2より大きい!'); 
  } else { 
    return value; 
  } 
}); 
 
observable 
  .retry(2) 
  .catch(() => Rx.Observable.of('catch!!')) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// このタイミングで`observable`から`error`通知がされるため、1回目の再試行をする。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// このタイミングで`observable`から`error`通知がされるため、2回目の再試行をする。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// 再試行が2回失敗した(依然として`error`通知がされる)ため、 
// `catch`で生成した Observable(`Rx.Observable.of('catch!!')`)から`next`通知が再開されて以下がコンソール出力される。 
// => "next: catch!!" 
// => "complete" 
デモ


Multicasting Operators

複数の Observer へ同一の通知を送れる Operators。


share

複数の Observer への通知を共有する。


share を利用しない場合
1 つの Observable から複数の Observer に通知をしても、それぞれの Observer が受け取る通知は別物になる。

const observable = Rx.Observable.interval(1000).take(5); 
 
// Observer1 を登録する 
observable.subscribe( 
  x => console.log(`Observer1 next: ${x}`), 
  error => console.error(`Observer1 error: ${error}`), 
  () => console.log('Observer1 complete') 
); 
 
setTimeout(() => { 
  // Observer2 を登録する 
  observable.subscribe( 
    x => console.log(`Observer2 next: ${x}`), 
    error => console.error(`Observer2 error: ${error}`), 
    () => console.log('Observer2 complete') 
  ); 
}, 2500); 
// => "Observer1 next: 0" 
// => "Observer1 next: 1" 
// このタイミングで Observer2 が登録され、通知が開始する(Observer1 とは異なる通知を受け取る)。 
// => "Observer1 next: 2" 
// => "Observer2 next: 0" 
// => "Observer1 next: 3" 
// => "Observer2 next: 1" 
// => "Observer1 next: 4" 
// => "Observer1 complete" 
// => "Observer2 next: 2" 
// => "Observer2 next: 3" 
// => "Observer2 next: 4" 
// => "Observer2 complete" 
デモ


share を利用する場合
それぞれの Observer が受け取る通知は共通のものになる。

const observable = Rx.Observable.interval(1000) 
  .take(5) 
  .share(); 
 
// Observer1 を登録する 
observable.subscribe( 
  x => console.log(`Observer1 next: ${x}`), 
  error => console.error(`Observer1 error: ${error}`), 
  () => console.log('Observer1 complete') 
); 
 
setTimeout(() => { 
  // Observer2 を登録する 
  observable.subscribe( 
    x => console.log(`Observer2 next: ${x}`), 
    error => console.error(`Observer2 error: ${error}`), 
    () => console.log('Observer2 complete') 
  ); 
}, 2500); 
// => "Observer1 next: 0" 
// => "Observer1 next: 1" 
// このタイミングで Observer2 が登録され、Observer1 と同じ通知を受け取る。 
// => "Observer1 next: 2" 
// => "Observer2 next: 2" 
// => "Observer1 next: 3" 
// => "Observer2 next: 3" 
// => "Observer1 next: 4" 
// => "Observer2 next: 4" 
// => "Observer1 complete" 
// => "Observer2 complete" 
デモ


Filtering Operators

現在の Observable が通知する値に基づいて、特定の値だけを通知する Observable を生成する Operators。


filter

このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合った(関数の戻り値がtrueになった)値を通知する Observable を生成する。

Rx.Observable.of(1, 2, 3, 4) 
  // `filter`を実行した Observable(`Observable.of(1, 2, 3, 4)`)が 
  // 通知する値(1, 2, 3, 4)に対して条件判定をする関数(`value => value % 2 === 0`)を実行し、 
  // 条件に合った値(2, 4)を通知する Observable を生成する。 
  .filter(value => value % 2 === 0) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 2" 
// => "next: 4" 
// => "complete" 
デモ

条件に合う値がなければcompleteだけ通知される。

Rx.Observable.of(1, 3) 
  .filter(value => value % 2 === 0) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "complete" 


take

引数で指定した回数のnext通知をする Observable を生成する。

Rx.Observable.interval(1000) 
  .take(3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "complete" 
デモ


takeUntil

引数の Observable がnextを通知したら、このメソッドを実行した Observable のnext通知を止め、complete通知をする Observable を生成する。

const timer = Rx.Observable.timer(4500); 
const interval = Rx.Observable.interval(1000); 
 
interval 
  // 引数の Observable(`timter`)が `next`通知をしたら、Observable(`interval`)の 
  // `next`通知を止め、`complete`通知をする Observable を生成する。 
  .takeUntil(timer) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "complete" 
デモ


takeWhile

このメソッドを実行した Observable が通知する値に対して条件判定をする関数を実行し、条件に合わなかった(関数の戻り値がfalseになった)場合、このメソッドを実行した Observable のnext通知を止め、complete通知をする Observable を生成する。

const of = Rx.Observable.of(1, 2, 3, 4, 5); 
 
of 
  // Observable(`of`)が通知する値(1, 2, 3, 4, 5)に対して 
  // 条件判定をする関数(`v => v <= 3`)を実行して、条件に合わなかった(関数の戻り値が`false`になった)場合、 
  // このメソッドを実行した Observable(`of`)の`next`通知を止め、`complete`通知をする Observable を生成する。 
  .takeWhile(v => v <= 3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "complete" 
デモ


distinctUntilChanged

前回next通知された値と異なる値のみをnext通知をする Observable を生成する。

Rx.Observable.of(1, 1, 2, 4, 4, 5, 5, 5, 3) 
  .distinctUntilChanged() 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1" 
// => "next: 2" 
// => "next: 4" 
// => "next: 5" 
// => "next: 3" 
// => "complete" 
デモ


比較対象がオブジェクトの場合
値がオブジェクトの場合、オブジェクトを比較する関数を定義すれば、前回のオブジェクトと異なるオブジェクトのみを通知できる。

Rx.Observable.of({ name: 'soarflat' }, { name: 'soarflat' }, { name: 'dom' }, { name: 'tom' }) 
  .distinctUntilChanged((prev, current) => prev.name === current.name) 
  .subscribe( 
    x => console.log(`next: ${x.name}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: soarflat" 
// => "next: dom" 
// => "next: tom" 
// => "complete" 
デモ


Combination Operators

複数の Observable を結合する Operators。


merge

複数の Observable をマージする。

// 2秒後に`1`を通知する Observable 
const observable = Rx.Observable.timer(2000).mapTo(1); 
// 1秒後に`2`を通知する Observable 
const observable2 = Rx.Observable.timer(1000).mapTo(2); 
// 3秒後に`3`を通知する Observable 
const observable3 = Rx.Observable.timer(3000).mapTo(3); 
 
// 複数の Observable(`observable`、`observable2`、`observable3`) をマージする。 
// 今回の場合、以下の通知をする Observable が生成される。 
// 1秒後に`2`を通知する 
// 2秒後に`1`を通知する 
// 3秒後に`3`を通知する 
const merged = Rx.Observable.merge(observable, observable2, observable3); 
 
merged.subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => "next: 2" 
// => "next: 1" 
// => "next: 3" 
// => "complete" 
デモ


concat

指定した Observable が順番に通知を開始する Observable を生成する。

以下のように、1つの目の Observable がcomplete通知をしたら、2つ目の Observable が通知を開始する。そして、3つの目の Observable がcomplete通知をしたら、3つ目の Observable が通知を開始する。

const observable = Rx.Observable.of(1, 2, 3).delay(2000); 
const observable2 = Rx.Observable.of(4, 5, 6).delay(1000); 
const observable3 = Rx.Observable.of(7, 8, 9); 
 
Rx.Observable 
  // 指定したObservable(`observable`、`observable2`、`observable3`)が 
  // 順番に通知を開始するObservable を生成する。 
  .concat(observable, observable2, observable3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// `observable`が`complete`通知をしたら、`observable2`が通知を開始する。 
// => "next: 4" 
// => "next: 5" 
// => "next: 6" 
// `observable2`が`complete`通知をしたら、`observable3`が通知を開始する 
// => "next: 7" 
// => "next: 8" 
// => "next: 9" 
// => "complete" 
デモ


指定した Observable がcomplete通知をしない場合
complete通知をしない Observable を指定した場合、それ以降の Observable は決して通知を開始しない。

const observable = Rx.Observable.interval(1000); 
const observable2 = Rx.Observable.of(4, 5, 6); 
 
Rx.Observable.concat(observable, observable2).subscribe( 
  x => console.log(`next: ${x}`), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// `observable`は`complete`通知をしないため、`observable2`は決して通知を開始しない。 
// => "next: 0" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// ... 
// => "next: 67" 
// => "next: 68" 
デモ


forkJoin

指定した Observable が全てcomplete通知をしたら、それぞれの Observable が最後に通知した値をまとめた配列をnext通知する Observable を生成する。

const observable = Rx.Observable.timer(1000); 
const observable2 = Rx.Observable.of(1, 2, 3); 
 
Rx.Observable 
  // 指定した Observable(`observable`、`observable2`)が全て`complete`通知をしたら、 
  // それぞれの Observable が最後に通知した値をまとめた配列を`next`通知する Observable を生成する。 
  // 今回、`observable` が最後に`next`通知する値は`0`であり、`observable2`が最後に`next`通知する値は`3`のため、 
  // この Observable が`next`通知する値は`[0, 3]` 
  .forkJoin(observable, observable2) 
  .subscribe( 
    x => console.log(x), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => [0, 3] 
// => "complete" 
デモ


pairwise

1 つ前のnext通知の値と、現在のnext通知の値を配列にまとめてnext通知する Observable を生成する。

Rx.Observable.of(1, 2, 3, 4, 5, 6) 
  .pairwise() 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1,2" 
// => "next: 2,3" 
// => "next: 3,4" 
// => "next: 4,5" 
// => "next: 5,6" 
// => "complete" 
デモ


pairwise を利用する上での注意点
pairwiseを実行する Observable が 2 回next通知をしないと生成した Observable はnext通知をしない。

そのため、以下のように通知が 1 回しかされない Observable の場合、complete通知だけされる。

Rx.Observable.of(1) 
  .pairwise() 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "complete" 
デモ


startWith

引数の値を最初に通知する Observable を生成する。

Rx.Observable.of(1, 2, 3) 
  .startWith(999) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 999" 
// => "next: 1" 
// => "next: 2" 
// => "next: 3" 
// => "complete" 
デモ


zip

指定した Observable が全てnext通知をしたら、通知された値を配列にまとめてnext通知する Observable を生成する。

const observable = Rx.Observable.of(1, 11); 
const observable2 = Rx.Observable.of(2, 12); 
const observable3 = Rx.Observable.of(3, 13); 
 
Rx.Observable 
  // 指定した全ての Observable(`observable`、`observable2`、`observable3`)が`next`通知をしたら、 
  // 通知された値を配列にまとめて`next`通知する Observable(observableZipped)を生成する。 
  // 今回、指定した Observable が 1回目に`next`通知をする値はそれぞれ`1`、`2`、`3`であり、 
  // 2回目に`next`通知をする値は`11`、`12`、`13`である。 
  // そのため、この Observable が`next`通知する値は`[1, 2, 3]`(1回目)と`[11, 12, 13]`(2回目)である。 
  .zip(observable, observable2, observable3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1,2,3" 
// => "next: 11,12,13" 
// => "complete" 
デモ


生成した Observable がnext通知するタイミング
指定した Observable が全てnext通知したタイミングで通知する。

const observable = Rx.Observable.of(1); 
const observable2 = Rx.Observable.of(2); 
// 1秒後に`0`を`next`通知する Observable 
const observable3 = Rx.Observable.timer(1000); 
 
Rx.Observable 
  // 1秒後に`[1, 2, 0]`を`next`通知する Observable を生成する 
  .zip(observable, observable2, observable3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// 1秒後に以下がコンソール出力される 
// => "next: 1,2,0" 
// => "complete" 
デモ


生成した Observable がnext通知をする回数
指定した Observable の中で、next通知が1番少ない回数だけ通知する。

// 通知回数4回 
const observable = Rx.Observable.of(1, 2, 3, 4); 
// 通知回数3回 
const observable2 = Rx.Observable.of(5, 6, 7); 
// 通知回数無限 
const observable3 = Rx.Observable.interval(1000); 
 
Rx.Observable 
  // 今回指定した Observable の中で、`next`通知回数が1番少ないのは`observable2`である。 
  // そのため、今回生成した Observable が`next`通知をする回数は3回 
  .zip(observable, observable2, observable3) 
  .subscribe( 
    x => console.log(`next: ${x}`), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// => "next: 1,5,0" 
// => "next: 2,6,1" 
// => "next: 3,7,2" 
// この時点で`observable2`が`complete`通知をするので、"complete"が出力される 
// => "complete" 
デモ


combineLatest

指定した Observable が全てnext通知をしたら、それぞれの Observable が最後にnext通知した値を配列にまとめてnext通知する Observable を生成する。

// 1秒毎に`"A"`、`"B"`、`"C"`、`"D"`、`"E"`を通知する Observable 
const observable = Rx.Observable.interval(1000) 
  .take(5) 
  .map(index => 'ABCDE'[index]); 
// 1.4秒毎に`"F"`、`"G"`、`"H"`、`"I"`、`"J"`を通知する Observable 
const observable2 = Rx.Observable.interval(1400) 
  .take(5) 
  .map(index => 'FGHIJ'[index]); 
// 2.2秒毎に`"K"`、`"L"`、`"M"`、`"N"`、`"O"`を通知する Observable 
const observable3 = Rx.Observable.interval(2200) 
  .take(5) 
  .map(index => 'KLMNO'[index]); 
 
// 指定した Observable(`observable`、`observable2`、`observable3`)が全て`next`通知をしたら、 
// それぞれの Observable が最後に`next`通知した値を配列にまとめて`next`通知する Observable を生成する。 
const combined = Rx.Observable.combineLatest(observable, observable2, observable3); 
 
combined.subscribe( 
  x => console.log(x), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// 生成した Observable が最初に`next`通知をするタイミングは`observable3`が1回目(2.2秒後)の`next`通知をした時。 
// それまでに以下の`next`通知がされている。 
// 1. `observable`が`"A"`を`next`通知する。 
// 2. `observable2`が`"F"`を`next`通知する。 
// 3. `observable`が`"B"`を`next`通知する。 
// 4. `observable3`が`"K"`を`next`通知する。 
//    このタイミングで指定した Observable が全て`next`通知をしたので、 
//    それぞれの Observable が最後に通知した値がまとめられた配列が`next`通知され、以下がコンソール出力される。 
// => ["B", "F", "K"] 
// 
// これ以降は、いずれかの Observable(`observable`、`observable2`、`observable3`)が`next`通知する度に、 
// それぞれの Observable が最後に通知した値を配列にまとめて`next`通知をするので、以下がコンソール出力される。 
// => ["B", "G", "K"] 
// => ["C", "G", "K"] 
// => ["D", "G", "K"] 
// => ["D", "H", "K"] 
// => ["D", "H", "L"] 
// => ["E", "H", "L"] 
// => ["E", "I", "L"] 
// => ["E", "I", "M"] 
// => ["E", "J", "M"] 
// => ["E", "J", "N"] 
// => ["E", "J", "O"] 
// =>  "complete" 
デモ


withLatestFrom

このメソッドを実行した Observable がnext通知をした際に、引数の全ての Observable が一度でもnext通知済みであれば、それぞれの Observable が最後にnext通知した値を配列にまとめてnext通知する Observable を生成する。

// 1秒毎に`"A"`、`"B"`、`"C"`、`"D"`、`"E"`を通知する Observable 
const observable = Rx.Observable.interval(1000) 
  .take(5) 
  .map(index => 'ABCDE'[index]); 
// 1.4秒毎に`"F"`、`"G"`、`"H"`、`"I"`、`"J"`を通知する Observable 
const observable2 = Rx.Observable.interval(1400) 
  .take(5) 
  .map(index => 'FGHIJ'[index]); 
// 2.2秒毎に`"K"`、`"L"`、`"M"`、`"N"`、`"O"`を通知する Observable 
const observable3 = Rx.Observable.interval(2200) 
  .take(5) 
  .map(index => 'KLMNO'[index]); 
 
observable 
  // このメソッドを実行した Observable(`observable`)が`next`通知をした際に、 
  // 引数の全ての Observable(`observable2`、`observable3`) 
  // が一度でも`next`通知済みであれば、それぞれの Observable が最後に通知した値を配列にまとめて`next`通知する Observable を生成する。 
  .withLatestFrom(observable2, observable3) 
  .subscribe( 
    x => console.log(x), 
    error => console.error(`error: ${error}`), 
    () => console.log('complete') 
  ); 
// 生成した Observable が最初に`next`通知をするタイミングは`observable`が3回目(3秒後)の 
// `next`通知をした時であり、それまでに以下の`next`通知がされている。 
// 1. `observable`が`"A"`を`next`通知する。 
// 2. `observable2`が`"F"`を`next`通知する。 
// 3. `observable`が`"B"`を`next`通知する。 
// 4. `observable2`が`"G"`を`next`通知する。 
// 5. `observable3`が`"K"`を`next`通知する。 
// 6. `observable`が`"C"`を`next`通知する。 
//    このタイミングで引数の全ての Observable(`observable2`、`observable3`)が`next`通知済みなので、 
//    それぞれの Observable が最後に通知した値をまとめた配列が`next`通知され、以下がコンソール出力される。 
// => ["C", "G", "K"] 
// 
// これ以降は、`observable`が`next`通知をする度に、それぞれの Observable が最後に通知した値を 
// 配列にまとめて`next`通知をするので、以下がコンソール出力される。 
// => ["D", "G", "K"] 
// => ["E", "H", "L"] 
// => "complete" 


その他 Operators

その他の Operators(ドキュメントで明記されていないものはこちらに記載、v6 にも存在する)。


ajax

AJAX(HTTP)リクエストの結果を通知する Observable を生成する。


リクエスト(通信)が成功した場合
const observable = Rx.Observable.ajax('https://jsonplaceholder.typicode.com/post/1'); 
 
observable.subscribe( 
  x => console.log(x.response), 
  error => console.error(`error: ${error}`), 
  () => console.log('complete') 
); 
// => { 
//   body: "quia et suscipit suscipit recusandae consequuntur expedita et cumreprehenderit molestiae ut ut quas totam nostrum rerum est autem sunt rem eveniet architecto", 
//   id: 1, 
//   title: "sunt aut facere repellat provident occaecati excepturi optio reprehenderit", 
//   userId: 1 
// } 
// => "complete" 
デモ


リクエスト(通信)が失敗した場合
const observable = Rx.Observable.ajax('https://jsonplaceholder.typicode.co'); 
 
observable.subscribe( 
  x => console.log(x.response), 
  error => console.error(`error: ${error.message}`), 
  () => console.log('complete') 
); 
// => "error: ajax error 0" 
デモ


RxJS を利用したコードと利用しないコードを比較する

RxJS のメリットを確認するために、以下のサンプルを RxJS を利用する場合と利用しない場合で実装し、それぞれのコードを比較する。

  • 選択した ID の投稿を取得して描画する
  • もし、投稿を取得中で別の ID を選択した場合、リクエスト(通信)を中断して新しく選択した ID の投稿だけを取得する


abort.gif


※ Chrome の「Network」タブで回線を絞れば再現できます。


RxJS を利用しない場合

リクエストにはXMLHttpRequestを利用(axios の利用も検討したが、コードの長さがあまり変わらなかったので今回は利用していない)。通信中かどうかはisFetchingで管理。

const select = document.querySelector('select'); 
const result = document.getElementById('result'); 
 
let request; 
let isFetching = false; 
 
const render = post => { 
  result.innerHTML = post.title; 
}; 
 
const fetchPost = id => { 
  return new Promise(resolve => { 
    request = new XMLHttpRequest(); 
    request.responseType = 'json'; 
    request.open('GET', `https://jsonplaceholder.typicode.com/posts/${id}`, true); 
    request.onload = () => { 
      if (request.readyState === XMLHttpRequest.DONE && request.status === 200) { 
        resolve(request.response); 
      } 
    }; 
    request.send(null); 
  }); 
}; 
 
select.addEventListener('change', e => { 
  if (isFetching) { 
    request.abort(); 
  } else { 
    isFetching = true; 
  } 
 
  fetchPost(e.target.value).then(res => { 
    isFetching = false; 
    render(res); 
  }); 
}); 
デモ


RxJS を利用した場合

const select = document.querySelector('select'); 
const result = document.getElementById('result'); 
 
const id$ = Rx.Observable.fromEvent(select, 'change').map(e => e.target.value); 
 
const fetchPost = id => 
  Rx.Observable.ajax(`https://jsonplaceholder.typicode.com/posts/${id}`).map(res => res.response); 
 
const render = post => { 
  result.innerHTML = post.title; 
}; 
 
id$.switchMap(fetchPost).subscribe(render); 
デモ

RxJS を利用しない場合と比べて以下のようになった。

  • 簡潔になった
  • 可読性が高くなった
  • 副作用を減らせた(RxJS を利用しない場合、fetchPostなどは関数外の変数requestに依存していたが、それがなくなった)
というわけで RxJS を利用することで、非同期処理を効率良く実装できた。


終わり

学習が難しいですが、複雑な非同期処理を実装する際には有用なライブラリだと思います。

今回は導入編のため、次回はより RxJS の理解を深めるために知っておいた方が良い以下の項目などに関して投稿する予定です(多分)。

  • Subject
  • Scheduler
  • Observable の Hot、Cold の性質
  • Operator を複数組み合わせたサンプル


RxJS 学習を手助けするマーブルダイアグラム

本記事では記載しませんでしたが、RxJS の Observable は マーブルダイアグラム(Marble Diagram)という図で表現されていることが多いです。

以下のサイトでは、様々な Operators を利用して生成した Observable のマーブルダイアグラムが掲載されていますので、RxJS 学習の手助けになると思います。

RxJS Marbles

とは言えども、複雑な機能の Operators から生成された Observable のマーブルダイアグラムだけ見ても理解するのは困難なので、実際にコードを書いてみて動作を確認した方が良いと思います。

以下のサイトだとマーブルダイアグラムがリアルタイムで描画されるため、わかりやすいです。また、自分で書いたコードもマーブルダイアグラムで描画できます(V6 対応なので注意)。

Rx Visualizer


参考

所感ですが、RxJS の用語は同義語がいくつかあるため、記事によって用いられている言葉や説明の仕方がかなり異なります(他のライブラリやフレームワークの記事と比べて)。

例えば、この記事での「値を通知する」は、別の記事では「値を流す」と記載されています。どちらの表現が分かりやすいと感じるかは人それぞれだと思います。

そのため、どの記事が分かりやすいと感じるかも人それぞれだと思いますので、たくさんの記事に目を通してみることをおすすめします。

コメント

このブログの人気の投稿

投稿時間:2021-06-17 05:05:34 RSSフィード2021-06-17 05:00 分まとめ(1274件)

投稿時間:2021-06-20 02:06:12 RSSフィード2021-06-20 02:00 分まとめ(3871件)

投稿時間:2020-12-01 09:41:49 RSSフィード2020-12-01 09:00 分まとめ(69件)