Java 逆引き集 | Stream を使った CSV → オブジェクト変換 — ETL の簡潔化

Java Java
スポンサーリンク

ねらいと前提

CSV をオブジェクトへ変換する ETL は「読み取り → クリーニング → 変換 → 検証 → 出力」の流れです。Stream を使えば、行を遅延で処理しながら宣言的に書けるため、巨大ファイルでもメモリピークを一定に保てます。重要なのは、ヘッダー処理、区切り・引用符の扱い、型変換の安全化(Optional/try-catch)、そしてバリデーションを早い段階で行い、無効行を自然に落とすことです。


最小のパイプライン(ヘッダーを飛ばしてレコード化)

基本の読み取りとマッピング

import java.nio.file.*;
import java.io.*;
import java.util.*;
import java.util.stream.*;

record User(long id, String name, int age) {}

static Optional<User> parseUser(String line) {
    try {
        String[] a = line.split(",", -1); // 空フィールドも残す
        long id = Long.parseLong(a[0].trim());
        String name = a[1].trim();
        int age = Integer.parseInt(a[2].trim());
        return Optional.of(new User(id, name, age));
    } catch (RuntimeException e) {
        return Optional.empty(); // 変換失敗は捨てる
    }
}

static List<User> readUsers(Path path) throws IOException {
    try (Stream<String> lines = Files.lines(path)) {
        return lines
            .skip(1)                               // ヘッダーを飛ばす
            .map(String::trim)
            .filter(l -> !l.isEmpty())
            .map(line -> parseUser(line))
            .flatMap(Optional::stream)             // 失敗行は自然に除外
            .toList();
    }
}
Java

この形は「無効行を Optional.empty で落とす」ため、例外で処理が止まりません。まずはここから。

例: 即時出力で材質化ゼロ(大規模向け)

try (var w = Files.newBufferedWriter(Paths.get("out.json"));
     Stream<String> lines = Files.lines(Paths.get("users.csv"))) {

    lines.skip(1)
         .map(String::trim)
         .filter(l -> !l.isEmpty())
         .map(CsvETL::parseUser)
         .flatMap(Optional::stream)
         .forEach(u -> {
             try { w.write("%d,%s,%d%n".formatted(u.id(), u.name(), u.age())); }
             catch (IOException e) { throw new UncheckedIOException(e); }
         });
}
Java

逐次書き出しでピークメモリをほぼ一定に保てます。


実務品質のパーサー設計(引用符・エスケープ・欠損)

引用符・カンマを安全に扱う簡易パーサー

split は「引用符内のカンマ」に弱いので、基本的な引用規則に対応した軽量パーサーを用意します。

static List<String> parseCsvLine(String line) {
    List<String> out = new ArrayList<>();
    StringBuilder sb = new StringBuilder();
    boolean quoted = false;
    for (int i = 0; i < line.length(); i++) {
        char c = line.charAt(i);
        if (c == '"') {
            // 連続の二重引用はエスケープ("" -> ")
            if (quoted && i + 1 < line.length() && line.charAt(i + 1) == '"') {
                sb.append('"'); i++;
            } else {
                quoted = !quoted;
            }
        } else if (c == ',' && !quoted) {
            out.add(sb.toString()); sb.setLength(0);
        } else {
            sb.append(c);
        }
    }
    out.add(sb.toString());
    return out;
}

static Optional<User> parseUserQuoted(String line) {
    try {
        List<String> a = parseCsvLine(line);
        long id = Long.parseLong(a.get(0).trim());
        String name = a.get(1).trim();
        int age = Integer.parseInt(a.get(2).trim());
        return Optional.of(new User(id, name, age));
    } catch (RuntimeException e) {
        return Optional.empty();
    }
}
Java

必要十分のルール(ダブルクオート、エスケープ、カンマ)に対応し、現場の CSV に耐えます。

ヘッダーによる位置解決(列順が変わる想定)

record Header(int idIdx, int nameIdx, int ageIdx) {}

static Header parseHeader(String headerLine) {
    List<String> h = parseCsvLine(headerLine).stream().map(String::trim).toList();
    return new Header(h.indexOf("id"), h.indexOf("name"), h.indexOf("age"));
}

