xin9le.net

Microsoft の製品/技術が大好きな Microsoft MVP な管理人の技術ブログです。

Azure Functions (Isolated Worker) で QueueMessage 型にマップする

.NET で利用する Azure Functions には In-Process Model と Isolated Model のふたつがあります。Isolated Model は .NET 6 から利用できるようになった新しいタイプです。それぞれの違いについては公式ドキュメントをご覧ください。

.NET 7 以降では Isolated Model を利用することが強制されるので、.NET 6 以前から Azure Functions を利用している方は移行作業が必要になります。そのときに必要となる作業のひとつとして「QueueTrigger 開発時に QueueMessage 型に直接マップできない」というものがあります。どういうことか、実装を見てみましょう。

QueueMessage へのマップができない件の差分

In-Process Model のとき

関数のエントリーポイントの引数で直接 Azure.Storage.Queues.Models.QueueMessage 型にマップできます。QueueMessage 型には DequeueCountExpiresOn など、たまに使いたくなるようなプロパティが生えていて有用です。

[FunctionName(nameof(QueueTriggerFunction))]
public void EntryPoint(
    [QueueTrigger("sample-queue")] QueueMessage queueMessage)
{ }

Isolated Model のとき

一方で Isolated Model になると byte[] などの比較的プリミティブな型が Binding 可能な型としてサポートされているだけで、QueueMessage 型などの SDK 固有の型へはダイレクトにマップできません。では DequeueCount などへはどうやってアクセスすれば良いのかということになりますが、代替手法として FunctionContext.BindingContext.BindingData が汎用データストアとして用意されており、それを利用することになります。

[FunctionName(nameof(QueueTriggerFunction))]
public void EntryPoint(
    [QueueTrigger("sample-queue")] byte[] payload,
    FunctionContext context)
{
    // BindingData に IReadOnlyDictionary<string, object?> として入っている
    var kvs = context.BindingContext.BindingData;
    var value = (string)kvs["DequeueCount"]!;
    var dequeueCount = long.Parse(value);
}

上のサンプルにコメントとして記載しましたが、IReadOnlyDictionary<string, object?> として提供されるのでだいぶ扱いづらいんですよね。

QueueMessage に変換する

ということで扱いやすい形にするために QueueMessage 型に変換してみましょう。ざっくり以下のような拡張メソッドを作ってみました。

using System;
using System.Collections.Generic;
using Azure.Storage.Queues.Models;
using Microsoft.Azure.Functions.Worker;

public static class BindingContextExtensions
{
    public static QueueMessage ToQueueMessage(this BindingContext context)
    {
        // QueueTrigger かどうかを確認しておく
        var kvs = context.BindingData;
        if (!kvs.TryGetValue("QueueTrigger", out var payload))
            throw new InvalidOperationException("Function is not triggered by QueueTrigger.");

        // 値を取得
        var messageId = getString(kvs, "Id");
        var popReceipt = getString(kvs, "PopReceipt");
        var body = BinaryData.FromString((string)payload!);
        var dequeueCount = getInt64(kvs, "DequeueCount");
        var nextVisibleOn = getNullableDateTimeOffset(kvs, "NextVisibleTime");
        var insertedOn = getNullableDateTimeOffset(kvs, "InsertionTime");
        var expiresOn = getNullableDateTimeOffset(kvs, "ExpirationTime");
        return QueuesModelFactory.QueueMessage(messageId, popReceipt, body, dequeueCount, nextVisibleOn, insertedOn, expiresOn);

        #region ローカル関数
        static string getString(IReadOnlyDictionary<string, object?> kvs, string key)
        {
            var value = kvs[key];
            return value is null
                ? throw new NotSupportedException($"Null value is not supported. | Key : {key}")
                : (string)value;
        }

        static long getInt64(IReadOnlyDictionary<string, object?> kvs, string key)
        {
            var value = getString(kvs, key);
            return long.Parse(value);
        }

        static DateTimeOffset? getNullableDateTimeOffset(IReadOnlyDictionary<string, object?> kvs, string key)
        {
            var value = kvs[key];
            if (value is null)
                return null;

            var span = (value as string).AsSpan();
            span = span.Trim('"');  // 前後に謎の " が含まれるので削除
            return DateTimeOffset.Parse(span);
        }
        #endregion
    }
}

