前回、前々回と Server Streaming 通信と Client Streaming 通信について解説してきました。今回はこれらを合わせた通信方法である Duplex Streaming 通信について見ていきます。
Step.1 - サービス定義
いつも通り、まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。
using System.Threading.Tasks; using MagicOnion; namespace MagicOnionSample.ServiceDefinition { public interface ISampleApi : IService<ISampleApi> { Task<DuplexStreamingResult<int, int>> DuplexSample(); //Task<DuplexStreamingResult<int, int>> DuplexSample(string arg); // 引数はダメ } }
戻り値の型を Task<DuplexStreamingResult<TRequest, TResponse>>
とするのがポイントです。また、Client Streaming のときと同様にメソッドに引数を設定できないので注意が必要です。
Step.2 - サービスの実装
Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。
using System; using System.Threading.Tasks; using MagicOnion; using MagicOnion.Server; using MagicOnionSample.ServiceDefinition; namespace MagicOnionSample.Service { public class SampleApi : ServiceBase<ISampleApi>, ISampleApi { public async Task<DuplexStreamingResult<int, int>> DuplexSample() { var streaming = this.GetDuplexStreamingContext<int, int>(); var task = streaming.ForEachAsync(async x => { //--- クライアントから送信された値が偶数だったら 2 倍にして返してみたり Console.WriteLine($"Received : {x}"); if (x % 2 == 0) await streaming.WriteAsync(x * 2); }); //--- サーバー側から任意のタイミングで送信してみたり await Task.Delay(100); // テキトーにずらしたり await streaming.WriteAsync(123); await streaming.WriteAsync(456); //--- メッセージの受信がすべて終わるまで待つ await task; //--- サーバーからの送信が完了したことを通知 return streaming.Result(); } } }
ポイントは以下の 5 点です。これまで Server Streaming と Client Streaming で見てきたもののミックスになっているので結構複雑です。
GetDuplexStreamingContext<TRequest, TResponse>
からストリーミング通信するためのコンテキストを取得- クライアント側から送信されるメッセージを
ForEachAsync
で受信 - クライアント側から送信完了通知が来るまでメッセージを受信し続ける
WriteAsync
でサーバー側からメッセージを送信Result
メソッドで完了通知を送信
Step.3 - クライアントの実装
最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。こちらもサーバー側と同様に、受信と送信が入り乱れるので難易度が高いです。
using System; using System.Linq; using System.Threading.Tasks; using Grpc.Core; using MagicOnion; using MagicOnion.Client; using MagicOnionSample.ServiceDefinition; namespace MagicOnionSample.Client { class Program { static void Main() => MainAsync().Wait(); static async Task MainAsync() { //--- API に接続するためのチャンネルとクライアントを生成 var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure); var client = MagicOnionClient.Create<ISampleApi>(channel); //--- ForEachAsync でサーバーからのメッセージを受信 var streaming = await client.DuplexSample(); var task = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Response : {x}")); //--- WriteAsync でサーバー側にメッセージを送信 //--- CompleteAsync で送信完了を通知 foreach (var x in Enumerable.Range(0, 5)) await streaming.RequestStream.WriteAsync(x); await streaming.RequestStream.CompleteAsync(); //--- メッセージの受信完了を待機 await task; //--- アプリが終わらないように Console.ReadLine(); } } }
実行してみる
実行すると以下のような結果が得られます。結構に複雑なので、実際にデバッガーでひとつずつステップ実行して試してみることをおすすめします。