Java 逆引き集 | ストリームでの並列グループ化(groupingByConcurrent) — 並列集計

Java Java
スポンサーリンク

ねらいと前提

並列ストリームで「キーごとに集計」するなら、Collectors.groupingByConcurrent が最短で安全です。並列対応の ConcurrentHashMap を内部に使い、部分結果を同時に蓄えながら正しく結合できます。重要なポイントは「並列でもスレッドセーフに集約する」「順序は期待しない」「下流コレクタ(counting/summing…)も並列適性のあるものを選ぶ」ことです。これさえ守れば、群ごとの件数、合計、最大値、複数段のグループ化まで、スループットを落とさずに書けます。

基本形と動作の要点

groupingBy と groupingByConcurrent の違い

groupingBy は通常の Map(デフォルトは HashMap)を使い、並列ストリームでは combiner が最後に部分 Map をマージします。groupingByConcurrent は ConcurrentHashMap を使い、並列ストリームで「その場で」各キーのバケットに加算・追加できるため、ロック競合を最小化して高いスループットが出ます。順次ストリームでも正しく動きますが、真価は parallel にあります。

最も単純な並列グループ化(件数)

record User(String city, int score) {}

List<User> users = // 大量データ

Map<String, Long> byCity = users.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        User::city,
        java.util.stream.Collectors.counting()
    ));

// 例: {Tokyo=123456, Osaka=78901, ...}
Java

キー(city)ごとの件数を、並列で安全に数え上げます。counting は合計が単純に足し合わせられる(結合可能)ため、並列でも崩れません。

使い方を広げる(合計・最大・派生値)

合計・平均・最大などの典型集計

// キーごとに score 合計
Map<String, Integer> sumByCity = users.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        User::city,
        java.util.stream.Collectors.summingInt(User::score)
    ));

// キーごとに最大 score のユーザ
Map<String, java.util.Optional<User>> maxByCity = users.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        User::city,
        java.util.stream.Collectors.maxBy(java.util.Comparator.comparingInt(User::score))
    ));

// キーごとの平均(最後に除算)
record S(long sum, long n) {}
Map<String, S> sumCount = users.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        User::city,
        java.util.stream.Collectors.reducing(new S(0,0),
            u -> new S(u.score(), 1),
            (a,b) -> new S(a.sum() + b.sum(), a.n() + b.n()))
    ));
// 平均 = sumCount.get(city).sum() / (double) sumCount.get(city).n()
Java

平均は「途中で割らない」。sum と count を集めて最後に割るのが並列安全な定石です。

値を変換してからグループ化(mapping)

// city ごとに名前のリスト(並列に追加される)
Map<String, java.util.List<String>> namesByCity = users.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        User::city,
        java.util.stream.Collectors.mapping(
            u -> u.toString(), // 例: 表示名へ変換
            java.util.stream.Collectors.toList()
        )
    ));
Java

mapping は「キー決定 → 値変換 → 下流へ追加」の定番。toList は並列時に内部でバケットごとに安全に増えます(Collector の combiner が適切に設計されているため)。

下流コレクタの設計を深掘り

なぜ counting/summing は安全なのか

counting/summing は可換・結合的です。並列で部分和が別々に計算されても、最後に加算すれば同じ結果になります。reduce/reducing で独自の集計を作る場合は「結合律+単位元」を意識してください。これが崩れると並列で結果が揺れます。

List/Set への収集で気をつけること

toList/toSet は「キーごとに別の容器を持ち、combiner で addAll 結合」する Collector になっており、並列でも破綻しません。ただし巨大データではメモリピークが容器サイズになります。必要なら limit・フィルタを上流へ移す、キーを圧縮する、件数だけを数えるなどでピークを抑えましょう。

toConcurrentMap と使い分け

「グループ化」ではなく「キーが一意で上書きポリシーを指定したい」なら、toConcurrentMap が適任です。groupingByConcurrent は 1 キーに複数要素が付く前提、toConcurrentMap は 1 キー 1 値の前提。どちらも並列で安全に使えます。

Map<String, Integer> latestScoreById = users.parallelStream()
    .collect(java.util.stream.Collectors.toConcurrentMap(
        u -> u.toString(),         // 例: id
        User::score,
        (a, b) -> b // 競合したら後勝ち
    ));
Java

並列時の正しさ・性能の勘所