static Optional<User> parseUserByHeader(String line, Header h) {
    try {
        List<String> a = parseCsvLine(line);
        long id = Long.parseLong(a.get(h.idIdx()).trim());
        String name = a.get(h.nameIdx()).trim();
        int age = Integer.parseInt(a.get(h.ageIdx()).trim());
        return Optional.of(new User(id, name, age));
    } catch (RuntimeException e) {
        return Optional.empty();
    }
}
Java

列順や追加列に強くなり、スキーマ変更に耐えられます。


バリデーション・エラーハンドリング(品質担保)

早い段階で検証して落とす

static boolean isValid(User u) {
    return u.id() > 0 && !u.name().isBlank() && u.age() >= 0 && u.age() <= 120;
}

static List<User> readValidated(Path p) throws IOException {
    try (Stream<String> lines = Files.lines(p)) {
        String header = lines.findFirst().orElse("");
    }
    try (Stream<String> lines = Files.lines(p)) {
        Header h = parseHeader(lines.findFirst().orElse(""));
    }
    try (Stream<String> lines = Files.lines(p)) {
        Header h = parseHeader(Files.lines(p).findFirst().orElse(""));
        return Files.lines(p).skip(1)
            .map(String::trim)
            .filter(l -> !l.isEmpty())
            .map(l -> parseUserByHeader(l, h))
            .flatMap(Optional::stream)
            .filter(CsvETL::isValid) // 検証通過のみ
            .toList();
    }
}
Java

検証は早く・軽く。落とす基準を明確にして、後段の負荷を下げます。

失敗行のログ収集(デバッグ・監査)

record ParseResult(Optional<User> user, String raw, String error) {}

static ParseResult tryParse(String line, Header h) {
    try {
        return new ParseResult(parseUserByHeader(line, h), line, "");
    } catch (RuntimeException e) {
        return new ParseResult(Optional.empty(), line, e.getMessage());
    }
}

static void importWithReport(Path p) throws IOException {
    Header h = parseHeader(Files.lines(p).findFirst().orElse(""));
    try (Stream<String> lines = Files.lines(p)) {
        var results = lines.skip(1)
            .map(l -> tryParse(l, h))
            .toList();

        var ok = results.stream().flatMap(r -> r.user().stream()).toList();
        var ng = results.stream().filter(r -> r.user().isEmpty()).toList();

        System.out.println("OK=" + ok.size() + ", NG=" + ng.size());
    }
}
Java

成功と失敗を分けて可視化すると、運用が安定します。


例題で身につける ETL

例題 1:CSV → ドメイン → JSON 行の逐次変換

record Order(long id, String product, int qty, double price) {}

static Optional<Order> parseOrder(String line, Header h) {
    try {
        List<String> a = parseCsvLine(line);
        long id = Long.parseLong(a.get(h.idIdx()).trim());
        String product = a.get(h.nameIdx()).trim();
        int qty = Integer.parseInt(a.get(h.ageIdx()).trim());
        double price = Double.parseDouble(a.get(h.ageIdx()+1).trim());
        return Optional.of(new Order(id, product, qty, price));
    } catch (RuntimeException e) { return Optional.empty(); }
}

static void ordersToJson(Path in, Path out) throws IOException {
    Header h = parseHeader(Files.lines(in).findFirst().orElse(""));
    try (var w = Files.newBufferedWriter(out);
         Stream<String> lines = Files.lines(in)) {
        lines.skip(1)
             .map(l -> parseOrder(l, h))
             .flatMap(Optional::stream)
             .forEach(o -> {
                 String json = "{\"id\":" + o.id() + ",\"product\":\"" + o.product()
                         + "\",\"qty\":" + o.qty() + ",\"price\":" + o.price() + "}";
                 try { w.write(json); w.newLine(); }
                 catch (IOException e) { throw new UncheckedIOException(e); }
             });
    }
}
Java

材質化せずに行ごと変換・出力。ETL の王道です。

例題 2:巨大 CSV のページング処理(行数制御)

static void processInChunks(Path p, int chunkSize) throws IOException {
    try (Stream<String> lines = Files.lines(p)) {
        Iterator<String> it = lines.iterator();
        while (true) {
            List<String> buf = new ArrayList<>(chunkSize);
            for (int i = 0; i < chunkSize && it.hasNext(); i++) buf.add(it.next());
            if (buf.isEmpty()) break;
            buf.stream()
               .skip(buf.get(0).startsWith("id") ? 1 : 0) // 先頭ならヘッダー除去
               .map(String::trim)
               .filter(l -> !l.isEmpty())
               .map(CsvETL::parseUserQuoted)
               .flatMap(Optional::stream)
               .forEach(u -> {/* 書き出し・集計 */});
        }
    }
}
Java