これで以下のように利用できるようになります。QueueMessage のプロパティにアクセスするのがだいぶ楽になるのではないでしょうか。

[FunctionName(nameof(QueueTriggerFunction))]
public void EntryPoint(
    [QueueTrigger("sample-queue")] byte[] payload,
    FunctionContext context)
{
    var queueMessage = context.BindingContext.ToQueueMessage();
    var dequeueCount = queueMessage.DequeueCount;
}

.NET 6 以前の環境下で C# 11 の required キーワードを利用する

C# 11 / .NET 7 で required キーワードが利用できるようになりました。詳細は公式ドキュメント等に譲りますが、簡単に説明するとプロパティやフィールドのオブジェクト初期化子で値を代入することを強制する機能です。

// こういうクラスがあるとして...
public class Person
{
    public required string Name { get; init; }
    public int? Age { get; init; }
}

// Name プロパティを初期化していないのでエラー
var p = new Person();

// これは OK
// Age プロパティの初期化は強制されていない
var p = new Person(){ Name = "xin9le" };

これまでプロパティの初期化漏れを防ぎたい場合はコンストラクタ引数を経由しなければなりませんでした。ですが、コンストラクタを毎度作るなんて手間だなぁと思うのが人情 (?) ってもんです。ということで C# 11 からオブジェクト初期化子でも必須性を表現し、コンパイルエラーとすることが可能になりました。便利過ぎる。

.NET 6 以前でも利用したい要望がある

この required キーワードは通常は .NET 7 でないと利用できません。と言うと次のような声が聞こえてきそうです。僕には聞こえてきました。

えー!.NET 7 じゃないとダメなの?LTS な .NET 6 のままがいいんだけど required だけでも使いたいわー。
ウチの会社 .NET Framework 2.0 やねん...。C# のバージョンだけは最新だけど!

なります。ということでやっていきましょう。

reqruied キーワードの展開のされ方

C# コンパイラが requried キーワードをどのように解釈しているのかを確認していきます。例によって (?) ILSpy を使ってみましょう。すると、以下のように展開されることがわかります。

using System;
using System.Runtime.CompilerServices;

[RequiredMember]
public sealed class Person
{
    [RequiredMember]
    public string Name { get; init; }

    public int? Age { get; init; }

    [Obsolete("Constructors of types with required members are not supported in this version of your compiler.", true)]
    [CompilerFeatureRequired("RequiredMembers")]
    public Person()
    {
    }
}

このうち [RequiredMember] 属性と [CompilerFeatureRequired] 属性が .NET 7 からのみ追加された型で、.NET 6 以前だとこれが不足していることで利用できなくなっています。つまり、これらの属性を独自に定義してしまえばコンパイラを騙す () ことができそうです。

.NET 6 + C# 11 で required を利用する

Step.1 : Visual Studio のバージョンを確認

まず C# 11 が利用可能なコンパイラを搭載している Visual Studio にしましょう。17.4.0 以降であればよいでしょう。

Step.2 : TargetFramework / LangVersion を確認

以下のように .csproj を設定します。今回は TargetFramework = net6.0 としていますが、それよりも古くても大丈夫です。LangVersion11.0 としましょう。

<Project Sdk="Microsoft.NET.Sdk">

    <PropertyGroup>
        <OutputType>Exe</OutputType>
        <TargetFramework>net6.0</TargetFramework>
        <LangVersion>11.0</LangVersion>
        <Nullable>enable</Nullable>
    </PropertyGroup>

</Project>

Step.3 : RequiredMemberAttribute を追加

以下のように [RequiredMember] 属性を実装します。と言っても特段難しいことはなく、.NET Source Browser を眺めてサクッと持ってきます。

#if !NET7_0_OR_GREATER
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Runtime.CompilerServices;

