Java | Java 標準ライブラリ:CompletableFuture 基礎

Java Java
スポンサーリンク

CompletableFuture を一言でいうと

CompletableFuture は、

「将来終わる“非同期の計算結果”を表す箱(Future)に対して、
『終わったらこれをして、その後これをして…』という“後続処理”をつなげていける仕組み」

です。

普通の Future は「結果を待つ(get する)」ことはできますが、
「終わったら自動で次の処理を実行する」「複数の非同期処理を組み合わせる」といったことがやりにくいです。

CompletableFuture はそこを一気に強化したクラスで、

非同期で実行する
終わったら自動で次のステップ
途中でエラーになったら別の流れ
複数の非同期を並列実行してまとめる

といったことを“宣言的に”つないで書けるようにしてくれます。


まずは Future との違いイメージを固める

普通の Future は、「約束の箱」だけです。

タスクを ExecutorServicesubmit して、Future を受け取り、
必要になったときに get() で結果を取りに行く、という使い方でした。

CompletableFuture は、

自分自身が Future でもある
かつ、「終わったらこれをやる」という“後続処理”を登録できる

というところがポイントです。

「Future に“完了したらどうするか”をくっつけられるようにしたもの」
くらいに思ってください。

そのおかげで、

A を非同期で計算して、終わったらそれを元に B を計算し、
さらにその結果で C を表示する

といった流れを、

A
→ thenApply で B
→ thenAccept で C

というふうにつなげて書けるようになります。


CompletableFuture の一番基本的な作り方

supplyAsync と runAsync

一番よく使うのは、supplyAsyncrunAsync の 2 つです。

CompletableFuture.supplyAsync(Supplier<T>)
結果を返す処理(T get())を非同期で実行し、その結果を持つ CompletableFuture<T> を返す。

CompletableFuture.runAsync(Runnable)
戻り値のない処理(void run())を非同期で実行し、結果型が VoidCompletableFuture<Void> を返す。

まずは supplyAsync の例から。

import java.util.concurrent.CompletableFuture;

public class SupplyAsyncBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("重い計算を別スレッドで開始");
                    try {
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    return 42;
                });

        System.out.println("main スレッドは別の仕事をしている…");

        Integer result = future.get();   // 終わるまで待って結果を取得
        System.out.println("結果 = " + result);
    }
}
Java

ここで起きていることは次の通りです。

supplyAsync に渡したラムダが別スレッドで実行される。
戻り値 42 を持つ CompletableFuture<Integer> が返ってくる。
必要になったタイミングで get() で結果を取りに行く。

Future と似ていますが、「後続処理をつなげる」という本領はまだ発揮していません。
まずは「非同期で値を計算する Future」としてイメージを掴んでください。

runAsync は戻り値なし版です。

import java.util.concurrent.CompletableFuture;

public class RunAsyncBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future =
                CompletableFuture.runAsync(() -> {
                    System.out.println("バックグラウンドでログ出力など");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        throw new RuntimeException(e);
                    }
                    System.out.println("バックグラウンド処理完了");
                });

        System.out.println("main は待たずに先に進む");

        future.get();   // 必要なら、最後に「終わるのを待つ」
        System.out.println("全処理完了");
    }
}
Java

thenApply / thenAccept / thenRun で「完了後の処理」をつなげる

ここからが CompletableFuture の本領です。

「非同期で計算が終わったら、その結果を使ってさらに処理を行う」
というのを、thenApply などでつなげて書きます。

thenApply:結果を変換する

thenApply は、「結果を別の値に変換する」処理をつなぐメソッドです。

import java.util.concurrent.CompletableFuture;

public class ThenApplyBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("数値を計算中…");
                    return 10;
                });

        CompletableFuture<Integer> doubled =
                future.thenApply(x -> {
                    System.out.println("thenApply で 2 倍にする");
                    return x * 2;
                });

        Integer result = doubled.get();
        System.out.println("結果 = " + result); // 20
    }
}
Java

このときの流れはこうです。

supplyAsync が 10 を計算して CompletableFuture<Integer> を完了させる。
その完了をトリガーに、thenApply に渡したラムダが「別のスレッド(または同じスレッド)」で実行され、20 を返す。
20 を持つ新しい CompletableFuture<Integer>(ここでは doubled)が返される。

