並列ストリームと ForkJoinPool のチューニング — スループット向上
並列ストリームは「書き換え少なくCPUを使い切る」ための強力な選択肢。ただし闇雲な並列化は遅くなったり結果が壊れます。安全に速くするコツは「副作用ゼロ」「正しい Collector」「適切なタスク粒度」「プールのサイズ管理」。実務で効く具体策をコード中心にまとめます。
並列ストリームの基本整理
- 共通プール: デフォルトでは ForkJoinPool の「共通プール」を使う(並列度は一般に CPU コア数程度)。明示的に変えたい場合は自前の ForkJoinPool を使って並列ストリームを実行する。
- 向いている処理: CPUバウンドで重い純粋関数(例:数値計算・変換・集約)。I/O 待ち主体なら CompletableFuture や非同期 I/O を検討。
- 副作用禁止: 外部リストへの add、外部カウンタの更新などは競合・欠落の原因。reduce/collect で安全に集約する。
速度を引き出す基本パターン
1) 純粋関数+組み込み集約(sum/reduce/collect)
int sumSquares = IntStream.rangeClosed(1, 5_000_000)
.parallel()
.map(n -> n * n) // 副作用なし
.sum(); // 組み込み集約(安全・高速)
Java- ポイント: 組み込みの sum/average/max/min は並列集約最適化済み。
2) コレクション収集は Collector を使う
List<String> evens = IntStream.rangeClosed(1, 1_000_000)
.parallel()
.filter(n -> n % 2 == 0)
.mapToObj(String::valueOf)
.collect(Collectors.toList()); // 外部リストへの forEach(add) はNG
Java- ポイント: Collector 内部で安全に部分結果を結合する。
3) 順序が必要なら forEachOrdered
IntStream.rangeClosed(1, 100)
.parallel()
.map(n -> n * 2)
.forEachOrdered(System.out::println); // 出力順を保証
Java- ポイント: 並列でも順序化できるが、オーバーヘッドは増える。必要な場面のみ。
ForkJoinPool を明示指定してチューニング
4) 自前プールで並列度を調整
ForkJoinPool pool = new ForkJoinPool(8); // 並列度(論理コア数・計測で調整)
int result = pool.submit(() ->
IntStream.rangeClosed(1, 10_000_000)
.parallel()
.map(n -> n % 3 == 0 ? n : 0)
.sum()
).join();
pool.shutdown();
Java- ポイント: 共通プールと競合させたくない処理を隔離。ロードに応じてプールを使い分ける。
5) 共有マップは concurrent Collector を使う
Map<String, Long> counts = Stream.of("a","b","a","c","b","a")
.parallel()
.collect(Collectors.groupingByConcurrent(s -> s, Collectors.counting()));
Java- ポイント: groupingByConcurrent/toConcurrentMap は並列想定の Collector。競合を避けられる。
タスク粒度の最適化(オーバーヘッド削減)
6) 事前にチャンクへ分割してから並列処理
// 大きなリストを固定サイズチャンクへ分割
static <T> List<List<T>> chunk(List<T> list, int size) {
List<List<T>> out = new ArrayList<>();
for (int i = 0; i < list.size(); i += size) {
out.add(list.subList(i, Math.min(i + size, list.size())));
}
return out;
}
List<String> data = /* 大量データ */;
int chunkSize = 10_000;
int total = chunk(data, chunkSize).parallelStream()
.mapToInt(batch ->
batch.stream() // チャンク内は直列
.mapToInt(s -> s.length())
.sum()
)
.sum();
Java- ポイント: 細かすぎるタスクは分割オーバーヘッドで逆効果。適切な粒度へまとめる。
7) Spliterator の characteristics を活用
- サイズ既知・分割容易なソース(配列・ArrayList・range)は並列分割が得意。リンクリストや中間で flatMap 連発は分割効率が悪くなるため計測必須。
実例シナリオでの最適化
8) テキスト解析(CPUバウンド)での並列カウント
List<Path> files = /* 多数のテキスト */;
ForkJoinPool pool = new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Map<String, Long> df = pool.submit(() ->
files.parallelStream()
.flatMap(p -> {
try {
return Files.lines(p).flatMap(line -> Arrays.stream(line.split("\\s+")));
} catch (IOException e) {
return Stream.empty(); // 失敗はスキップ(要ログ)
}
})
.filter(w -> w.length() >= 3)
.map(String::toLowerCase)
.collect(Collectors.groupingByConcurrent(s -> s, Collectors.counting()))
).join();
pool.shutdown();
Java- ポイント: 文字列処理はCPUバウンド。並列+concurrent Collector が効く。I/O は lines 内で逐次読み込み(無闇な並列 I/O は逆効果になりやすい)。
9) 数値統計(プリミティブストリームでオートボクシング回避)
DoubleSummaryStatistics stats = DoubleStream.iterate(1.0, x -> x <= 10_000_000, x -> x + 1.0)
.parallel()
.map(Math::sqrt)
.summaryStatistics(); // count/sum/min/max/avg を一発
Java- ポイント: プリミティブ専用の並列集約は高速。ボクシングを避ける。
テンプレート集
- 並列オン+集約
var r = list.parallelStream().map(this::transform).reduce(identity, this::combine);
Java- 順序保証出力
stream.parallel().forEachOrdered(System.out::println);
Java- 並列度指定で実行
var pool = new ForkJoinPool(n);
var r = pool.submit(() -> stream.parallel().map(...).collect(...)).join();
pool.shutdown();
Java- Concurrent 集約
var m = stream.parallel().collect(Collectors.groupingByConcurrent(keyFn, Collectors.counting()));
Java- チャンク分割→並列
chunk(bigList, size).parallelStream().map(this::processChunk).reduce(0, Integer::sum);
Javaよくある落とし穴と回避策
- 外部状態の更新(副作用): 外部リストへの add、カウンタ加算はNG。collect/reduce を使う。必要なら concurrent Collector。
- 過度な並列化: 軽い処理や小規模データは直列の方が速い。必ず計測して判断。
- 順序乱れ: ログや出力順が必要なら forEachOrdered。ソートや distinct は最後に寄せる。
- I/O 待ちでの並列: スレッドを塞ぐとプールが枯渇。非同期 I/O や別プール(Executor)で分離。並列ストリームはCPUバウンド優先。
- 分割しづらいパイプライン: heavy な flatMap 連発は分割効率低下。事前の整形・チャンク化で改善。
- 例外処理の欠落: ストリーム内 I/O は unchecked 化や Optional/Result で安全に流し、最後にサマリ出力。
まとめ
- 並列ストリームを速く安全に使う鍵は「副作用ゼロ」「並列想定 Collector」「適切な粒度」「プール管理」。
- ForkJoinPool を明示指定して並列度をコントロールしつつ、CPUバウンドな純粋処理に絞ればスループットは伸びる。
- 小さく試して計測し、処理特性(CPU/I/O、粒度、分割容易性)に合わせて戦略を選ぶのが最短ルート。
👉 練習課題: 10万件の文字列リストをチャンクサイズ1,000・2,000・5,000で分割し、それぞれ parallelStream+groupingByConcurrent で長さ別件数を集計。ForkJoinPool の並列度を2,4,8に変えて実行時間を比較し、最適な粒度と並列度を見つけてください。