副作用を混ぜない(純粋関数に徹する)

map/filter/key 選択の関数は外部状態を書き換えない純粋関数にします。forEach で外部リストへ add などの副作用をやると、並列で壊れます。集約は必ず Collector へ。

順序に依存しない設計にする

groupingByConcurrent は結果 Map の「挿入順」を保証しません。parallelStream ではそもそも順序が崩れます。順序依存のロジックは、集計後に別途ソート(entrySet().stream().sorted(…))で扱います。

ソースの分割性が性能を決める

ArrayList、配列、IntStream.range は分割性が高く、並列の恩恵が出やすいです。LinkedList、生成系(Stream.generate/iterate)や I/O 由来は分割が苦手で、並列化しても伸びません。まずは順次でベースラインを取り、parallel で実測して判断するのが安全です。

キーの粒度と偏り

キーの種類が少ない(例: 3 都市に 1,000 万件)と、ホットバケットに競合が集中してスループットが落ちます。可能ならキーを細分化する、前段でフィルタ・分岐する、あるいは順次+後段並列(キーごとの重い処理だけ並列)に切り替えると安定します。

実戦例題で体感する

例題 1:ログ行のレベル別件数(巨大ファイル)

record Log(String level, String msg) {}

java.util.stream.Stream<Log> logs = // 解析済みの巨大ストリーム(並列で重い解析済み想定)

Map<String, Long> countByLevel = logs.parallel()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        Log::level,
        java.util.stream.Collectors.counting()
    ));

// 例: {ERROR=34567, WARN=120345, INFO=987654}
Java

レベルごとに件数を並列で数えます。解析が重いなら parallel による CPU 利用が効き、counting は並列で安全です。

例題 2:商品カテゴリ×月の売上合計(多段グループ)

record Sale(String category, java.time.YearMonth ym, int amount) {}

Map<String, Map<java.time.YearMonth, Integer>> sumByCatMonth =
    sales.parallelStream()
        .collect(java.util.stream.Collectors.groupingByConcurrent(
            Sale::category,
            java.util.stream.Collectors.groupingByConcurrent(
                Sale::ym,
                java.util.stream.Collectors.summingInt(Sale::amount)
            )
        ));

// 取り出し例: sumByCatMonth.get("Books").get(YearMonth.of(2025, 12))
Java

二段の groupingByConcurrent で「カテゴリ×月」へ同時に加算します。どちらの階層でも結合可能な下流コレクタ(summingInt)を使っているため並列でも正しく結合されます。

例題 3:上位 N キーの抽出(集計後にソート)

Map<String, Long> freq = items.parallelStream()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        x -> x, java.util.stream.Collectors.counting()
    ));

List<Map.Entry<String, Long>> top10 = freq.entrySet().stream()
    .sorted(java.util.Map.Entry.<String, Long>comparingByValue().reversed())
    .limit(10)
    .toList();
Java

グループ化自体は並列、順位付けは集計後に順次でソートします。順序は grouping 中に扱わないのがコツです。

テンプレート(すぐ使える雛形)

並列件数集計

Map<K, Long> counts = stream.parallel()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        keyFn,
        java.util.stream.Collectors.counting()
    ));
Java

並列合計(int)

Map<K, Integer> sums = stream.parallel()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        keyFn,
        java.util.stream.Collectors.summingInt(valFn)
    ));
Java

値を加工してからリスト化

Map<K, java.util.List<R>> lists = stream.parallel()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        keyFn,
        java.util.stream.Collectors.mapping(mapFn, java.util.stream.Collectors.toList())
    ));
Java

多段グループ化(外→内)

Map<K1, Map<K2, Long>> nested = stream.parallel()
    .collect(java.util.stream.Collectors.groupingByConcurrent(
        key1,
        java.util.stream.Collectors.groupingByConcurrent(
            key2,
            java.util.stream.Collectors.counting()
        )
    ));
Java

まとめ

groupingByConcurrent は「並列でグループ化・集計」するための第一選択です。結合可能な下流コレクタ(counting/summing/maxBy/reducing)を選び、関数は純粋に、順序へは依存しない。ソースの分割性とキー分布でスループットが決まるため、まず順次でベースラインを取り、parallel で実測して判断する。これらを徹底すれば、巨大データでもスレッドセーフに、正しく、速く集計できます。

タイトルとURLをコピーしました