Stream の並列化(parallelStream)と注意点 — スレッド安全性
並列 Stream は「ForkJoinPoolで要素を分割・並列処理・結合」する仕組みです。速くなることもありますが、使い方を誤ると逆効果や不正確な結果につながります。副作用を避け、スレッド安全な収集を選び、データ量や処理内容に合わせて使うのが基本です。
基本と特性
- 仕組み: コレクション等から parallelStream を生成すると、内部の ForkJoinPool が複数スレッドで部分処理を実行し、最終的に結果を結合します。
- 出力順: forEach は順序が不定。順序を保ちたい場合は forEachOrdered を選ぶ必要があります。
- 適用場面: データが十分に大きい、各要素処理が重い、分割・結合コストを上回る計算メリットがある場合に向いています。
- 誤用のリスク: 共有可変状態への書き込み、非スレッドセーフな Collector、軽すぎる処理や小規模データでの並列化は、性能低下や結果不正の原因になります。
正しい使い方の原則
- 副作用なし(共有可変の禁止): ループ中に共有リストへ add、共有カウンタを++などは競合の温床。map/filter/reduceの関数は純粋(入力→出力のみ)に保つ。
- 安全な終端操作: 集約は Collectors の提供するスレッドセーフ設計(並列対応)を使う。例えば toList/toSet/toMap は並列でも安全に結合されるよう設計されているが、独自の外部ミューテーションは避ける。
- 順序が必要なら forEachOrdered: 表示や書き出しで順序が重要なら forEachOrdered を使う。ただし順序維持はオーバーヘッドが増える可能性がある。
- 大きいデータ/重い計算に限定: 小さなリスト、超軽量処理(足し算だけ等)は、スレッド管理コストが勝ちやすくシリアルの方が速いことが多い。
- ベンチマークで確認: 並列化の効果は環境依存。測定して選ぶのが前提。
安全なコード例
1) 重い計算の並列化(副作用なし)
import java.util.*;
import java.util.stream.*;
public class HeavyParallel {
public static void main(String[] args) {
List<Integer> data = IntStream.range(0, 1_000_000).boxed().toList();
// 各要素で重い計算(例:疑似的なCPU負荷)を並列
long sum = data.parallelStream()
.mapToLong(HeavyParallel::heavyCompute)
.sum();
System.out.println(sum);
}
static long heavyCompute(int x) {
long r = 0;
for (int i = 0; i < 200; i++) r += (x * 31L + i) % 97;
return r;
}
}
Java- ポイント: 共有状態を書き換えない純粋関数で構成。終端は sum(並列セマンティクスに対応)。
2) 順序を保って出力
List<String> lines = List.of("A","B","C","D","E");
lines.parallelStream()
.map(s -> s + Thread.currentThread().getName())
.forEachOrdered(System.out::println); // 入力順で出力
Java- ポイント: forEachOrdered は順序保証。通常の forEach は順序が乱れることがある。
3) 並列対応の Collector(toMap の重複キー対策付き)
record User(String name, int age) {}
List<User> users = List.of(
new User("Tanaka", 30), new User("Sato", 25), new User("Kato", 25)
);
Map<Integer, String> namesByAge = users.parallelStream()
.collect(Collectors.toMap(
User::age,
User::name,
(a, b) -> a + "," + b // 重複キーの結合ルール
));
Java- ポイント: Collector は併合可能に設計されており、並列でも安全に結合される。重複キーはマージ関数必須。
よくある落とし穴と回避策
- 共有ミュータブル構造へ add/remove: 競合・欠損・例外の原因。回避は「collect を使う」「ローカルに集約→最終的に統合」。
- 軽い処理の並列化で遅くなる: スレッド分割・結合のオーバーヘッドが勝つ。重い計算や大きなデータに限定、測定して判断。
- 順序を期待して forEach を使用: 出力順は不定。必要なら forEachOrdered、またはソート済みデータで終端処理。
- 非スレッドセーフな外部収集: 独自 List へ外部同期なしで add する等は危険。Collector を使うか、ConcurrentLinkedQueue などの並行コレクションを選ぶ(ただし順序とコストに注意)。
- I/O の並列化の過信: ディスクやネットワークにボトルネックがあると逆効果。バッチ化・非同期 I/O・キューイングで最適化を検討。
テンプレート集
- 並列化の基本
list.parallelStream()
.map(this::expensive)
.filter(this::predicate)
.collect(Collectors.toList());
Java- 順序維持の出力
stream.parallel()
.forEachOrdered(System.out::println);
Java- 並列安全なマップ収集(順序保持したい場合は別途対策)
Map<K,V> m = stream.parallel()
.collect(Collectors.toMap(kf, vf, (a,b) -> merge(a,b)));
Java- グループ集計(並列対応)
Map<K, Long> counts = stream.parallel()
.collect(Collectors.groupingBy(kf, Collectors.counting()));
Java- 「外部ミューテーション禁止」の代替(ローカル集計→マージ)
int total = stream.parallel()
.mapToInt(this::score)
.sum();
Javaまとめ
- 並列 Stream は「大規模・重い処理」で、純粋関数+並列対応 Collector を使うと効果的。
- 共有可変状態への書き込みや順序依存の forEach は避け、必要なら forEachOrdered。
- 効果はケース依存。測定して導入可否を決めるのが現実的な最適解です。
