Java Tips | コレクション:並列Stream制御

Java Java
スポンサーリンク

並列Stream制御は「速くしたいけど、暴れさせたくない」技

Stream には
stream()(普通の順次処理)と
parallelStream()(並列処理)
の2つの世界があります。

「CPUコアを全部使って一気に処理したい」
「でも、どこでもかしこでも並列にすると逆に遅くなる・バグる」

このジレンマをうまくコントロールするのが「並列Stream制御」です。
今日は「いつ parallel にするか」「どう制御するか」を、初心者向けにかみ砕いていきます。


基本:parallelStream と stream の違いを体感する

同じ処理を順次と並列で書いてみる

まずは、単純な例で「何が違うのか」を見てみます。

import java.util.List;

public class ParallelBasic {

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);

        System.out.println("=== sequential ===");
        numbers.stream()
               .map(ParallelBasic::heavy)
               .forEach(System.out::println);

        System.out.println("=== parallel ===");
        numbers.parallelStream()
               .map(ParallelBasic::heavy)
               .forEach(System.out::println);
    }

    private static int heavy(int n) {
        System.out.println(Thread.currentThread().getName() + " : " + n);
        try {
            Thread.sleep(500); // 重い処理のつもり
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return n * 2;
    }
}
Java

順次版では、ほぼ常に main スレッドだけが動きます。
並列版では、ForkJoinPool.commonPool-worker-... のような複数スレッドが動いているのが見えるはずです。

ここでの重要ポイントは二つです。

一つ目は、「parallelStream() と書くだけで、“どのスレッドで実行するか”をフレームワーク側に任せられる」ことです。
自分でスレッドを立てたり、Executor を直接触ったりする必要はありません。

二つ目は、「並列にした瞬間、“順番”や“スレッド安全性”を自分で意識しないといけなくなる」ことです。
ここを理解せずに parallel を使うと、業務コードが一気に危険地帯になります。


重要1:並列Streamで「やってはいけないこと」

副作用のある処理を共有オブジェクトに対して行う

一番危ないのは、「共有の可変オブジェクトに対して、複数スレッドから書き込みをする」ことです。

import java.util.ArrayList;
import java.util.List;

public class BadExample {

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);
        List<Integer> result = new ArrayList<>();

        numbers.parallelStream()
               .map(n -> n * 2)
               .forEach(result::add); // 危険!

        System.out.println(result);
    }
}
Java

これは「たまたま動く」こともありますが、
本質的にはスレッドセーフではなく、要素が欠けたり順番がおかしくなったりする可能性があります。

並列Streamでは、次のようなことは避けるべきです。

共有の ArrayListadd する。
共有のカウンタを ++ する。
共有の Map に put する(スレッドセーフでない実装)。

代わりに、「Stream の終端操作で集約させる」書き方にします。

import java.util.List;
import java.util.stream.Collectors;

public class GoodExample {

    public static void main(String[] args) {
        List<Integer> numbers = List.of(1, 2, 3, 4, 5);

        List<Integer> result =
                numbers.parallelStream()
                       .map(n -> n * 2)
                       .collect(Collectors.toList());

        System.out.println(result);
    }
}
Java

ここでの重要ポイントは、
「並列Streamでは“副作用のある共有オブジェクト”を触らず、“不変な値の変換+フレームワークによる集約”に徹する」ことです。


重要2:並列にすると「順番」が保証されないことがある

forEach と forEachOrdered の違い

並列Streamで forEach を使うと、処理順序は保証されません。

List<Integer> numbers = List.of(1, 2, 3, 4, 5);

numbers.parallelStream()
       .forEach(System.out::println); // 3 1 5 2 4 など、順不同
Java

「順番がどうでもいい」処理なら問題ありませんが、
「入力順に出したい」場合は forEachOrdered を使う必要があります。

numbers.parallelStream()
       .forEachOrdered(System.out::println); // 1 2 3 4 5
Java

ここでの重要ポイントは二つです。

一つ目は、「並列Streamは“順番を崩してもいいなら速くできる”が、“順番を守ろうとするとその分コストがかかる”」ということです。

二つ目は、「業務要件として“順番が意味を持つかどうか”を、最初に決めておく」ことです。
ログ出力や画面表示など、「順番が大事なところ」で安易に parallel を使うと、後で困ります。


重要3:並列Streamは「何でも速くする魔法」ではない

要素数・処理の重さ・オーバーヘッドを意識する

並列Streamには、スレッド分割やタスク管理のオーバーヘッドがあります。
要素数が少ない・処理が軽い場合は、むしろ遅くなることも普通にあります。

例えば、100件程度の List に対して、軽い map をするだけなら、
素直に stream() で順次処理した方が速いことが多いです。

並列Streamが効果を発揮しやすいのは、ざっくり言うと次のような条件です。

要素数が多い(数万〜数十万以上など)。
各要素の処理がそこそこ重い(I/O待ちやCPU計算)。
処理が独立していて、共有状態をほとんど触らない。

ここでの重要ポイントは、
「parallel を使う前に、“本当にここがボトルネックか?”を一度疑う」ことです。

プロファイラやログで「どこに時間がかかっているか」を見たうえで、
“ここは並列化する価値がある”と判断できる場所だけに parallel を導入するのが、実務的なやり方です。


並列Stream制御ユーティリティの考え方

「ここだけ並列」「ここは絶対順次」を明示する

プロジェクトが大きくなると、
「誰かが勝手に parallelStream を使って、思わぬところでスレッドが暴れる」
という事故が起きがちです。

そうならないように、「並列化の方針」をユーティリティやラッパーで表現しておくのも一つの手です。

import java.util.Collection;
import java.util.stream.Stream;

public final class Parallels {

    private Parallels() {}

    public static <T> Stream<T> maybeParallel(
            Collection<T> source,
            boolean parallel
    ) {
        if (source == null || source.isEmpty()) {
            return Stream.empty();
        }
        Stream<T> s = source.stream();
        return parallel ? s.parallel() : s;
    }
}
Java

使い方のイメージです。

Parallels.maybeParallel(list, useParallel)
         .map(...)
         .collect(...);
Java

ここでの重要ポイントは二つです。

一つ目は、「並列にするかどうかの判断を、“呼び出し側のフラグ”や“設定値”に寄せられる」ことです。
本番では parallel、テストでは sequential、のような切り替えもやりやすくなります。

二つ目は、「parallelStream() をあちこちに直接書かず、“並列化の入口”を限定できる」ことです。
これにより、「どこで並列化しているか」を追いやすくなり、トラブルシュートもしやすくなります。


まとめ:並列Stream制御で身につけてほしい感覚

並列Stream制御は、
単に「parallelStream() を知る」話ではなく、
「どこを並列にし、どこをあえて順次のままにするかを設計する技術」です。

副作用のある共有オブジェクト(List, Map など)を並列Streamから直接触らない。
順番が意味を持つ処理では、forEachOrdered や「そもそも parallel にしない」という選択をする。
「要素数」「処理の重さ」「オーバーヘッド」を見て、本当に並列化の価値がある場所だけに使う。
parallelStream() をあちこちにばらまかず、「並列化の入口」をユーティリティや設定でコントロールする。

あなたのコードのどこかに、
「とりあえず速くなりそうだから parallelStream にしてみた」箇所があれば、
一度「本当にここを並列にすべきか?」「副作用や順番の問題はないか?」を眺め直してみてください。

その慎重な一歩が、
「マシンパワーを“安全に”引き出せるエンジニア」への、
確かなステップになります。

タイトルとURLをコピーしました