xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

gRPC / MagicOnion 入門 (7) - Duplex Streaming 通信

前回前々回と Server Streaming 通信と Client Streaming 通信について解説してきました。今回はこれらを合わせた通信方法である Duplex Streaming 通信について見ていきます。

f:id:xin9le:20170604153558p:plain

Step.1 - サービス定義

いつも通り、まずサーバー側で提供するサービスのインターフェースを定義します。例えば以下のようになります。

using System.Threading.Tasks;
using MagicOnion;

namespace MagicOnionSample.ServiceDefinition
{
    public interface ISampleApi : IService<ISampleApi>
    {
        Task<DuplexStreamingResult<int, int>> DuplexSample();
      //Task<DuplexStreamingResult<int, int>> DuplexSample(string arg);  // 引数はダメ
    }
}

戻り値の型を Task<DuplexStreamingResult<TRequest, TResponse>> とするのがポイントです。また、Client Streaming のときと同様にメソッドに引数を設定できないので注意が必要です。

Step.2 - サービスの実装

Step.1 で定義したインターフェースを実装します。例えば以下のような感じです。

using System;
using System.Threading.Tasks;
using MagicOnion;
using MagicOnion.Server;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Service
{
    public class SampleApi : ServiceBase<ISampleApi>, ISampleApi
    {
        public async Task<DuplexStreamingResult<int, int>> DuplexSample()
        {
            var streaming = this.GetDuplexStreamingContext<int, int>();
            var task = streaming.ForEachAsync(async x =>
            {
                //--- クライアントから送信された値が偶数だったら 2 倍にして返してみたり
                Console.WriteLine($"Received : {x}");
                if (x % 2 == 0)
                    await streaming.WriteAsync(x * 2);
            });

            //--- サーバー側から任意のタイミングで送信してみたり
            await Task.Delay(100);  // テキトーにずらしたり
            await streaming.WriteAsync(123);
            await streaming.WriteAsync(456);

            //--- メッセージの受信がすべて終わるまで待つ
            await task;

            //--- サーバーからの送信が完了したことを通知
            return streaming.Result();
        }
    }
}

ポイントは以下の 5 点です。これまで Server Streaming と Client Streaming で見てきたもののミックスになっているので結構複雑です。

  • GetDuplexStreamingContext<TRequest, TResponse> からストリーミング通信するためのコンテキストを取得
  • クライアント側から送信されるメッセージを ForEachAsync で受信
  • クライアント側から送信完了通知が来るまでメッセージを受信し続ける
  • WriteAsync でサーバー側からメッセージを送信
  • Result メソッドで完了通知を送信

Step.3 - クライアントの実装

最後に、Step.2 までで実装した API を呼び出すクライアントを作成します。以下のような感じです。こちらもサーバー側と同様に、受信と送信が入り乱れるので難易度が高いです。

using System;
using System.Linq;
using System.Threading.Tasks;
using Grpc.Core;
using MagicOnion;
using MagicOnion.Client;
using MagicOnionSample.ServiceDefinition;

namespace MagicOnionSample.Client
{
    class Program
    {
        static void Main() => MainAsync().Wait();

        static async Task MainAsync()
        {
            //--- API に接続するためのチャンネルとクライアントを生成
            var channel = new Channel("localhost", 12345, ChannelCredentials.Insecure);
            var client = MagicOnionClient.Create<ISampleApi>(channel);

            //--- ForEachAsync でサーバーからのメッセージを受信
            var streaming = await client.DuplexSample();
            var task = streaming.ResponseStream.ForEachAsync(x => Console.WriteLine($"Response : {x}"));

            //--- WriteAsync でサーバー側にメッセージを送信
            //--- CompleteAsync で送信完了を通知
            foreach (var x in Enumerable.Range(0, 5))
                await streaming.RequestStream.WriteAsync(x);
            await streaming.RequestStream.CompleteAsync();

            //--- メッセージの受信完了を待機
            await task;

            //--- アプリが終わらないように
            Console.ReadLine();
        }
    }
}

実行してみる

実行すると以下のような結果が得られます。結構に複雑なので、実際にデバッガーでひとつずつステップ実行して試してみることをおすすめします。

f:id:xin9le:20170611235710p:plain