前回までで、IObservable<T>に対してLINQスタイルでの記述ができることを見てきました。これから3回ほどは、時間/イベント/非同期処理をIObservable<T>としてみなし、情報を通知する方法について触れていきます。
MouseMoveなどの各種イベント、一定間隔で発行されるタイマー、非同期処理の完了通知などはすべて、時間軸上のあるタイミングで発生します。Rxは、これら時間軸上で発生する情報をIObservable<T>のシーケンスとして捉えることで、その情報をフィルタリングしたり、変換したりできるようにしています。この考え方を頭に入れておくことは非常に重要です。@ITで@neueccさんが連載しているRx入門の第1回、「第1回 Reactive Extensionsの概要と利用方法」でも説明されていますので、ぜひ参考にしてください。
今回は、これらのうち一定時間間隔で情報を発行するIObservable<T>について見て行きたいと思います。
Observable.Timer
最初に紹介するのはObservable.Timerメソッドです。System.Threading.Timerクラスなどに代表される、よくあるタイマーです。第1引数には開始までの待ち時間を、第2引数には値の発行間隔を指定します。戻り値はIObservable<long>で、OnNextメソッドが呼び出される度に0から始まるインクリメントされた整数が通知されます。以下に挙動確認用の簡単なサンプルを示します。
using System; using System.Reactive.Linq; using System.Threading; namespace Sample13_Timer { class Program { static void Main() { var timer = Observable.Timer(TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(1)); Console.WriteLine("A : 3秒後に1秒間隔のタイマーが開始されます"); var disposerA = timer.Subscribe(value => Console.WriteLine("A : OnNext({0})", value)); Thread.Sleep(1500); //--- 1.5秒タイミングをずらす Console.WriteLine("\\tB : 3秒後に1秒間隔のタイマーが開始されます"); var disposerB = timer.Subscribe(value => Console.WriteLine("\\tB : OnNext({0})", value)); Console.ReadLine(); //--- 何か押したらAを終了 disposerA.Dispose(); Console.ReadLine(); //--- 何か押したらBを終了 disposerB.Dispose(); Console.ReadLine(); //--- 何も流れないことを確認 } } } //----- 結果 /* A : 3秒後に1秒間隔のタイマーが開始されます B : 3秒後に1秒間隔のタイマーが開始されます A : OnNext(0) A : OnNext(1) B : OnNext(0) A : OnNext(2) B : OnNext(1) A : OnNext(3) B : OnNext(2) B : OnNext(3) B : OnNext(4) B : OnNext(5) */
上記のサンプルでは、タイマーAとBの開始時間を1.5秒間ずらし、0.5秒間隔で交互に値が発行されるようにしています。また、何かキーを押したらAのタイマーを停止し、再度何かキーを押したらBのタイマーを停止するようにしています。
ポイントは、Observable.Timerメソッドから返されたIObservable<long>をSubscribeメソッドで複数回の購読する場合の動きです。結果から、AのタイマーもBのタイマーも値が0から始まっていることが確認できます。これは、Subscribeメソッド呼び出し時にObservable.Timerの内部でタイマーが2つ生成されていることを示しています。つまり、ひとつのタイマーをA/Bの2者が監視するのではなく、タイマー2つをA/Bのそれぞれが監視しているということです。この違いは大きいので、間違わないようにしましょう。
Observable.Interval
Observable.IntervalメソッドはObservable.Timerメソッドを簡略化したもので、最初の値が発行されるまでの経過時間 (dueTime) を発行間隔 (period) としたものです。以下に、Observable.Intervalメソッドを利用して作成したデジタル時計のサンプルを示します。
using System; using System.Reactive.Linq; namespace Sample14_Interval { class Program { static void Main() { Observable.Interval(TimeSpan.FromSeconds(1)) .Subscribe(_ => { Console.Clear(); Console.WriteLine(DateTime.Now.ToString()); }); Console.WriteLine(DateTime.Now.ToString()); //--- 初期値 Console.ReadLine(); } } }
Rxを利用すると、この程度のものであれば極めて簡単に作成できます。「タイマーのコンポーネントを置いて、Callbackイベントを関連付けて...」などという従来の雑念は一切なく、「1秒おきに画面を消して現在時刻を表示する」というように、やりたいことをほぼほぼストレートに記述できるのが特徴です。
[ 追記 - 2012/01/07 22:30 ]
「Observable.IntervalメソッドはObservable.Timerメソッドを簡略化したもので、最初の値が発行されるまでの経過時間 (dueTime) をゼロ (TimeSpan.Zero) としたものです(キリッ!!」と思いっきり間違ったことを書いていたのを@neueccさんに指摘していただきました。テキトーにやってしまった自分に喝!@neueccさん、ありがとうございますm( )m
そして初期値を設定しなければならないようなサンプルコード作るくらいなら、Observable.TimerでdutTime = TimeSpan.Zeroにした方が良いだろうという本末転倒ぶりもまた...orz
タイマーは別スレッド
一般的なタイマーもそうですが、Observable.TimerメソッドやObservable.Intervalメソッドも別スレッドで動作します。既定ではThreadPool上です。よって、これに合わせてOnNextメソッドの処理もTimer/Intervalと同じスレッド上で動作します。Consoleクラスはスレッドセーフなので今回の例では問題になりませんが、Windows FormsやWPFなどのGUIアプリケーションの場合、UIスレッド以外からUIコンポーネントに触ることができません。つまり、Subscribe上での処理をUIスレッドに同期的に動作させなければならない場合があります。処理するスレッドに関する問題については、いつか別の機会に触れてみたいと思います。
参考記事
例に漏れず、@okazukiさんのblogに今回の内容が載っています。さらに、ここではめんどくさくて紹介していないObservable.Generateメソッドによる方法もあります。ぜひ、ご参照ください。
次回予告
今回は一定時間の経過に伴って情報を通知するメソッドについて見てきました。次回はイベントが発生したタイミングで通知を行うものについて触れてみたいと思います。