今回は並列ループを中断/停止する方法について見ていきます。
通常のfor/foreach文はbreakでループをエスケープすることができるのはご承知の通りと思いますが、残念ながらParallel.For/ForEachの中ではそれができません。for/foreach文はれっきとしたループですが、Parallel.For/ForEachの中身をデリゲート/ラムダ式で書くことから分かるように、実際にはループではなくメソッドだからです。並列処理になったからと言って中断/停止がないと困ります。TPLでは、Parallel.For/ForEachのオーバーロードのうちParallelLoopStateを利用できるものでこれらをサポートしています。
ループの停止
まずは、より理解が簡単なループの停止についてです。ループの停止はParallelLoopState.Stopメソッドを使用して行います。まずはサンプルを示します。
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace Sample04_StopLoop { class Program { static void Main() { var stack = new ConcurrentStack<long>(); Parallel.For(0, 100000, (index, state) => { if (index < 50000) stack.Push(index); else state.Stop(); }); Console.WriteLine("個数 : " + stack.Count); } } } //----- 結果 (例) : 13335
インデックスが50000以上のものにアクセスした場合にループ処理を停止するようになっています。また、その間にいくつのインデックスにアクセスするかを計測しています。上記の結果では、ループの停止までに13335個にアクセスしたようです。通常のfor文であれば結果が50000個になってしかるべきですが、それよりもずっと少ない数で終了しているところを見ると、他の反復処理が行われていないと考えられます。
ParallelLoopState.Stopメソッドは、まだ開始されていないすべての反復処理に停止を命令することができます。それはもちろん、Stopメソッドを呼び出したのとは別のスレッド上のものに対しても有効です。TPLはStopメソッドが呼び出されたことを検出したら、呼び出された瞬間に実行されている処理の次の処理を行わないように振舞ってくれるということです。
並列処理中に他のスレッドでStopメソッドが呼び出されたかどうかは、ParallelLoopState.IsStoppedプロパティで確認することができます。ひとつの要素に対する処理が長い場合、一刻も早く終了させるときに使うと良いと思います。
また、アクセス数の計測にはSystem.Collections.Concurrent名前空間に定義されているConcurrentStackを利用しています。これは並列処理中に複数のスレッドから同時にアクセスされても大丈夫 (スレッドセーフ) なコレクションのひとつです。List<T>などのコレクションは複数のスレッドからアクセスされることを許容していないコレクションなので、このケースでの利用は予期しない結果になる可能性がありますので注意が必要です。スレッドセーフに利用できるコレクションはTPLのリリースと同時にSystem.Collections.Concurrent名前空間にいくつか定義されましたので、ぜひ活用してください。
ループの中断
ループの中断はParallelLoopState.Breakメソッドを使用して行います。こちらもサンプルを示します。
using System; using System.Collections.Concurrent; using System.Threading.Tasks; namespace Sample05_BreakLoop { class Program { static void Main() { var stack = new ConcurrentStack<long>(); Parallel.For(0, 100000, (index, state) => { stack.Push(index); if (index >= 50000) { state.Break(); Console.WriteLine("Index : " + index); Console.WriteLine("LowestBreakIteration : " + state.LowestBreakIteration); } }); Console.WriteLine("個数 : " + stack.Count); } } } //----- 結果 (例) /* Index : 50000 LowestBreakIteration : 50000 Index : 50016 LowestBreakIteration : 50000 個数 : 50002 */
インデックスが50000以上のものにアクセスした時点で中断するように指示しています。Breakメソッドが呼び出されたインデックスと、そのうちの最小のインデックスも表示します。また、処理がすべて終了するまでにアクセスされたインデックスの個数も同時に計測しています。
結果より、50002個のインデックスにアクセスしていることが分かります。また、50000以上のインデックスにアクセスした回数はBreakメソッドが呼び出された回数の2回です。このことから、50000未満のインデックスのすべてにアクセスされていることが分かります。Stopメソッドは呼び出されて以降すぐに他スレッドの残りの処理も終了させますが、Breakメソッドは呼び出されても残りの処理をすべて終了させることはありません。Breakメソッドが呼び出されたインデックスのうち、最小のインデックスまでのすべての処理を行うようになっています。Breakメソッドが呼び出されたインデックスの最小値はParallelLoopState.LowestBreakIterationプロパティで取得できます。TPLは内部でこの最小値以前のみを実行するように振舞ってくれます。
上記の動きをまとめると以下のようになります。
- 並列処理開始
- 50000要素目にアクセスされ、Breakメソッドが呼び出される
- LowestBreakIterationに50000が設定される
- 50016要素目にアクセスされ、Breakメソッドが呼び出される
- LowestBreakIterationには50016より小さい50000が設定されているため更新されない
- 50000要素目未満のみすべて実行される
確認のためにstackの中身をすべて表示してみると、50000要素目未満のすべてのインデックスが格納されていることがわかります。
ループの停止を知る
並列ループが途中で停止されたことを知りたい場合があります。そのためにはParallel.For/ForEachの戻り値であるParallelLoopResult構造体を利用することで行うことができます。次のサンプルを参考にしてください。
using System.Threading.Tasks; namespace Sample06_LoopResult { class Program { static void Main() { var result = Parallel.For(0, 100000, (index, state) => { //----- Do Something... }); if (result.IsCompleted) { //----- すべてのループが実行された } else if (result.LowestBreakIteration.HasValue) { //----- Breakされた } else { //----- Stopされた } } } }
Parallel.ForEachによる中断/停止
ここまでParallel.For文による中断/停止について見てきました。特にBreakメソッドはインデックスを利用して処理する要素を決めていることも確認しました。それでは、要素インデックスを知ることができないParallel.ForEach文はどうでしょうか?実はTPLが内部で処理中の要素インデックスを保持していて、Parallel.Forと同様に処理することができます。
次回予告
次回はスレッド別の変数を利用した並列処理について見ていきたいと思います。