/// <summary>
/// Specifies that a type has required members or that a member is required.
/// </summary>
[AttributeUsage(AttributeTargets.Class | AttributeTargets.Struct | AttributeTargets.Field | AttributeTargets.Property, AllowMultiple = false, Inherited = false)]
internal sealed class RequiredMemberAttribute : Attribute
{ }
#endif

.NET 7 未満の環境でのみ追加したいので #if !NET7_0_OR_GREATER で括っています。また利用したいアセンブリ内にのみ閉じればよい型なのでアクセシビリティを internal にしています。

Step.4 : CompilerFeatureRequiredAttribute を追加

ふたつ目の属性を追加しましょう。こちらも [RequiredMember] 属性と同様の考え方で実装します。.NET Source Browser で検索すれば簡単ですね。

#if !NET7_0_OR_GREATER
// Licensed to the .NET Foundation under one or more agreements.
// The .NET Foundation licenses this file to you under the MIT license.

namespace System.Runtime.CompilerServices;

/// <summary>
/// Indicates that compiler support for a particular feature is required for the location where this attribute is applied.
/// </summary>
[AttributeUsage(AttributeTargets.All, AllowMultiple = true, Inherited = false)]
internal sealed class CompilerFeatureRequiredAttribute : Attribute
{
    public CompilerFeatureRequiredAttribute(string featureName)
    {
        FeatureName = featureName;
    }

    /// <summary>
    /// The name of the compiler feature.
    /// </summary>
    public string FeatureName { get; }

    /// <summary>
    /// If true, the compiler can choose to allow access to the location where this attribute is applied if it does not understand <see cref="FeatureName"/>.
    /// </summary>
    public bool IsOptional { get; init; }

    /// <summary>
    /// The <see cref="FeatureName"/> used for the ref structs C# feature.
    /// </summary>
    public const string RefStructs = nameof(RefStructs);

    /// <summary>
    /// The <see cref="FeatureName"/> used for the required members C# feature.
    /// </summary>
    public const string RequiredMembers = nameof(RequiredMembers);
}
#endif

Build & Run

お疲れ様でした。ここまで準備が整えばビルドが通るようになったはずです。属性をふたつ追加するだけという簡単なお仕事なので、.NET 6 以前のフレームワークをもうしばらく利用する方は是非やってみてください。手元でやってみた範囲では .NET Framework 2.0 でも動作しました

また、特定のフレームワークから追加される型に C# コンパイラが依存することはこれまでもありました。たとえば init キーワードモジュール初期化子も同様なので、過去記事を参考にしてみてくださいませ。

Azure Functions (Isolated Worker) で AppInsights に出力したログが LogLevel.Warning 以上にフィルターされてしまう問題への対処

タイトルがだいぶ長いんですが、執筆時点でタイトル通りの問題が発生します。最近 .NET 7 がリリースされたので、業務コードを順次 C# 11 / .NET 7 へ移行を開始しています。そんな中 Azure Functions を In-Process Model から Isolated Model に乗せ換えるにあたって Application Insights のログ周りでハマったので、今回はその回避方法を紹介します。まずは発生状況を箇条書きで整理しましょう。

  1. Azure Functions (Isolated Worker) を利用したい
    • .NET 7 を利用する場合はこれ一択
  2. Application Insights にログを出力するに際して下記 NuGet パッケージを利用する *1
  3. LogLevel.Warning 未満のログを出力したい
    • LogLevel.Information とか高頻度で使いますよね?

つまり .NET 7 + Azure Functions の環境では発生しそうです。

問題を再現させてみる

Step.1 : NuGet Package を参照

先に示した通り Microsoft.Azure.Functions.Worker.ApplicationInsights のパッケージを参照します。執筆時点では 1.0.0-preview3 が最新です。

<ItemGroup>
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="1.10.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.ApplicationInsights" Version="1.0.0-preview3" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http" Version="3.0.13" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="1.7.0" />
</ItemGroup>

Step.2 : AppInsights SDK を DI に登録

公式の Isolated Process Model の導入ガイドにも書かれていますが、以下のような初期化コードを書きましょう。

