ねらいと前提
並列ストリームで「キーごとに集計」するなら、Collectors.groupingByConcurrent が最短で安全です。並列対応の ConcurrentHashMap を内部に使い、部分結果を同時に蓄えながら正しく結合できます。重要なポイントは「並列でもスレッドセーフに集約する」「順序は期待しない」「下流コレクタ(counting/summing…)も並列適性のあるものを選ぶ」ことです。これさえ守れば、群ごとの件数、合計、最大値、複数段のグループ化まで、スループットを落とさずに書けます。
基本形と動作の要点
groupingBy と groupingByConcurrent の違い
groupingBy は通常の Map(デフォルトは HashMap)を使い、並列ストリームでは combiner が最後に部分 Map をマージします。groupingByConcurrent は ConcurrentHashMap を使い、並列ストリームで「その場で」各キーのバケットに加算・追加できるため、ロック競合を最小化して高いスループットが出ます。順次ストリームでも正しく動きますが、真価は parallel にあります。
最も単純な並列グループ化(件数)
record User(String city, int score) {}
List<User> users = // 大量データ
Map<String, Long> byCity = users.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
User::city,
java.util.stream.Collectors.counting()
));
// 例: {Tokyo=123456, Osaka=78901, ...}
Javaキー(city)ごとの件数を、並列で安全に数え上げます。counting は合計が単純に足し合わせられる(結合可能)ため、並列でも崩れません。
使い方を広げる(合計・最大・派生値)
合計・平均・最大などの典型集計
// キーごとに score 合計
Map<String, Integer> sumByCity = users.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
User::city,
java.util.stream.Collectors.summingInt(User::score)
));
// キーごとに最大 score のユーザ
Map<String, java.util.Optional<User>> maxByCity = users.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
User::city,
java.util.stream.Collectors.maxBy(java.util.Comparator.comparingInt(User::score))
));
// キーごとの平均(最後に除算)
record S(long sum, long n) {}
Map<String, S> sumCount = users.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
User::city,
java.util.stream.Collectors.reducing(new S(0,0),
u -> new S(u.score(), 1),
(a,b) -> new S(a.sum() + b.sum(), a.n() + b.n()))
));
// 平均 = sumCount.get(city).sum() / (double) sumCount.get(city).n()
Java平均は「途中で割らない」。sum と count を集めて最後に割るのが並列安全な定石です。
値を変換してからグループ化(mapping)
// city ごとに名前のリスト(並列に追加される)
Map<String, java.util.List<String>> namesByCity = users.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
User::city,
java.util.stream.Collectors.mapping(
u -> u.toString(), // 例: 表示名へ変換
java.util.stream.Collectors.toList()
)
));
Javamapping は「キー決定 → 値変換 → 下流へ追加」の定番。toList は並列時に内部でバケットごとに安全に増えます(Collector の combiner が適切に設計されているため)。
下流コレクタの設計を深掘り
なぜ counting/summing は安全なのか
counting/summing は可換・結合的です。並列で部分和が別々に計算されても、最後に加算すれば同じ結果になります。reduce/reducing で独自の集計を作る場合は「結合律+単位元」を意識してください。これが崩れると並列で結果が揺れます。
List/Set への収集で気をつけること
toList/toSet は「キーごとに別の容器を持ち、combiner で addAll 結合」する Collector になっており、並列でも破綻しません。ただし巨大データではメモリピークが容器サイズになります。必要なら limit・フィルタを上流へ移す、キーを圧縮する、件数だけを数えるなどでピークを抑えましょう。
toConcurrentMap と使い分け
「グループ化」ではなく「キーが一意で上書きポリシーを指定したい」なら、toConcurrentMap が適任です。groupingByConcurrent は 1 キーに複数要素が付く前提、toConcurrentMap は 1 キー 1 値の前提。どちらも並列で安全に使えます。
Map<String, Integer> latestScoreById = users.parallelStream()
.collect(java.util.stream.Collectors.toConcurrentMap(
u -> u.toString(), // 例: id
User::score,
(a, b) -> b // 競合したら後勝ち
));
Java並列時の正しさ・性能の勘所
副作用を混ぜない(純粋関数に徹する)
map/filter/key 選択の関数は外部状態を書き換えない純粋関数にします。forEach で外部リストへ add などの副作用をやると、並列で壊れます。集約は必ず Collector へ。
順序に依存しない設計にする
groupingByConcurrent は結果 Map の「挿入順」を保証しません。parallelStream ではそもそも順序が崩れます。順序依存のロジックは、集計後に別途ソート(entrySet().stream().sorted(…))で扱います。
ソースの分割性が性能を決める
ArrayList、配列、IntStream.range は分割性が高く、並列の恩恵が出やすいです。LinkedList、生成系(Stream.generate/iterate)や I/O 由来は分割が苦手で、並列化しても伸びません。まずは順次でベースラインを取り、parallel で実測して判断するのが安全です。
キーの粒度と偏り
キーの種類が少ない(例: 3 都市に 1,000 万件)と、ホットバケットに競合が集中してスループットが落ちます。可能ならキーを細分化する、前段でフィルタ・分岐する、あるいは順次+後段並列(キーごとの重い処理だけ並列)に切り替えると安定します。
実戦例題で体感する
例題 1:ログ行のレベル別件数(巨大ファイル)
record Log(String level, String msg) {}
java.util.stream.Stream<Log> logs = // 解析済みの巨大ストリーム(並列で重い解析済み想定)
Map<String, Long> countByLevel = logs.parallel()
.collect(java.util.stream.Collectors.groupingByConcurrent(
Log::level,
java.util.stream.Collectors.counting()
));
// 例: {ERROR=34567, WARN=120345, INFO=987654}
Javaレベルごとに件数を並列で数えます。解析が重いなら parallel による CPU 利用が効き、counting は並列で安全です。
例題 2:商品カテゴリ×月の売上合計(多段グループ)
record Sale(String category, java.time.YearMonth ym, int amount) {}
Map<String, Map<java.time.YearMonth, Integer>> sumByCatMonth =
sales.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
Sale::category,
java.util.stream.Collectors.groupingByConcurrent(
Sale::ym,
java.util.stream.Collectors.summingInt(Sale::amount)
)
));
// 取り出し例: sumByCatMonth.get("Books").get(YearMonth.of(2025, 12))
Java二段の groupingByConcurrent で「カテゴリ×月」へ同時に加算します。どちらの階層でも結合可能な下流コレクタ(summingInt)を使っているため並列でも正しく結合されます。
例題 3:上位 N キーの抽出(集計後にソート)
Map<String, Long> freq = items.parallelStream()
.collect(java.util.stream.Collectors.groupingByConcurrent(
x -> x, java.util.stream.Collectors.counting()
));
List<Map.Entry<String, Long>> top10 = freq.entrySet().stream()
.sorted(java.util.Map.Entry.<String, Long>comparingByValue().reversed())
.limit(10)
.toList();
Javaグループ化自体は並列、順位付けは集計後に順次でソートします。順序は grouping 中に扱わないのがコツです。
テンプレート(すぐ使える雛形)
並列件数集計
Map<K, Long> counts = stream.parallel()
.collect(java.util.stream.Collectors.groupingByConcurrent(
keyFn,
java.util.stream.Collectors.counting()
));
Java並列合計(int)
Map<K, Integer> sums = stream.parallel()
.collect(java.util.stream.Collectors.groupingByConcurrent(
keyFn,
java.util.stream.Collectors.summingInt(valFn)
));
Java値を加工してからリスト化
Map<K, java.util.List<R>> lists = stream.parallel()
.collect(java.util.stream.Collectors.groupingByConcurrent(
keyFn,
java.util.stream.Collectors.mapping(mapFn, java.util.stream.Collectors.toList())
));
Java多段グループ化(外→内)
Map<K1, Map<K2, Long>> nested = stream.parallel()
.collect(java.util.stream.Collectors.groupingByConcurrent(
key1,
java.util.stream.Collectors.groupingByConcurrent(
key2,
java.util.stream.Collectors.counting()
)
));
Javaまとめ
groupingByConcurrent は「並列でグループ化・集計」するための第一選択です。結合可能な下流コレクタ(counting/summing/maxBy/reducing)を選び、関数は純粋に、順序へは依存しない。ソースの分割性とキー分布でスループットが決まるため、まず順次でベースラインを取り、parallel で実測して判断する。これらを徹底すれば、巨大データでもスレッドセーフに、正しく、速く集計できます。