ポイントは、「元の future と、thenApply の結果の future は別物」だということです。
チェーンの各ステップが、それぞれ新しい CompletableFuture を作っているイメージです。

thenAccept:結果を受け取って“消費”する(戻り値なし)

thenAccept は、「結果を受け取って何かするが、次の値は返さない」処理をつなぐときに使います。

import java.util.concurrent.CompletableFuture;

public class ThenAcceptBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future =
                CompletableFuture.supplyAsync(() -> "Hello, CompletableFuture")
                    .thenAccept(msg -> {
                        System.out.println("メッセージを表示: " + msg);
                    });

        future.get(); // 表示が終わるのを待つ
    }
}
Java

ここでは、

supplyAsync で文字列を非同期に作る。
その結果が来たら thenAccept で表示する。
戻り値は Void なので、「これでチェーン終わり」のイメージ。

という流れになっています。

thenRun:結果を使わず「終わったらこれだけしたい」

thenRun は、前の結果を使わず、
「とにかく終わったらこれを実行して」と言いたいときに使います。

import java.util.concurrent.CompletableFuture;

public class ThenRunBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> future =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("重い処理中…");
                    return 123;
                }).thenRun(() -> {
                    System.out.println("重い処理が終わったので、後片付きをする");
                });

        future.get(); // 全部終わるのを待つ
    }
}
Java

前段の結果は無視して、
「完了した」という事実だけをトリガーに動く後処理を書くイメージです。


thenCompose / thenCombine で「非同期どうし」をつなぐ

少しだけレベルを上げます。
ここからは「非同期の結果」がさらに「非同期処理」を返すようなケースです。

thenCompose:Future<Future<T>> を平らにする

例えば、「ユーザー情報を取ってきて、そのユーザー ID で別の API を呼ぶ」ようなケース。

両方とも非同期だとすると、

ユーザー取得:CompletableFuture<User>
ユーザー ID から情報取得:id -> CompletableFuture<Detail>

という関係になりがちです。

これを素直に thenApply で書くと、

CompletableFuture<CompletableFuture<Detail>>

という二重の Future になってしまいます。

そこで使うのが thenCompose です。

CompletableFuture<User> fetchUserAsync();
CompletableFuture<Detail> fetchDetailAsync(long userId);

CompletableFuture<Detail> detailFuture =
        fetchUserAsync()
            .thenCompose(user -> fetchDetailAsync(user.getId()));
Java

thenCompose は、「結果を使って別の CompletableFuture を返す」関数を受け取り、
「二重 Future を平らにしてくれる」イメージです。

初心者の段階では、

「thenApply は “同期的な変換結果” を返すとき」
「thenCompose は “新しい非同期処理” を返したいとき」

くらいの理解で十分です。

thenCombine:二つの非同期を並列に走らせて、両方終わったら合成する

例えば、「価格情報」と「在庫情報」を別々の API で取ってきて、
両方揃ったら一つのビューにまとめたい、というようなケースです。

CompletableFuture<Integer> priceFuture = fetchPriceAsync();
CompletableFuture<Integer> stockFuture = fetchStockAsync();

CompletableFuture<String> summaryFuture =
        priceFuture.thenCombine(stockFuture, (price, stock) -> {
            return "価格: " + price + ", 在庫: " + stock;
        });

String summary = summaryFuture.get();
System.out.println(summary);
Java

ここでは、

priceFuturestockFuture はそれぞれ並列に動く。
両方が完了したタイミングで、(price, stock) のラムダが呼ばれる。
その戻り値を持つ CompletableFuture<String> が完成する。

という流れになります。

「複数の非同期結果をまとめて次の処理に渡したい」ときに、thenCombine は非常に便利です。


例外処理:exceptionally / handle でエラーに向き合う

非同期処理でも、例外は必ず考えないといけません。
CompletableFuture は、例外処理もチェーンの中に組み込めるようになっています。

exceptionally:エラー時だけ“代わりの値”を返す

exceptionally は、「どこかで例外が起きたときにだけ呼ばれる」ハンドラを登録します。

