前回は、Rxの動作確認などを楽にしてくれるSubject<T>について触れました。今回はこれの亜種を紹介します。
ReplaySubject<T>
ひとつ目はReplaySubject<T>です。ReplaySubject<T>は、OnNextが呼び出されたときに値を通知すると同時にその値をキャッシュします。新たにSubscribeで登録されると、登録と同時にキャッシュした値を全て通知します。下記に簡単なサンプル示します。Main関数以外は前回のサンプルと同じなので割愛します。
static void Main() { var subject = new ReplaySubject<int>(); var disposerA = subject.SubscribeTracer("A"); subject.OnNext(1); disposerA.DisposeTracer("A"); subject.OnNext(2); var disposerB = subject.SubscribeTracer("B"); subject.OnCompleted(); subject.OnNext(3); var disposerC = subject.SubscribeTracer("C"); } //----- 結果 /* ----- A : Subscribe Before ----- ----- A : Subscribe After ----- A : OnNext(1) ----- A : Dispose Before ----- ----- A : Dispose After ----- ----- B : Subscribe Before ----- B : OnNext(1) B : OnNext(2) ----- B : Subscribe After ----- B : OnCompleted ----- C : Subscribe Before ----- C : OnNext(1) C : OnNext(2) C : OnCompleted ----- C : Subscribe After ----- */
Aだけ見るとSubject<T>と特に変りないですが、Bを見ると違いは一目瞭然です。BをSubscribeで登録すると同時に値が2つ発行されています。さらに、OnCompletedを呼び出した後にCを登録すると、OnCompletedまでが呼び出されます。このとき、OnCompletedの後に呼び出されたOnNextは無視されるのがポイントです。それまでの通知を再現してくれるので、Replayという名前が付いています。
ポイントは以下の通りです。
- OnNextで通知した値はキャッシュされる
- Subscribeで新たに購読者が登録されたとき、それまでの値をすべて順番通りに再通知する
- 再通知されるのはOnCompletedが呼び出されるまでの値に限る
BehaviorSubject<T>
ふたつ目はBehaviorSubject<T>です。BehaviorSubject<T>は、OnNextが呼び出されたときに値を通知すると同時にその値をキャッシュします。新たにSubscribeで登録されると、登録と同時にキャッシュした値を通知します。ReplaySubject<T>と違うところは、ReplaySubject<T>がすべての値をキャッシュするのに対して、BehaviorSubject<T>は最新の値のみをキャッシュすることです。下記に簡単なサンプル示します。
static void Main() { var subject = new BehaviorSubject<int>(0); var disposerA = subject.SubscribeTracer("A"); subject.OnNext(1); disposerA.DisposeTracer("A"); subject.OnNext(2); var disposerB = subject.SubscribeTracer("B"); subject.OnNext(3); subject.OnCompleted(); var disposerC = subject.SubscribeTracer("C"); } //----- 結果 /* ----- A : Subscribe Before ----- A : OnNext(0) ----- A : Subscribe After ----- A : OnNext(1) ----- A : Dispose Before ----- ----- A : Dispose After ----- ----- B : Subscribe Before ----- B : OnNext(2) ----- B : Subscribe After ----- B : OnNext(3) B : OnCompleted ----- C : Subscribe Before ----- C : OnCompleted ----- C : Subscribe After ----- */
まずこれまでのものと異なるのは、インスタンス生成時に引数で初期値を指定することです。AをSubscribeで登録したとき、この初期値が通知されていることが確認できます。また、Bの登録時には最後に発行した2が表示されています。OnCompletedが呼び出された後に登録したCは、最後に通知したものがOnCompletedなので、OnCompletedが呼び出されています。
ポイントは以下の通りです。
- コンストラクタで初期値を指定する
- OnNextで通知した最新の値がキャッシュされる
- Subscribeで新たに購読者が登録されたとき、キャッシュしている値を通知する
- すでにOnCompletedが呼び出されている場合は、OnCompletedが呼び出される
AsyncSubject<T>
最後はAsyncSubject<T>です。AsyncSubject<T>は非同期処理を模倣した通知形式になっています。OnCompletedが呼び出されるまでは非同期処理実行中で、その間、通知した最新の値をキャッシュしておきます。OnCompletedの呼び出しで非同期処理が完了し、キャッシュしておいた結果と完了の通知を行います。OnCompleted後は非同期処理は行われていないので、キャッシュしておいた結果と完了の通知のみを行います。下記に簡単なサンプル示します。
static void Main() { var subject = new AsyncSubject<int>(); subject.OnNext(1); subject.SubscribeTracer("A"); subject.OnNext(2); Console.WriteLine("----- OnCompleted Before -----"); subject.OnCompleted(); Console.WriteLine("----- OnCompleted After -----"); subject.OnNext(3); subject.SubscribeTracer("B"); } //----- 結果 /* ----- A : Subscribe Before ----- ----- A : Subscribe After ----- ----- OnCompleted Before ----- A : OnNext(2) A : OnCompleted ----- OnCompleted After ----- ----- B : Subscribe Before ----- B : OnNext(2) B : OnCompleted ----- B : Subscribe After ----- */
AをSubscribeで登録してもそれ以前に発行したおいた値1は通知されません。また、値2を発行しても、その段階では通知されません。OnCompletedを呼び出すと、最後に発行した値2の通知と完了通知が行われます。OnCompleted後に呼び出した値3は無視されており、Bの登録時にOnCompleted呼び出し時と同じ結果が得られます。
ポイントは以下の通りです。
- OnNextに渡した値のうち、最新のものがキャッシュされる
- OnCompletedが呼び出されたときにキャッシュされた値が通知される
- OnCompleted以前にSubscribeしていても、OnCompletedまで値は通知されない
- OnCompleted後にSubscribeすると、OnNextとOnCompletedが呼び出される
参考記事
次の記事は、これらの理解を深めるための手助けになると思いますので、ぜひ参考にしてください。というか、神様2人の真似事をしている気がしないでもない...
- Reactive Extensionsの非同期周りの解説と自前実装
- Reactive Extensions入門 11「非同期処理用のSubject」
- Reactive Extensions再入門 その14「Nextメソッド」
次回予告
今回はSubject<T>の亜種、3種類について見てきました。基本となるSubject<T>も含めて4種類、それぞれ挙動が少しずつ異なるので、特徴を掴んでうまく使いこなしたいですね。次回は簡単にIObservable<T>を生成する方法について触れてみたいと思います。