new HostBuilder().ConfigureFunctionsWorkerDefaults(static (context, builder) =>
{
    builder.AddApplicationInsights();
    builder.AddApplicationInsightsLogger();
});

Step.3 : 関数を実装

検証目的ということで、今回は HttpTrigger が叩かれた際に全ての LogLevel を出力するコードを書いていきます。

public sealed class SampleFunction
{
    [Function(nameof(SampleFunction))]
    public async Task<HttpResponseData> EntryPoint(
        [HttpTrigger(AuthorizationLevel.Anonymous, "get")] HttpRequestData request,
        FunctionContext context)
    {
        var logger = context.GetLogger<SampleFunction>();
        var levels = new[]
        {
            // わかりやすいように Enum.GetValues<LogLevel>() と同じ配列を自前で作る
            LogLevel.Trace,        // 0
            LogLevel.Debug,        // 1
            LogLevel.Information,  // 2
            LogLevel.Warning,      // 3
            LogLevel.Error,        // 4
            LogLevel.Critical,     // 5
            LogLevel.None,         // 6
        };
        foreach (var x in levels)
        {
            await Task.Delay(10);  // ログの出力順が綺麗になるように少し delay を入れておく
            logger.Log(x, "LogLevel.{LogLevel}", x);
        }

        return request.CreateResponse(HttpStatusCode.OK);
    }
}

Step.4 : LogLevel の設定を環境変数に追加

最後に Azure Functions のインスタンスの環境変数として以下を設定します。Application Insights に出力する LogLevel を最低の Trace に設定しておきます。

{
    "Logging": {
        "ApplicationInsights": {
            "LogLevel": {
                "Default": "Trace"
            }
        }
    }
}

実行結果

ザックリとな再現手順の解説でしたが、ここまで終わりです。実際に Azure Functions に Zip Deploy をして API を叩いてみましょう。すると次のようなログが出力されます。

見事に LogLevel.Warning 以上ですね。困ったぞ、と。

原因

ではどこでこのフィルターが働いているのかというと、AppInsights の Core SDK の奥底にある既定のフィルタールールに依ります。

// The default behavior is to capture only logs above Warning level from all categories.
// This can achieved with this code level filter -> loggingBuilder.AddFilter<Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider>("",LogLevel.Warning);
// However, this will make it impossible to override this behavior from Configuration like below using appsettings.json:
// {
//     "Logging": {
//         "ApplicationInsights": {
//             "LogLevel": {
//                 "": "Error"
//             }
//         }
//     },
//     ...
// }
// The reason is as both rules will match the filter, the last one added wins.
// To ensure that the default filter is in the beginning of filter rules, so that user override from Configuration will always win,
// we add code filter rule to the 0th position as below.

loggingBuilder.Services.Configure<LoggerFilterOptions>(static options =>
{
    options.Rules.Insert(0, new LoggerFilterRule
    (
        "Microsoft.Extensions.Logging.ApplicationInsights.ApplicationInsightsLoggerProvider",
        null,
        LogLevel.Warning,
        null
    ));
});

これまでの In-Process Model な Azure Functions では環境変数等に書いた LogLevel がこのフィルタールールを override する形で設定されていました。これは ASP.NET Core + App Service でも同じ挙動ですし、AppInsights のコード上にもコメントで「ユーザー設定で override できるように 0 番目に挿入する」というメモまで残っています。ですが、Isolated Model 向けの AppInsights SDK を利用するとその意に反して override されない状態になっています。Isolated Model 向け SDK の初期化順序が良くないということなんだと思います。

回避方法

実運用において LogLevel.Information が出ないのは全く使い物にならないということで workaround していきましょう。初期化処理に以下のようなコードを付け加えます。