チャンクでピークを制御しつつ、Stream の宣言性を活用します。


深掘り:正しさ・性能・保守性の勘どころ

遅延評価を前提に「逐次出力」へ

  • ドライバー: 終端が処理を動かす。forEach で外部出力すれば、行単位で処理・解放が進む。
  • メモリ: toList/toMap は必要最小限。巨大 CSV は逐次が基本。

早い段階の絞り込みと短絡

  • 軽いフィルタ: 空行・コメント行除去は前段に。
  • 短絡: limit や takeWhile を使える場合は早めに止める。検証用のサンプル抽出にも有効。

例外と Optional の線引き

  • Optional: 想定内の欠損・不正値を自然に落とす。
  • 例外: フォーマット破綻や I/O など「想定外」は UncheckedIOException で伝搬させる。失敗行は別レポートへ。

ヘッダー駆動のスキーマ管理

  • 位置解決: ヘッダーから列インデックスを取ると、列順変更に強い。
  • 存在検証: 必須列が無い場合は即座にエラー化し、早期に失敗させる。

ライブラリの採用判断

  • 自前パーサー: シンプル CSV、速度重視、依存削減。
  • ライブラリ: 区切り変更・複雑な引用・改行含むフィールドなどが出るなら、Apache Commons CSV や OpenCSV を検討。Stream へは Parser を Iterator/Iterable として渡し、StreamSupport で包むと宣言的に書ける。

テンプレート(そのまま使える雛形)

ヘッダーあり CSV → レコード

static <T> List<T> readCsv(Path path,
                           java.util.function.Function<String, List<String>> parse,
                           java.util.function.Function<List<String>, Optional<T>> mapWithHeader) throws IOException {
    try (Stream<String> lines = Files.lines(path)) {
        List<String> all = lines.toList();
        if (all.isEmpty()) return List.of();
        List<String> header = parse.apply(all.get(0));
        return all.stream().skip(1)
            .map(parse)
            .map(cols -> mapWithHeader.apply(cols))
            .flatMap(Optional::stream)
            .toList();
    }
}
Java

簡易 CSV パーサー(引用対応)

static List<String> parseCsvLineSimple(String line) {
    List<String> out = new ArrayList<>();
    StringBuilder sb = new StringBuilder();
    boolean quoted = false;
    for (int i = 0; i < line.length(); i++) {
        char c = line.charAt(i);
        if (c == '"') {
            if (quoted && i + 1 < line.length() && line.charAt(i + 1) == '"') { sb.append('"'); i++; }
            else quoted = !quoted;
        } else if (c == ',' && !quoted) {
            out.add(sb.toString()); sb.setLength(0);
        } else sb.append(c);
    }
    out.add(sb.toString());
    return out;
}
Java

Optional ベースの安全マッピング

static <T> Optional<T> safeMap(List<String> cols,
                               java.util.function.Function<List<String>, T> f) {
    try { return Optional.of(f.apply(cols)); }
    catch (RuntimeException e) { return Optional.empty(); }
}
Java

逐次 ETL(材質化ゼロ)

static <T> void etl(Path in, java.util.function.Function<String, Optional<T>> parse, java.util.function.Consumer<T> sink) throws IOException {
    try (Stream<String> lines = Files.lines(in)) {
        lines.skip(1)
             .map(String::trim)
             .filter(l -> !l.isEmpty())
             .map(parse)
             .flatMap(Optional::stream)
             .forEach(sink);
    }
}
Java

まとめ

Stream を使った CSV → オブジェクト変換は、遅延評価で「行を一つずつ処理する」設計が本質です。ヘッダーで列位置を解決し、引用・カンマを安全にパースし、Optional で無効行を自然に落とす。終端は forEach(外部出力)でメモリピークを一定に、バリデーションは前段で早く・軽く。自前で足りなければライブラリを包んで Stream 化する。この作法を押さえれば、ETL は短く、読みやすく、そして巨大 CSV に対しても強くなります。

タイトルとURLをコピーしました