Java 逆引き集 | Stream を用いたデータ整形パイプライン設計 — パイプラインアーキテクチャ

Java Java
スポンサーリンク

ねらいと基本方針

データ整形パイプラインは「取り込み → 正規化 → 検証 → 集約/変換 → 出力」の流れを、安全に、短く、遅延評価で組み立てる設計です。Stream を軸にすると、巨大データでもメモリピークを一定に保ちつつ、段階を明確に分離できます。重要ポイントは、上流で絞る(filter)、短絡で止める(limit, findFirst など)、重い処理は後段へ、材質化は最小に、外部出力は終端で逐次行うことです。


パイプラインの骨格(段階分離と再利用)

段階の分離と名前付け

各段階に意味のある名前を付けると、意図が明確になり保守性が上がります。関数化して再利用可能にしましょう。

Function<String, String> trim = String::trim;
Predicate<String> nonBlank = s -> !s.isBlank();
Function<String, Record> toRecord = CsvParser::parse; // 例:CSV→ドメイン

Stream<Record> pipeline(Stream<String> lines) {
    return lines
        .map(trim)
        .filter(nonBlank)
        .map(toRecord)
        .filter(Validator::valid); // 例:ドメイン検証
}
Java

変換と検証を上流に置くことで、後段の重い処理へ進む要素数を減らします。

Supplier で同じ上流を再生成

同じ抽出条件に対して複数の終端(count, toList, 書き出しなど)が必要な場合、ストリームは一回性なので再生成にします。

Supplier<Stream<Record>> src = () -> Files.lines(path)
    .map(String::trim)
    .filter(s -> !s.isEmpty())
    .map(CsvParser::parse)
    .filter(Validator::valid);

long n = src.get().count();
List<Record> first100 = src.get().limit(100).toList();
Java

入力の取り込みと前処理(軽く絞る・早く止める)

I/O を終端へ寄せる逐次処理

巨大ファイルは行を遅延で供給し、終端で逐次書き出すのが基本です。材質化を避けてピークメモリを一定に保ちます。

try (var w = Files.newBufferedWriter(Paths.get("out.txt"));
     Stream<String> lines = Files.lines(Paths.get("in.txt"))) {

    lines
      .map(String::trim)
      .filter(s -> !s.isEmpty())
      .map(CsvParser::parse)      // 軽いパース(Optional にして失敗行は落とすのも可)
      .filter(Validator::valid)
      .map(Formatter::toOutputLine)
      .forEach(line -> {
          try { w.write(line); w.newLine(); }
          catch (IOException e) { throw new UncheckedIOException(e); }
      });
}
Java

短絡で仕事を減らす

検証用に先頭のサンプルだけ見るなど、limit を前段に置いて処理を早く止めます。

List<Record> preview = Files.lines(path)
    .map(String::trim).filter(s -> !s.isEmpty())
    .map(CsvParser::parse).filter(Validator::valid)
    .limit(50)
    .toList();
Java

変換・検証・集約の設計(安全性と性能)

Optional ベースの安全変換

失敗し得る変換は Optional を返し、flatMap(Optional::stream) で自然に落とします。例外で全体を止めないのがコツです。

static Optional<Record> safeParse(String line) {
    try { return Optional.of(CsvParser.parse(line)); }
    catch (RuntimeException e) { return Optional.empty(); }
}

List<Record> records = Files.lines(path)
    .map(String::trim).filter(s -> !s.isEmpty())
    .map(App::safeParse)
    .flatMap(Optional::stream)
    .filter(Validator::valid)
    .toList();
Java

集約は結合可能に(counting/summing/reducing)

集計は結合律を満たす演算で記述し、並列にも耐える形にします。

long errorCount = lines.stream()
    .map(CsvParser::parse)
    .map(Validator::errors)             // 例:List<Error>
    .mapToLong(List::size)
    .sum();

Map<String, Long> freqByType = records.stream()
    .collect(Collectors.groupingBy(Record::type, Collectors.counting()));
Java

ソートと重い処理は後段へ

sorted や複雑な map は後段に置き、先にフィルタと短絡で対象を絞ることで仕事量を減らします。

List<String> topNames = records.stream()
    .filter(r -> r.score() >= 80)
    .sorted(Comparator.comparingInt(Record::score).reversed())
    .limit(100)
    .map(Record::name)
    .toList();
Java

出力段とエラーレポート(材質化最小・見える化)

複数出力をまとめて行う

終端は複数並列させず、必要なら一度だけ上流を流して分岐出力します。行を見ながら各シンクへ流すとピークは一定です。

try (var wOk = Files.newBufferedWriter(Paths.get("ok.csv"));
     var wNg = Files.newBufferedWriter(Paths.get("ng.csv"));
     Stream<String> lines = Files.lines(path)) {

    lines.forEach(line -> {
        Optional<Record> rec = safeParse(line);
        if (rec.filter(Validator::valid).isPresent()) {
            try { wOk.write(Formatter.toCsv(rec.get())); wOk.newLine(); }
            catch (IOException e) { throw new UncheckedIOException(e); }
        } else {
            try { wNg.write(line); wNg.newLine(); }
            catch (IOException e) { throw new UncheckedIOException(e); }
        }
    });
}
Java

