C# Tips | コレクション・LINQ:AsParallel制御

C# C#
スポンサーリンク

はじめに:「AsParallel制御」は“並列の暴れ方に手綱をつける”技

前回の「並列 LINQ」は、「AsParallel() を付けると並列になるよ」という話でした。
でも、実務で本気で使うなら、

どれくらい並列に動いていいのか
順序は守るのか
途中で止められるようにするのか
CPU がヒマでも無理に並列化するのか

といった“細かい制御”が欲しくなります。

そこで出てくるのが、AsParallel のあとに続けて使う各種制御メソッドです。
ここでは、初心者向けに

WithDegreeOfParallelism(並列度の上限)
WithExecutionMode(並列化を強制するかどうか)
WithMergeOptions(結果のマージ方法)
AsOrdered / AsUnordered(順序制御)
キャンセル制御(WithCancellation

を、例題付きでかみ砕いて説明していきます。


並列度の制御:WithDegreeOfParallelism

「何スレッドまで使っていいか」を決める

AsParallel() を付けると、PLINQ は「環境に応じていい感じの並列度」を自動で決めてくれます。
でも、実務では「この処理は最大 4 スレッドまでにしてほしい」といった制約を付けたいことがあります。

そのときに使うのが WithDegreeOfParallelism です。

using System;
using System.Linq;

var numbers = Enumerable.Range(1, 100_000);

var result = numbers
    .AsParallel()
    .WithDegreeOfParallelism(4)   // 最大 4 並列まで
    .Select(x => HeavyCalc(x))
    .ToArray();

int HeavyCalc(int x)
{
    System.Threading.Thread.SpinWait(1_000);
    return x * x;
}
C#

ここでの重要ポイントは、「WithDegreeOfParallelism(n) は“最大で n 個のタスク(スレッド)まで使っていい”という上限を決める」ということです。
CPU コア数より大きくしても意味は薄いですし、DB や外部 API に負荷をかけすぎないために、あえて小さめにすることもあります。

なぜ制御したくなるのか(実務目線)

例えば、

同じプロセス内で他の重い処理も動いている
DB 接続数に上限がある
外部 API に「同時接続は 5 まで」などの制限がある

といった状況では、「CPU が空いているからといって、好き放題並列化されると困る」わけです。
WithDegreeOfParallelism は、その“暴れ方”に手綱をつけるためのメソッドだと考えてください。


実行モードの制御:WithExecutionMode

「本当に並列化する価値があるか」をどう判断するか

PLINQ は賢くて、「このクエリは並列化してもあまり得しなさそうだな」と判断すると、内部的に順次実行に切り替えることがあります。
これは基本的にはありがたい挙動ですが、「いや、この処理は絶対に並列化してほしい」という場面もあります。

そこで使うのが WithExecutionMode です。

using System.Linq;
using System.Threading.Tasks;

var numbers = Enumerable.Range(1, 1_000_000);

var result = numbers
    .AsParallel()
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .Select(x => HeavyCalc(x))
    .ToArray();
C#

ParallelExecutionMode.ForceParallelism を指定すると、「PLINQ の“自動で順次に戻す”判断を無視して、とにかく並列でやる」モードになります。

ここでの重要ポイントは、「WithExecutionMode は“PLINQ のおせっかいをどこまで許すか”を決めるスイッチ」だということです。
通常はデフォルトのままでよく、明確な理由があるときだけ ForceParallelism を使う、くらいのスタンスが安全です。


マージ方法の制御:WithMergeOptions

「結果をどうまとめるか」で速度とメモリが変わる

PLINQ は、各スレッドで処理した結果を「マージ(結合)」して、最終的なシーケンスとして返します。
このマージの仕方を制御するのが WithMergeOptions です。

代表的なオプションは次の 3 つです。

NotBuffered
AutoBuffered(デフォルト)
FullyBuffered

例として書いてみます。

using System;
using System.Linq;
using System.Threading;

var numbers = Enumerable.Range(1, 20);

var query = numbers
    .AsParallel()
    .WithMergeOptions(ParallelMergeOptions.NotBuffered)
    .Select(x =>
    {
        Thread.Sleep(100); // 重い処理のつもり
        return x;
    });

foreach (var x in query)
{
    Console.WriteLine(x);
}
C#

NotBuffered は、「処理が終わったものから順次流していく」イメージです。
FullyBuffered は、「全部終わってからまとめて返す」イメージです。

ここでの重要ポイントは、「マージ方法は“結果をどれくらい早く消費したいか/メモリをどう使いたいか”に関わる」ということです。
ただし、初心者のうちはデフォルト(AutoBuffered)のままで十分なことが多く、細かくいじるのは慣れてからで構いません。


順序の制御:AsOrdered / AsUnordered

並列 LINQ は基本「順序を保証しない」

AsParallel() を付けると、PLINQ は「順序を気にせずに速く終わること」を優先します。
そのため、結果の順番が元のシーケンスと違うことがあります。

var numbers = Enumerable.Range(1, 10);

var result = numbers
    .AsParallel()
    .Select(x => HeavyCalc(x))
    .ToArray();
C#

この result の順番は、元の 1~10 の順とは限りません。

順序が必要なら AsOrdered

順序を保ちたいときは、AsOrdered() を使います。

var result = numbers
    .AsParallel()
    .AsOrdered()
    .Select(x => HeavyCalc(x))
    .ToArray();
C#

これで、「元の順序に従った結果」が得られます。
ただし、そのぶん内部処理が増えるので、速度は少し落ちます。

途中で「順序不要」に戻す AsUnordered

逆に、「最初は順序が必要だけど、この先は順序どうでもいい」という場面では、AsUnordered() で順序の制約を外せます。

var result = numbers
    .AsParallel()
    .AsOrdered()
    .Select(x => HeavyCalc(x))
    .AsUnordered()          // ここから先は順序不要
    .Where(x => x > 100)
    .ToArray();
C#

ここでの重要ポイントは、「順序を守るかどうかは“性能と読みやすさのトレードオフ”」だということです。
順序が本当に必要なところだけ AsOrdered を付け、それ以外は付けない、というメリハリが大事です。


キャンセル制御:WithCancellation と CancellationToken

「時間がかかりすぎたら途中でやめたい」を実現する

長時間かかる並列処理では、「ユーザーがキャンセルボタンを押したら止めたい」「タイムアウトしたら止めたい」という要件がよくあります。
PLINQ では、WithCancellation を使ってキャンセル制御を組み込めます。

using System;
using System.Linq;
using System.Threading;

var cts = new CancellationTokenSource();

var query = Enumerable.Range(1, 1_000_000)
    .AsParallel()
    .WithCancellation(cts.Token)
    .Select(x =>
    {
        // 何か重い処理
        Thread.SpinWait(1_000);
        return x;
    });

var task = Task.Run(() =>
{
    try
    {
        foreach (var x in query)
        {
            // 途中で結果を使う
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("キャンセルされました。");
    }
});

// 1 秒後にキャンセル
Thread.Sleep(1000);
cts.Cancel();

task.Wait();
C#

ここでの重要ポイントは、「WithCancellation(token) を付けると、そのトークンがキャンセルされたときに OperationCanceledException が飛ぶ」ということです。
呼び出し側はそれをキャッチして、「キャンセルされた」という流れを作れます。


実務での「AsParallel制御」の組み合わせ例

例:外部 API を並列で叩くが、同時 4 接続までに抑える

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;

IEnumerable<string> GetUrls()
{
    return Enumerable.Range(1, 100)
        .Select(i => $"https://example.com/api/{i}");
}

string CallApi(string url)
{
    // 実際は HTTP クライアントで呼び出す
    Thread.Sleep(200); // 疑似的な待ち時間
    return $"Result for {url}";
}

var urls = GetUrls();

var results = urls
    .AsParallel()
    .WithDegreeOfParallelism(4)  // 同時 4 接続まで
    .Select(url => CallApi(url))
    .ToList();
C#

ここでは、「並列で速くしたいけど、外部サービスに負荷をかけすぎないように 4 並列までに抑える」という意図を、WithDegreeOfParallelism(4) で表現しています。

例:重い計算を並列化しつつ、順序を保ち、キャンセルもできるようにする

using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

double HeavyCalc(int x)
{
    double v = 0;
    for (int i = 0; i < 1000; i++)
    {
        v += Math.Sqrt(x * i + 1);
    }
    return v;
}

var cts = new CancellationTokenSource();

var query = Enumerable.Range(1, 100_000)
    .AsParallel()
    .WithDegreeOfParallelism(Environment.ProcessorCount)
    .WithExecutionMode(ParallelExecutionMode.ForceParallelism)
    .WithCancellation(cts.Token)
    .AsOrdered()
    .Select(x => HeavyCalc(x));

var task = Task.Run(() =>
{
    try
    {
        foreach (var r in query)
        {
            // 結果を順番に処理
        }
    }
    catch (OperationCanceledException)
    {
        Console.WriteLine("キャンセルされました。");
    }
});

// どこかで cts.Cancel() を呼べば止まる
C#

ここでの重要ポイントは、「AsParallel のあとに、必要な制御を“チェーン”として積み重ねていく」という書き方です。
WithDegreeOfParallelismWithExecutionModeWithCancellationAsOrdered などを組み合わせることで、「どう並列化してほしいか」をかなり細かく指定できます。


まとめ:「AsParallel制御ユーティリティ」は“並列の暴れ方を設計するための道具」

AsParallel 自体は「並列にしていいよ」というスイッチですが、
実務で本当に使いこなすには、その“暴れ方”を制御するメソッド群が重要になります。

押さえておきたいポイントをもう一度整理すると、

WithDegreeOfParallelism(n) で「最大並列数」を決める
WithExecutionMode(ForceParallelism) で「とにかく並列化してほしい」と指示できる
WithMergeOptions で「結果のマージ方法」を調整できる(ただし最初はデフォルトで十分)
AsOrdered / AsUnordered で「順序を守るかどうか」を明示できる
WithCancellation(token) で「途中キャンセル可能な並列処理」にできる

ここまで腹落ちしていれば、
「とりあえず AsParallel を付けてみる」段階から一歩進んで、
“どれくらい並列にさせるか/どこまで自由にさせるか”を、自分で設計できるようになります。

タイトルとURLをコピーしました