ねらいと前提
parallelStream は「並列化すれば常に速い」魔法ではありません。効果が出る条件と落とし穴を理解し、順次と並列を同じ環境・同じ入力で比較するのが基本です。並列ストリームは ForkJoinPool でソースを分割し、複数スレッドで部分結果を処理して結合します。作り方は collection.parallelStream()、または既存ストリームに対して parallel()/sequential() を切り替えます。分割しづらいソースや副作用のある処理では、逆に遅くなったり誤結果の原因になります。
並列化の適性と落とし穴の理解
効果が出やすい条件(CPUバウンドと分割性)
- CPUがボトルネックの純粋計算: 文字列正規化、スコア計算、統計などは並列化でスループットが上がりやすい。テキスト解析のような重い計算は parallelStream で大きく改善する例があります。
- 分割しやすいソース: 配列、ArrayList、IntStream.range のような均等分割できる入力は並列に向きます。
避けるべき条件(I/Oや副作用)
- I/O 主体の処理: ファイル/ネットワークの待ち時間が支配的だと並列化の恩恵が薄く、オーバーヘッドが勝ちやすい。
- 共有ミュータブル状態の更新: 外部リストへの add、非スレッドセーフなコレクタの使用は壊れます。Collector は並列適性のあるものを選ぶか、ConcurrentHashMap 系を使います。
- 順序依存や不適切な合成: 順序を期待した処理や結合律を満たさない reduce は並列で誤結果になります。
ベンチマーク設計の基本方針
同条件・同入力・同終端で比較する
- 同じデータと同じ終端操作: 例えば「件数集計」「合計」「トップN」など、順次 stream と parallelStream で完全に同じ処理を走らせます。
- ウォームアップの実施: JIT の最適化とGCの影響を減らすため、ベンチ前に一定回数の事前実行を行います。
- 分割性の確認: LinkedList や生成系ストリーム(iterate/generate)は分割に弱く、parallel の効果が出にくいと理解した上で比較します。
正しさの担保(結合可能な演算に限定)
- Collector/Reduce の選定: counting、summing、maxBy など結合的な集計を使い、必要なら groupingByConcurrent や toConcurrentMap を採用します。
- 順序要件の明示化: 並列で順序は保証されません。必要箇所のみ forEachOrdered を用います(コスト増を理解)。
最小の実用ベンチマーク例(スモークテスト)
CPUバウンドなマップ+集計の比較
import java.util.*;
import java.util.concurrent.ThreadLocalRandom;
import java.util.stream.*;
public class ParallelBench {
static List<String> makeData(int n) {
var rnd = ThreadLocalRandom.current();
return IntStream.range(0, n)
.mapToObj(i -> "str" + rnd.nextInt(1_000_000))
.toList();
}
static int heavyHash(String s) {
// 擬似的に重い計算
int h = 0;
for (int i = 0; i < s.length(); i++) h = h * 31 + s.charAt(i);
return Math.abs(h);
}
static void warmup(List<String> data) {
IntStream.range(0, 3).forEach(r -> {
data.stream().mapToInt(ParallelBench::heavyHash).sum();
data.parallelStream().mapToInt(ParallelBench::heavyHash).sum();
});
}
static long timeMs(Runnable r) {
long t1 = System.nanoTime();
r.run();
return (System.nanoTime() - t1) / 1_000_000;
}
public static void main(String[] args) {
var data = makeData(2_000_000);
warmup(data);
long seq = timeMs(() -> data.stream().mapToInt(ParallelBench::heavyHash).sum());
long par = timeMs(() -> data.parallelStream().mapToInt(ParallelBench::heavyHash).sum());
System.out.printf("sequential=%d ms, parallel=%d ms%n", seq, par);
}
}
Java- ポイント: 同一データ・同一終端で、CPUバウンド処理を比較。ウォームアップ後の時間差を観測します。分割しやすい ArrayList を使用し、並列の恩恵が出やすい条件を整えています。
グループ化の比較(並列適性のあるコレクタ)
record User(String city, int score) {}
static Map<String, Long> countByCitySeq(List<User> users) {
return users.stream().collect(Collectors.groupingBy(User::city, Collectors.counting()));
}
static Map<String, Long> countByCityPar(List<User> users) {
return users.parallelStream().collect(Collectors.groupingByConcurrent(User::city, Collectors.counting()));
}
Java- ポイント: 並列に強い groupingByConcurrent+counting を使用。結合的かつスレッドセーフな集約で誤りなく比較できます。
実運用に近い検証パターン
データ規模・分布・分割性のスイープ
- サイズスイープ: 1万/10万/100万要素で連続測定し、並列のスケールメリットが出る閾値を把握します。
- キー分布: グループ化でキーの偏りが大きいとホットバケットに集中し、競合で伸びが鈍ります。分布を変えて測るか、キーを細分化して緩和を検証します。
- ソース切替: ArrayList と LinkedList、配列、IntStream.range の差を比較し、分割性の影響を体感します。
正しさチェックと再現性
- 同値性検証: 順次結果と並列結果が同一か(合計・件数・ソート不要の値)を必ず比較します Oracle Blogs。
- 順序非依存で設計: 結果の順序に依存しない比較(Map 内容、合計値など)を採用します Qiita。
監視と副作用の隔離
- 副作用排除: 外部状態更新をベンチ対象から外し、純粋関数のみ計測します。
- リソース競合を避ける: 実運用では他スレッドと ForkJoinPool の競合が起きます。単体検証に加え、実環境での同時負荷下の計測も行いましょう。
例題で学ぶ「並列が効く/効かない」
効く例:重いテキスト解析+並列集計
巨大テキスト群に対して名詞抽出やハッシュ計算などの重い前処理がある場合、parallelStream と ConcurrentHashMap ベースの集計で大幅な性能改善が見込めます Zenn。このケースでは I/O を先に材質化するのではなく、並列適性のある集計(counting、summing)に流すのが安全です。
効かない例:行の I/O と副作用まみれの collect
Files.lines のような I/O 供給ストリームを、外部リストへの add や非スレッドセーフなコレクタで集約すると、オーバーヘッドや競合で遅くなり、誤結果の温床になります。並列では副作用を排し、必要なら groupingByConcurrent や toConcurrentMap を使います。
テンプレート(すぐ試せる雛形)
ウォームアップ+単純計測の骨格
static void warmup(Runnable r, int n) { for (int i = 0; i < n; i++) r.run(); }
static long nsec(Runnable r) { long t = System.nanoTime(); r.run(); return System.nanoTime() - t; }
var data = IntStream.range(0, 2_000_000).boxed().toList();
Runnable seq = () -> data.stream().mapToInt(i -> i * i).sum();
Runnable par = () -> data.parallelStream().mapToInt(i -> i * i).sum();
warmup(seq, 3); warmup(par, 3);
System.out.printf("seq=%.2f ms, par=%.2f ms%n", nsec(seq)/1e6, nsec(par)/1e6);
Java並列安全なグループ化
Map<K, Long> counts = stream.parallel()
.collect(Collectors.groupingByConcurrent(keyFn, Collectors.counting()));
Java正しさの自動検証
var seqOut = pipelineSequential(input);
var parOut = pipelineParallel(input);
assert seqOut.equals(parOut) : "parallel mismatch";
Javaまとめ
parallelStream の検証は「同条件・同入力・同終端」で、CPUバウンドかつ分割性の高いソースを選び、ウォームアップ後に順次と並列を厳密に比較します。I/O 主体や副作用のある処理は避け、結合可能な Collector(counting、summing、maxBy、groupingByConcurrent 等)で正しさとスループットを両立します。分割性・キー分布・順序への依存が結果を左右するため、まずスモークで傾向を掴み、実運用条件(同時負荷・競合)でも再計測して最終判断を下すのがプロのベンチ手筋です。