new HostBuilder().ConfigureFunctionsWorkerDefaults(static (context, builder) =>
{
    builder.AddApplicationInsights();
    builder.AddApplicationInsightsLogger();

    // 方法 1 : 構成情報で上書きする
    builder.Services.AddLogging(logging =>
    {
        var config = context.Configuration.GetSection("Logging");
        logging.AddConfiguration(config);
    });

    // 方法 2 : AppInsights 向け LoggerProvider の特別ルールを削除して既定に戻す
    builder.Services.Configure<LoggerFilterOptions>(static options =>
    {
        var rule = options.Rules.FirstOrDefault(static x => x.ProviderName == typeof(ApplicationInsightsLoggerProvider).FullName);
        if (rule is not null)
            options.Rules.Remove(rule);
    });
});

ふたつ方法を載せましたが、必要な方を選べばよいと思います。僕は念のため (?) どちらも書いておきました。この実装を加えたものをデプロイしてログの出力状況を確認してみましょう。結果は以下の通りです。これで使い物になりそうですね!

*1:Microsoft.Azure.Functions.Worker.ApplicationInsights の NuGet Package を利用しない場合は再現しない

C#/.NET で和暦の元年表記をするときの注意点

ちょっとした Tips というか「あれ?」となったので備忘録としてメモします。

ひと言まとめ

元年表記したければ y をくっ付けろ

日頃から数字と文字の間に半角スペースを入れる癖があって、何気なくいつも通り半角スペースを入れたら 令和 1 年 になって気付いたという。

サンプルコード

var culture = new CultureInfo("ja-jp");
culture.DateTimeFormat.Calendar = new JapaneseCalendar();
    
var date = new DateOnly(2019, 5, 1);
date.ToString("ggy年", culture);    // 令和元年
date.ToString("gg y年", culture);   // 令和 元年
date.ToString("ggy 年", culture);   // 令和1 年
date.ToString("gg y 年", culture);  // 令和 1 年

SqlBulkCopy + IDataReader を利用した IEnumerable<T> の高効率なバルク挿入

C# / .NET + SQL Server 環境において Bulk Insert をするのはなかなかお手間です。それもこれも SqlBulkCopy という専用クラスがなかなか曲者なためなのですが。ただただ IEnumerable<T> のようなコレクションを挿入するのにひと工夫というか、ひと手間必要なのが心理的にハードルが高い。初見殺し!

僕自身、過去にやっていたのは DataTable を利用した方法でした。以下の記事に良い例が載っているので詳細はそちらに譲りますが、DataTable のインスタンスを作るにあたって一度データのコピーが必要になる上に、要素数が多いと内部の動的配列の拡張が何度も発生して遅くなってしまいます。折角パフォーマンスよくデータ挿入ができる SqlBulkCopy なのに、その事前準備で遅くなってどうするんだ!ということでなんとかしたい。

IDataReader を独自実装

SqlBulkCopy はそのデータソースとして IDataReader をとることができます。これを実装しさえすれば任意のデータソースと繋ぎ込めるというわけですね。ということで、今回は IEnumerable<T> なコレクションを IDataReader 経由で投げ込むことを考えてみます。

IDataReader に求められる実装

IDataReader を実装しようとすると、とんでもなくたくさんのプロパティとメソッドの実装を要求されます。ブログに書くには長過ぎるので折り畳んでおきますが、正直これを見たら卒倒するか吐くかしますね、普通。

IDataReader が要求する実装