デバッグ用のサンプルと統計

件数とサンプルを同時に取ると、問題の所在が見えやすくなります。

record Debug<T>(List<T> sample, long count) {}

Debug<Record> dbg = records.stream()
    .collect(Collectors.teeing(
        Collectors.limit(20).collect(Collectors.toList()), // 先頭20
        Collectors.counting(),
        Debug::new
    ));
Java

実例で学ぶパイプライン設計

例題 1:ユーザ CSV から DTO へ変換し、上位スコアのレポート出力

record User(long id, String name, int score) {}
static Optional<User> parseUser(String line) { /* 安全パース */ }

Path in = Paths.get("users.csv");
try (var w = Files.newBufferedWriter(Paths.get("top_users.txt"));
     Stream<String> lines = Files.lines(in)) {

    lines.skip(1) // ヘッダー
         .map(String::trim).filter(s -> !s.isEmpty())
         .map(App::parseUser).flatMap(Optional::stream)
         .filter(u -> u.score() >= 80)
         .sorted(Comparator.comparingInt(User::score).reversed())
         .limit(100)
         .map(u -> "%d,%s,%d".formatted(u.id(), u.name(), u.score()))
         .forEach(s -> {
             try { w.write(s); w.newLine(); }
             catch (IOException e) { throw new UncheckedIOException(e); }
         });
}
Java

短絡と遅延で巨大入力にも耐え、整形は終端で逐次出力します。

例題 2:ログ整形パイプライン(クレンジング→分類→集計)

record Log(String level, String msg) {}

Function<String, Optional<Log>> parseLog = line -> {
    String t = line.trim();
    if (t.isEmpty()) return Optional.empty();
    int sp = t.indexOf(' ');
    if (sp <= 0) return Optional.empty();
    return Optional.of(new Log(t.substring(0, sp), t.substring(sp + 1)));
};

Path log = Paths.get("app.log");
Map<String, Long> perLevel;
try (Stream<String> lines = Files.lines(log)) {
    perLevel = lines
        .map(parseLog).flatMap(Optional::stream)
        .filter(l -> List.of("INFO","WARN","ERROR").contains(l.level()))
        .collect(Collectors.groupingBy(Log::level, Collectors.counting()));
}
Java

可視化は集計後に別途行えばよく、パイプライン自体は純粋に保ちます。


深掘り:正しさ・性能・保守性の勘所

遅延評価を前提に「上流で削る・後段で重くする」

終端が処理を動かします。フィルタと短絡で対象を絞り、重い map や sorted は可能な限り後段へ。これだけで無駄が劇的に減ります。

副作用は終端へ限定し、中間は純粋に

中間操作に外部状態の更新を入れると、短絡や並列で壊れます。外部への書き出しやメトリクス送信は forEach の終端に限定します。

並列は計算が重いときだけ、結合可能な集約で

並列は CPU バウンドの純粋計算に効きます。groupingByConcurrent、counting、summing のような結合可能なコレクタを使い、順序に依存しない設計にします。I/O 主体は並列化の恩恵が薄いことを理解して選択します。

スキーマ変化に強い設計(ヘッダー駆動・正規化)

ヘッダーから列位置を決める、ID/キーの正規化(trim, lower-case)を前段で行うと、データ整形パイプラインはスキーマ変更や揺れに強くなります。

エラーは Optional で落とすか、境界で例外化

想定内の欠損や軽微な不正は Optional で落とす。致命的なフォーマット破綻や I/O は UncheckedIOException で境界から外へ伝える。この線引きでパイプラインは強靭になります。


テンプレート(そのまま使える雛形)

段階分離の基本パイプライン

Stream<T> pipeline(Stream<S> in,
                   Function<S, Optional<T>> parse,
                   Predicate<T> valid,
                   UnaryOperator<Stream<T>> transform) {
    return in
        .map(s -> parse.apply(s))
        .flatMap(Optional::stream)
        .filter(valid)
        .let(transform); // ※ Java には let はないので適宜置き換え
}
Java

安全変換と逐次出力

static <S,T> void etl(Stream<S> in,
                      Function<S, Optional<T>> parse,
                      Predicate<T> valid,
                      Consumer<T> sink) {
    in.map(parse).flatMap(Optional::stream)
      .filter(valid)
      .forEach(sink);
}
Java

正規化→検証→トップN→整形

List<R> topN = src.stream()
    .map(Normalizer::apply)
    .filter(Validator::valid)
    .sorted(Comparator.comparingInt(Domain::score).reversed())
    .limit(N)
    .map(Formatter::toView)
    .toList();
Java

集計テンプレート(件数・合計)

long count = src.stream()
    .filter(Validator::valid)
    .count();

int sum = src.stream()
    .filter(Validator::valid)
    .mapToInt(Domain::value)
    .sum();
Java

まとめ

Stream を用いたデータ整形パイプラインの本質は、段階を分離し、上流で削り、短絡で止め、重い処理は後段へ、出力は終端で逐次にすること。Optional を使って失敗を自然に落とし、結合可能な集約で正しさと性能を両立。スキーマ変化に強いヘッダー駆動と正規化、エラーの線引きを整えれば、巨大データでも読みやすく、速く、壊れないパイプラインが組めます。

タイトルとURLをコピーしました