目的と前提
並列 reduce を正しく設計する鍵は「結合可能(結合律を満たす)な演算」と「適切な単位元(identity)」、そして「accumulator と combiner の整合性」です。並列では部分結果を別々のスレッドで計算し、最後に結合します。つまり、演算が結合的でないと結果が崩れ、identity が不適切だと値が歪みます。ここを外さない限り、並列化しても安全に正確な答えに到達できます。
並列 reduce の原則(結合律・単位元・整合性)
結合律と単位元を満たす演算に限定する
結合律とは、二項演算 ⊕ が「(a ⊕ b) ⊕ c = a ⊕ (b ⊕ c)」を満たす性質で、これを満たし、かつ単位元 e が「e ⊕ x = x ⊕ e = x」を満たすときに、並列で部分結果を任意の順序で結合しても正しい結果になります。和(+)、積(×)、最小(min)、最大(max)、連結(StringBuilder での append)は結合的です。逆に「平均の生値を足して最後に割る」「非可換な順序依存合成」は結合律を崩しがちです。
accumulator と combiner は同じ「結合」を表す
reduce の三引数版では、accumulator(A, T → A)と combiner(A, A → A)を渡します。並列では「accumulator で部分結果を作る」「combiner で部分結果どうしを結合する」ため、両者が同じ演算の拡張でなければなりません。例えば「合計」であれば、accumulator は「A に T を加算」、combiner は「A1 と A2 を加算」です。ここがズレると並列時のみ誤答になります。
identity は「何もしていない状態」を厳密に表す
identity は「初期の部分結果」と「空ストリームの結果」を表します。和なら 0、積なら 1、最小なら正の無限大相当(実装では適切な初期化戦略)、文字列連結なら空文字列。identity が不適切だと、並列分割時に余計な寄与を生み、正解を歪めます。
基本コードと安全パターン
和・積・最小・最大(結合的な典型)
int sum = IntStream.rangeClosed(1, 1_000_000)
.parallel()
.reduce(0, (a, b) -> a + b); // identity=0, 結合律を満たす
int product = IntStream.of(1,2,3,4)
.parallel()
.reduce(1, (a, b) -> a * b); // identity=1
int min = IntStream.of(5, 2, 9, 1)
.parallel()
.reduce(Integer.MAX_VALUE, Math::min);
int max = IntStream.of(5, 2, 9, 1)
.parallel()
.reduce(Integer.MIN_VALUE, Math::max);
Javaこれらは結合的な演算の代表例で、identity も自然です。並列にしても正しさが担保されます。
三引数 reduce(accumulator と combiner の整合)
record Stats(long sum, long count) { Stats add(int x) { return new Stats(sum + x, count + 1); } }
Stats zero = new Stats(0, 0);
Stats stats = IntStream.rangeClosed(1, 10)
.parallel()
.boxed()
.reduce(
zero,
(st, x) -> st.add(x), // accumulator: A,T -> A
(s1, s2) -> new Stats(s1.sum() + s2.sum(), // combiner: A,A -> A
s1.count() + s2.count())
);
double avg = stats.count() == 0 ? Double.NaN : (double) stats.sum() / stats.count();
Javaaccumulator と combiner が「同じ集計の加算」になっているため、部分集計を安全に結合できます。平均を直接 reduce するのではなく、和と件数を集めて最後に割るのがセオリーです。
連結や集合の結合(可変容器は combiner を正しく)
List<String> joined = Stream.of("a","b","c")
.parallel()
.reduce(
new ArrayList<>(),
(acc, s) -> { acc.add(s); return acc; }, // 要素を入れる
(left, right) -> { left.addAll(right); return left; } // 部分結果を結合
);
Java並列では各スレッドが別の ArrayList に詰めるため、combiner で「部分リストの結合」を正しく実装する必要があります。外部リストに forEach で add するような副作用設計は避け、reduce/collect で容器を結合します。
例題で理解する「結合可能な設計」
例題1:頻度集計(Map の結合)
Map<String, Long> freq = Stream.of("a","b","a","c","b","a")
.parallel()
.reduce(
new HashMap<>(),
(m, s) -> { m.merge(s, 1L, Long::sum); return m; },
(m1, m2) -> { m2.forEach((k,v) -> m1.merge(k, v, Long::sum)); return m1; }
);
Java部分 Map を最後に merge するのは結合的です。キーごとの加算は結合律を満たすため、並列でも正確な結果になります。大量データでは groupingByConcurrent のほうが簡潔かつ高性能なことが多いですが、reduce でも正しく書けます。
例題2:中央値は reduce しない(順序依存の反例)
中央値や「k 番目」など順序依存の操作は結合的ではありません。並列 reduce に向かず、専用アルゴリズム(選択アルゴリズム)や材質化+ソート+インデックスで扱うべきです。結合律が崩れる操作は「並列の結合順序」で結果が変わるため不適切です。
例題3:平均の誤設計と是正
誤り例(NG):
double avg = list.parallelStream()
.reduce(0.0, (a, x) -> (a + x) / 2.0, (a, b) -> (a + b) / 2.0); // 結合律を満たさない
Java是正(OK):
record S(double sum, long n) {}
S s = list.parallelStream()
.reduce(new S(0,0),
(st, x) -> new S(st.sum() + x, st.n() + 1),
(s1, s2) -> new S(s1.sum() + s2.sum(), s1.n() + s2.n()));
double avg = s.n() == 0 ? Double.NaN : s.sum() / s.n();
Java平均を直接「その場で割る」設計は結合律を壊します。和と件数を集めて最後に一度だけ割ることで並列でも正しくなります。
深掘り:三引数 reduce と Collector の選択基準
三引数 reduce は「モノイド」設計を意識する
三引数 reduce の正しさは、入れ物 A と演算(accumulator/combiner)がモノイド(結合律+単位元)を形成するかで決まります。A が「部分結果の型」、⊕ が「結合操作」、e が「初期値」です。これを満たす限り、ForkJoinPool の任意分割・任意結合でも正しい値になります。reduce のドキュメントが強調する「結合的な累積関数」「単位元」は、このモノイド要件の説明です。
Collector を優先する場面
可変容器の結合(List, Map, StringBuilder)は、Collector(toList, groupingByConcurrent, joining)を使うほうが分かりやすく、combiner のバグを避けられます。特に並列では concurrent Collector が用意されており、正しさと性能の両立が容易です。reduce は「汎用的な結合」を自作したいときに限り、Collector があるならそちらを選ぶのが定石です。
浮動小数の数値安定性
浮動小数点の加算は数学的には結合的ですが、丸め誤差により結合順序で結果が微妙に変わります。並列では結合順序が不定になるため、厳密再現が必要な統計では Kahan 加算など誤差低減法を「A 型」に持たせて reduce するのが安全です。
テンプレート(安全な並列 reduce 雛形)
和・最大・最小(identity と結合律が明快)
int sum = stream.parallel().reduce(0, Integer::sum);
int max = stream.parallel().reduce(Integer.MIN_VALUE, Integer::max);
int min = stream.parallel().reduce(Integer.MAX_VALUE, Integer::min);
Java三引数 reduce(和+件数で平均)
record S(double sum, long n) {}
S s = stream.parallel()
.reduce(new S(0,0),
(st, x) -> new S(st.sum() + x, st.n() + 1),
(a, b) -> new S(a.sum() + b.sum(), a.n() + b.n()));
double avg = s.n() == 0 ? Double.NaN : s.sum() / s.n();
Java可変容器の結合(List)
List<T> out = stream.parallel()
.reduce(new ArrayList<>(),
(acc, t) -> { acc.add(t); return acc; },
(a, b) -> { a.addAll(b); return a; });
Java頻度 Map の結合
Map<K, Long> freq = stream.parallel()
.reduce(new HashMap<>(),
(m, k) -> { m.merge(k, 1L, Long::sum); return m; },
(m1, m2) -> { m2.forEach((k,v) -> m1.merge(k, v, Long::sum)); return m1; });
Javaまとめ
並列 reduce の正しさは「結合律」「単位元」「accumulator/combiner の整合性」の三点で決まります。結合的な演算(和・積・min/max・連結)に限定し、identity を厳密に設定し、部分結果どうしを同じ演算で結ぶこと。容器の結合は Collector を優先し、reduce を使うならモノイド設計を明示する。順序依存の操作(中央値など)は reduce に載せない。こうした原則に従えば、並列化しても正確さを損なわず、性能向上の恩恵だけを享受できます。