public class DummyDataReader : IDataReader
{
    #region IDataReader implementations
    /// <inheritdoc/>
    public object this[int i]
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public object this[string name]
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int Depth
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool IsClosed
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int RecordsAffected
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int FieldCount
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public void Close()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public void Dispose()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool GetBoolean(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public byte GetByte(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetBytes(int i, long fieldOffset, byte[]? buffer, int bufferoffset, int length)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public char GetChar(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetChars(int i, long fieldoffset, char[]? buffer, int bufferoffset, int length)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public IDataReader GetData(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetDataTypeName(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public DateTime GetDateTime(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public decimal GetDecimal(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public double GetDouble(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    [return: DynamicallyAccessedMembers(DynamicallyAccessedMemberTypes.PublicFields | DynamicallyAccessedMemberTypes.PublicProperties)]
    public Type GetFieldType(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public float GetFloat(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public Guid GetGuid(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public short GetInt16(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetInt32(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public long GetInt64(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetName(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetOrdinal(string name)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public DataTable? GetSchemaTable()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public string GetString(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public object GetValue(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public int GetValues(object[] values)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool IsDBNull(int i)
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool NextResult()
        => throw new NotImplementedException();


    /// <inheritdoc/>
    public bool Read()
        => throw new NotImplementedException();
    #endregion
}

しかし、実はこれ全部を実装する必要はありません。SqlBulkCopy の内部実装を追いかけたりしたところ、実際に必要となるのは以下の 3 つのメソッド / プロパティだけです。

  • int FieldCount
  • object GetValue(int i)
  • bool Read()

他は全て NotImplementedException のままでも大丈夫です。なんだよー、これなら全然行けるじゃん...!

public class SqlBulkCopyDataReader : IDataReader
{
    #region IDataReader implementations
    /// <inheritdoc/>
    public int FieldCount
        => 0;


    /// <inheritdoc/>
    public object GetValue(int i)
        => null;


    /// <inheritdoc/>
    public bool Read()
        => true;
    #endregion
}

ざっくりと実装してみる

ということで実装していきましょう。いろいろ端折ってますが、概ね以下のような感じになります。

public class SqlBulkCopyDataReader<T> : IDataReader
{
    private IEnumerator<T> DataEnumerator { get; }

    public SqlBulkCopyDataReader(IEnumerator<T> enumerator)
        => this.DataEnumerator = enumerator;

    public SqlBulkCopyDataReader(IEnumerable<T> data)
        : this(data.GetEnumerator())
    { }

    #region IDataReader implementations
    public int FieldCount
        => PropertyInfoCache<T>.Instances.Length;

    public void Dispose()
        => this.DataEnumerator.Dispose();

    public object GetValue(int i)
    {
        // 対象テーブルの列とプロパティの個数 / 並び順が一致している前提
        var prop = PropertyInfoCache<T>.Instances[i];
        var obj = this.DataEnumerator.Current;
        return prop.GetValue(obj)!;
    }

    public bool Read()
        => this.DataEnumerator.MoveNext();
    #endregion

    private static class PropertyInfoCache<U>
    {
        public static PropertyInfo[] Instances { get; }

        static PropertyInfoCache()
            => Instances = typeof(U).GetProperties(BindingFlags.Instance | BindingFlags.Public);
    }
}

IEnumerator<T>.Dispose() を呼び出すために Dispose() メソッドも追加になっていますが、それでもメチャ短い!

SqlBulkCopy に食わせてみる

下準備が整ったので実際に SqlBulkCopy と組み合わせて使ってみましょう。折角なので SqlConnection の拡張メソッドでも作ってみます。実際にはまだもうちょっと考慮すべき点がありそうですが、サンプルとしてはだいぶいい感じになっていると思います。

public static class SqlConnectionExtensions
{
    public static async ValueTask<int> BulkInsertAsync<T>(this SqlConnection connection, IEnumerable<T> data, SqlBulkCopyOptions options = default, int? timeout = null, CancellationToken cancellationToken = default)
    {
        using (var executor = new SqlBulkCopy(connection, options, null))
        {
            // テーブル名と型名が一致しているとする
            executor.DestinationTableName = typeof(T).Name;

            // タイムアウトを指定できるようにしておくと優しそう
            executor.BulkCopyTimeout = timeout ?? executor.BulkCopyTimeout;

            // データを流し込む
            using (var reader = new SqlBulkCopyDataReader<T>(data))
                await executor.WriteToServerAsync(reader, cancellationToken);

            // 影響した行数 (= 流し込んだ件数) を返すのが一般的
            return executor.RowsCopied;
        }
    }
}

まとめ

ずっと IDataReader の実装が嫌過ぎて、単純な食わず嫌いで DataTable に甘んじてましたが、気が向いて SqlBulkCopy の内部実装を追いかけてみたら実は全然大した実装が必要ないことを知ってぴえん。10 年くらい無駄なことをしてたと思うとだいぶ勿体ないことをしてました。反省。