はじめに:「AsParallel制御」は“並列の暴れ方に手綱をつける”技
前回の「並列 LINQ」は、「AsParallel() を付けると並列になるよ」という話でした。
でも、実務で本気で使うなら、
どれくらい並列に動いていいのか
順序は守るのか
途中で止められるようにするのか
CPU がヒマでも無理に並列化するのか
といった“細かい制御”が欲しくなります。
そこで出てくるのが、AsParallel のあとに続けて使う各種制御メソッドです。
ここでは、初心者向けに
WithDegreeOfParallelism(並列度の上限)WithExecutionMode(並列化を強制するかどうか)WithMergeOptions(結果のマージ方法)AsOrdered / AsUnordered(順序制御)
キャンセル制御(WithCancellation)
を、例題付きでかみ砕いて説明していきます。
並列度の制御:WithDegreeOfParallelism
「何スレッドまで使っていいか」を決める
AsParallel() を付けると、PLINQ は「環境に応じていい感じの並列度」を自動で決めてくれます。
でも、実務では「この処理は最大 4 スレッドまでにしてほしい」といった制約を付けたいことがあります。
そのときに使うのが WithDegreeOfParallelism です。
using System;
using System.Linq;
var numbers = Enumerable.Range(1, 100_000);
var result = numbers
.AsParallel()
.WithDegreeOfParallelism(4) // 最大 4 並列まで
.Select(x => HeavyCalc(x))
.ToArray();
int HeavyCalc(int x)
{
System.Threading.Thread.SpinWait(1_000);
return x * x;
}
C#ここでの重要ポイントは、「WithDegreeOfParallelism(n) は“最大で n 個のタスク(スレッド)まで使っていい”という上限を決める」ということです。
CPU コア数より大きくしても意味は薄いですし、DB や外部 API に負荷をかけすぎないために、あえて小さめにすることもあります。
なぜ制御したくなるのか(実務目線)
例えば、
同じプロセス内で他の重い処理も動いている
DB 接続数に上限がある
外部 API に「同時接続は 5 まで」などの制限がある
といった状況では、「CPU が空いているからといって、好き放題並列化されると困る」わけです。WithDegreeOfParallelism は、その“暴れ方”に手綱をつけるためのメソッドだと考えてください。
実行モードの制御:WithExecutionMode
「本当に並列化する価値があるか」をどう判断するか
PLINQ は賢くて、「このクエリは並列化してもあまり得しなさそうだな」と判断すると、内部的に順次実行に切り替えることがあります。
これは基本的にはありがたい挙動ですが、「いや、この処理は絶対に並列化してほしい」という場面もあります。
そこで使うのが WithExecutionMode です。
using System.Linq;
using System.Threading.Tasks;
var numbers = Enumerable.Range(1, 1_000_000);
var result = numbers
.AsParallel()
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.Select(x => HeavyCalc(x))
.ToArray();
C#ParallelExecutionMode.ForceParallelism を指定すると、「PLINQ の“自動で順次に戻す”判断を無視して、とにかく並列でやる」モードになります。
ここでの重要ポイントは、「WithExecutionMode は“PLINQ のおせっかいをどこまで許すか”を決めるスイッチ」だということです。
通常はデフォルトのままでよく、明確な理由があるときだけ ForceParallelism を使う、くらいのスタンスが安全です。
マージ方法の制御:WithMergeOptions
「結果をどうまとめるか」で速度とメモリが変わる
PLINQ は、各スレッドで処理した結果を「マージ(結合)」して、最終的なシーケンスとして返します。
このマージの仕方を制御するのが WithMergeOptions です。
代表的なオプションは次の 3 つです。
NotBufferedAutoBuffered(デフォルト)FullyBuffered
例として書いてみます。
using System;
using System.Linq;
using System.Threading;
var numbers = Enumerable.Range(1, 20);
var query = numbers
.AsParallel()
.WithMergeOptions(ParallelMergeOptions.NotBuffered)
.Select(x =>
{
Thread.Sleep(100); // 重い処理のつもり
return x;
});
foreach (var x in query)
{
Console.WriteLine(x);
}
C#NotBuffered は、「処理が終わったものから順次流していく」イメージです。FullyBuffered は、「全部終わってからまとめて返す」イメージです。
ここでの重要ポイントは、「マージ方法は“結果をどれくらい早く消費したいか/メモリをどう使いたいか”に関わる」ということです。
ただし、初心者のうちはデフォルト(AutoBuffered)のままで十分なことが多く、細かくいじるのは慣れてからで構いません。
順序の制御:AsOrdered / AsUnordered
並列 LINQ は基本「順序を保証しない」
AsParallel() を付けると、PLINQ は「順序を気にせずに速く終わること」を優先します。
そのため、結果の順番が元のシーケンスと違うことがあります。
var numbers = Enumerable.Range(1, 10);
var result = numbers
.AsParallel()
.Select(x => HeavyCalc(x))
.ToArray();
C#この result の順番は、元の 1~10 の順とは限りません。
順序が必要なら AsOrdered
順序を保ちたいときは、AsOrdered() を使います。
var result = numbers
.AsParallel()
.AsOrdered()
.Select(x => HeavyCalc(x))
.ToArray();
C#これで、「元の順序に従った結果」が得られます。
ただし、そのぶん内部処理が増えるので、速度は少し落ちます。
途中で「順序不要」に戻す AsUnordered
逆に、「最初は順序が必要だけど、この先は順序どうでもいい」という場面では、AsUnordered() で順序の制約を外せます。
var result = numbers
.AsParallel()
.AsOrdered()
.Select(x => HeavyCalc(x))
.AsUnordered() // ここから先は順序不要
.Where(x => x > 100)
.ToArray();
C#ここでの重要ポイントは、「順序を守るかどうかは“性能と読みやすさのトレードオフ”」だということです。
順序が本当に必要なところだけ AsOrdered を付け、それ以外は付けない、というメリハリが大事です。
キャンセル制御:WithCancellation と CancellationToken
「時間がかかりすぎたら途中でやめたい」を実現する
長時間かかる並列処理では、「ユーザーがキャンセルボタンを押したら止めたい」「タイムアウトしたら止めたい」という要件がよくあります。
PLINQ では、WithCancellation を使ってキャンセル制御を組み込めます。
using System;
using System.Linq;
using System.Threading;
var cts = new CancellationTokenSource();
var query = Enumerable.Range(1, 1_000_000)
.AsParallel()
.WithCancellation(cts.Token)
.Select(x =>
{
// 何か重い処理
Thread.SpinWait(1_000);
return x;
});
var task = Task.Run(() =>
{
try
{
foreach (var x in query)
{
// 途中で結果を使う
}
}
catch (OperationCanceledException)
{
Console.WriteLine("キャンセルされました。");
}
});
// 1 秒後にキャンセル
Thread.Sleep(1000);
cts.Cancel();
task.Wait();
C#ここでの重要ポイントは、「WithCancellation(token) を付けると、そのトークンがキャンセルされたときに OperationCanceledException が飛ぶ」ということです。
呼び出し側はそれをキャッチして、「キャンセルされた」という流れを作れます。
実務での「AsParallel制御」の組み合わせ例
例:外部 API を並列で叩くが、同時 4 接続までに抑える
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
IEnumerable<string> GetUrls()
{
return Enumerable.Range(1, 100)
.Select(i => $"https://example.com/api/{i}");
}
string CallApi(string url)
{
// 実際は HTTP クライアントで呼び出す
Thread.Sleep(200); // 疑似的な待ち時間
return $"Result for {url}";
}
var urls = GetUrls();
var results = urls
.AsParallel()
.WithDegreeOfParallelism(4) // 同時 4 接続まで
.Select(url => CallApi(url))
.ToList();
C#ここでは、「並列で速くしたいけど、外部サービスに負荷をかけすぎないように 4 並列までに抑える」という意図を、WithDegreeOfParallelism(4) で表現しています。
例:重い計算を並列化しつつ、順序を保ち、キャンセルもできるようにする
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
double HeavyCalc(int x)
{
double v = 0;
for (int i = 0; i < 1000; i++)
{
v += Math.Sqrt(x * i + 1);
}
return v;
}
var cts = new CancellationTokenSource();
var query = Enumerable.Range(1, 100_000)
.AsParallel()
.WithDegreeOfParallelism(Environment.ProcessorCount)
.WithExecutionMode(ParallelExecutionMode.ForceParallelism)
.WithCancellation(cts.Token)
.AsOrdered()
.Select(x => HeavyCalc(x));
var task = Task.Run(() =>
{
try
{
foreach (var r in query)
{
// 結果を順番に処理
}
}
catch (OperationCanceledException)
{
Console.WriteLine("キャンセルされました。");
}
});
// どこかで cts.Cancel() を呼べば止まる
C#ここでの重要ポイントは、「AsParallel のあとに、必要な制御を“チェーン”として積み重ねていく」という書き方です。WithDegreeOfParallelism、WithExecutionMode、WithCancellation、AsOrdered などを組み合わせることで、「どう並列化してほしいか」をかなり細かく指定できます。
まとめ:「AsParallel制御ユーティリティ」は“並列の暴れ方を設計するための道具」
AsParallel 自体は「並列にしていいよ」というスイッチですが、
実務で本当に使いこなすには、その“暴れ方”を制御するメソッド群が重要になります。
押さえておきたいポイントをもう一度整理すると、
WithDegreeOfParallelism(n) で「最大並列数」を決めるWithExecutionMode(ForceParallelism) で「とにかく並列化してほしい」と指示できるWithMergeOptions で「結果のマージ方法」を調整できる(ただし最初はデフォルトで十分)AsOrdered / AsUnordered で「順序を守るかどうか」を明示できるWithCancellation(token) で「途中キャンセル可能な並列処理」にできる
ここまで腹落ちしていれば、
「とりあえず AsParallel を付けてみる」段階から一歩進んで、
“どれくらい並列にさせるか/どこまで自由にさせるか”を、自分で設計できるようになります。