import java.util.concurrent.CompletableFuture;

public class ExceptionallyBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    System.out.println("計算開始");
                    if (true) {
                        throw new RuntimeException("計算失敗");
                    }
                    // return 10;
                }).exceptionally(ex -> {
                    System.out.println("例外発生: " + ex.getMessage());
                    return 0;  // エラー時は 0 を返すことにする
                });

        Integer result = future.get();
        System.out.println("結果 = " + result); // 0
    }
}
Java

チェーンのどこかで例外が飛ぶと、その時点で future は「例外で完了した」状態になります。
exceptionally は、それを受け取って、「代わりの正常な値」を返すことができます。

例外をログに出して、フォールバック値を返す、というパターンに向いています。

handle:成功時と失敗時の両方を一か所で扱う

handle は、成功でも失敗でも必ず呼ばれます。

ラムダの引数は (結果, 例外) の 2 つで、
どちらか一方が null になります。

import java.util.concurrent.CompletableFuture;

public class HandleBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Integer> future =
                CompletableFuture.supplyAsync(() -> {
                    if (Math.random() < 0.5) {
                        return 10;
                    } else {
                        throw new RuntimeException("たまたま失敗");
                    }
                }).handle((value, ex) -> {
                    if (ex != null) {
                        System.out.println("失敗: " + ex.getMessage());
                        return 0;
                    } else {
                        System.out.println("成功: " + value);
                        return value * 2;
                    }
                });

        Integer result = future.get();
        System.out.println("最終結果 = " + result);
    }
}
Java

成功と失敗を「同じ場所でまとめてハンドルしたい」場合に、handle は便利です。


allOf / anyOf:複数の CompletableFuture をまとめる

最後に、複数の CompletableFuture をまとめて待つユーティリティも少し触れておきます。

allOf:全部終わるのを待つ

CompletableFuture.allOf(f1, f2, f3, ...) は、
渡した全ての CompletableFuture が完了したときに完了する CompletableFuture<Void> を返します。

import java.util.concurrent.CompletableFuture;

public class AllOfBasic {
    public static void main(String[] args) throws Exception {
        CompletableFuture<Void> f1 = CompletableFuture.runAsync(() -> sleepAndLog(1000, "A"));
        CompletableFuture<Void> f2 = CompletableFuture.runAsync(() -> sleepAndLog(1500, "B"));
        CompletableFuture<Void> f3 = CompletableFuture.runAsync(() -> sleepAndLog(500, "C"));

        CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);

        System.out.println("全部終わるのを待つ…");
        all.get();
        System.out.println("全タスク完了");
    }

    static void sleepAndLog(long millis, String name) {
        try {
            Thread.sleep(millis);
            System.out.println(name + " 完了");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
Java

各タスクの結果が必要な場合は、all.get() の後に、
f1.join(), f2.join() のように個別に取り出します。

anyOf:どれか一つ終わるのを待つ

anyOf は、「最初に完了したもの」を待つためのユーティリティです。

CompletableFuture<Object> any =
        CompletableFuture.anyOf(f1, f2, f3);

Object firstResult = any.get();
Java

「どれか一つの結果が得られれば十分」という状況で使えます。


まとめ:CompletableFuture 基礎を自分の中でこう位置づける

CompletableFuture を初心者向けにまとめると、

「Future に“完了後の処理チェーン”と“例外処理”と“複数組み合わせ”をする力を足した、非同期処理のための強化版 Future」

です。

押さえておきたいポイントを整理すると、

非同期で値を作るには supplyAsync、戻り値なしなら runAsync
終わったら何かしたいときに thenApply(変換)、thenAccept(消費)、thenRun(通知だけ)。
非同期どうしをつなぐときに thenCompose(平らにする)や thenCombine(並列結果の合成)。
エラー時に exceptionallyhandle でフォールバックやログ。
複数の非同期をまとめる allOf / anyOf

まずは、

  • supplyAsync で非同期処理を作る
  • thenApplythenAccept をつなげて「終わったら何をするか」を書いてみる
  • exceptionally でエラー時の挙動を変えてみる

この 3 つに慣れるのがおすすめです。

タイトルとURLをコピーしました