ねらいと前提
CSV をオブジェクトへ変換する ETL は「読み取り → クリーニング → 変換 → 検証 → 出力」の流れです。Stream を使えば、行を遅延で処理しながら宣言的に書けるため、巨大ファイルでもメモリピークを一定に保てます。重要なのは、ヘッダー処理、区切り・引用符の扱い、型変換の安全化(Optional/try-catch)、そしてバリデーションを早い段階で行い、無効行を自然に落とすことです。
最小のパイプライン(ヘッダーを飛ばしてレコード化)
基本の読み取りとマッピング
import java.nio.file.*;
import java.io.*;
import java.util.*;
import java.util.stream.*;
record User(long id, String name, int age) {}
static Optional<User> parseUser(String line) {
try {
String[] a = line.split(",", -1); // 空フィールドも残す
long id = Long.parseLong(a[0].trim());
String name = a[1].trim();
int age = Integer.parseInt(a[2].trim());
return Optional.of(new User(id, name, age));
} catch (RuntimeException e) {
return Optional.empty(); // 変換失敗は捨てる
}
}
static List<User> readUsers(Path path) throws IOException {
try (Stream<String> lines = Files.lines(path)) {
return lines
.skip(1) // ヘッダーを飛ばす
.map(String::trim)
.filter(l -> !l.isEmpty())
.map(line -> parseUser(line))
.flatMap(Optional::stream) // 失敗行は自然に除外
.toList();
}
}
Javaこの形は「無効行を Optional.empty で落とす」ため、例外で処理が止まりません。まずはここから。
例: 即時出力で材質化ゼロ(大規模向け)
try (var w = Files.newBufferedWriter(Paths.get("out.json"));
Stream<String> lines = Files.lines(Paths.get("users.csv"))) {
lines.skip(1)
.map(String::trim)
.filter(l -> !l.isEmpty())
.map(CsvETL::parseUser)
.flatMap(Optional::stream)
.forEach(u -> {
try { w.write("%d,%s,%d%n".formatted(u.id(), u.name(), u.age())); }
catch (IOException e) { throw new UncheckedIOException(e); }
});
}
Java逐次書き出しでピークメモリをほぼ一定に保てます。
実務品質のパーサー設計(引用符・エスケープ・欠損)
引用符・カンマを安全に扱う簡易パーサー
split は「引用符内のカンマ」に弱いので、基本的な引用規則に対応した軽量パーサーを用意します。
static List<String> parseCsvLine(String line) {
List<String> out = new ArrayList<>();
StringBuilder sb = new StringBuilder();
boolean quoted = false;
for (int i = 0; i < line.length(); i++) {
char c = line.charAt(i);
if (c == '"') {
// 連続の二重引用はエスケープ("" -> ")
if (quoted && i + 1 < line.length() && line.charAt(i + 1) == '"') {
sb.append('"'); i++;
} else {
quoted = !quoted;
}
} else if (c == ',' && !quoted) {
out.add(sb.toString()); sb.setLength(0);
} else {
sb.append(c);
}
}
out.add(sb.toString());
return out;
}
static Optional<User> parseUserQuoted(String line) {
try {
List<String> a = parseCsvLine(line);
long id = Long.parseLong(a.get(0).trim());
String name = a.get(1).trim();
int age = Integer.parseInt(a.get(2).trim());
return Optional.of(new User(id, name, age));
} catch (RuntimeException e) {
return Optional.empty();
}
}
Java必要十分のルール(ダブルクオート、エスケープ、カンマ)に対応し、現場の CSV に耐えます。
ヘッダーによる位置解決(列順が変わる想定)
record Header(int idIdx, int nameIdx, int ageIdx) {}
static Header parseHeader(String headerLine) {
List<String> h = parseCsvLine(headerLine).stream().map(String::trim).toList();
return new Header(h.indexOf("id"), h.indexOf("name"), h.indexOf("age"));
}
static Optional<User> parseUserByHeader(String line, Header h) {
try {
List<String> a = parseCsvLine(line);
long id = Long.parseLong(a.get(h.idIdx()).trim());
String name = a.get(h.nameIdx()).trim();
int age = Integer.parseInt(a.get(h.ageIdx()).trim());
return Optional.of(new User(id, name, age));
} catch (RuntimeException e) {
return Optional.empty();
}
}
Java列順や追加列に強くなり、スキーマ変更に耐えられます。
バリデーション・エラーハンドリング(品質担保)
早い段階で検証して落とす
static boolean isValid(User u) {
return u.id() > 0 && !u.name().isBlank() && u.age() >= 0 && u.age() <= 120;
}
static List<User> readValidated(Path p) throws IOException {
try (Stream<String> lines = Files.lines(p)) {
String header = lines.findFirst().orElse("");
}
try (Stream<String> lines = Files.lines(p)) {
Header h = parseHeader(lines.findFirst().orElse(""));
}
try (Stream<String> lines = Files.lines(p)) {
Header h = parseHeader(Files.lines(p).findFirst().orElse(""));
return Files.lines(p).skip(1)
.map(String::trim)
.filter(l -> !l.isEmpty())
.map(l -> parseUserByHeader(l, h))
.flatMap(Optional::stream)
.filter(CsvETL::isValid) // 検証通過のみ
.toList();
}
}
Java検証は早く・軽く。落とす基準を明確にして、後段の負荷を下げます。
失敗行のログ収集(デバッグ・監査)
record ParseResult(Optional<User> user, String raw, String error) {}
static ParseResult tryParse(String line, Header h) {
try {
return new ParseResult(parseUserByHeader(line, h), line, "");
} catch (RuntimeException e) {
return new ParseResult(Optional.empty(), line, e.getMessage());
}
}
static void importWithReport(Path p) throws IOException {
Header h = parseHeader(Files.lines(p).findFirst().orElse(""));
try (Stream<String> lines = Files.lines(p)) {
var results = lines.skip(1)
.map(l -> tryParse(l, h))
.toList();
var ok = results.stream().flatMap(r -> r.user().stream()).toList();
var ng = results.stream().filter(r -> r.user().isEmpty()).toList();
System.out.println("OK=" + ok.size() + ", NG=" + ng.size());
}
}
Java成功と失敗を分けて可視化すると、運用が安定します。
例題で身につける ETL
例題 1:CSV → ドメイン → JSON 行の逐次変換
record Order(long id, String product, int qty, double price) {}
static Optional<Order> parseOrder(String line, Header h) {
try {
List<String> a = parseCsvLine(line);
long id = Long.parseLong(a.get(h.idIdx()).trim());
String product = a.get(h.nameIdx()).trim();
int qty = Integer.parseInt(a.get(h.ageIdx()).trim());
double price = Double.parseDouble(a.get(h.ageIdx()+1).trim());
return Optional.of(new Order(id, product, qty, price));
} catch (RuntimeException e) { return Optional.empty(); }
}
static void ordersToJson(Path in, Path out) throws IOException {
Header h = parseHeader(Files.lines(in).findFirst().orElse(""));
try (var w = Files.newBufferedWriter(out);
Stream<String> lines = Files.lines(in)) {
lines.skip(1)
.map(l -> parseOrder(l, h))
.flatMap(Optional::stream)
.forEach(o -> {
String json = "{\"id\":" + o.id() + ",\"product\":\"" + o.product()
+ "\",\"qty\":" + o.qty() + ",\"price\":" + o.price() + "}";
try { w.write(json); w.newLine(); }
catch (IOException e) { throw new UncheckedIOException(e); }
});
}
}
Java材質化せずに行ごと変換・出力。ETL の王道です。
例題 2:巨大 CSV のページング処理(行数制御)
static void processInChunks(Path p, int chunkSize) throws IOException {
try (Stream<String> lines = Files.lines(p)) {
Iterator<String> it = lines.iterator();
while (true) {
List<String> buf = new ArrayList<>(chunkSize);
for (int i = 0; i < chunkSize && it.hasNext(); i++) buf.add(it.next());
if (buf.isEmpty()) break;
buf.stream()
.skip(buf.get(0).startsWith("id") ? 1 : 0) // 先頭ならヘッダー除去
.map(String::trim)
.filter(l -> !l.isEmpty())
.map(CsvETL::parseUserQuoted)
.flatMap(Optional::stream)
.forEach(u -> {/* 書き出し・集計 */});
}
}
}
Javaチャンクでピークを制御しつつ、Stream の宣言性を活用します。
深掘り:正しさ・性能・保守性の勘どころ
遅延評価を前提に「逐次出力」へ
- ドライバー: 終端が処理を動かす。forEach で外部出力すれば、行単位で処理・解放が進む。
- メモリ: toList/toMap は必要最小限。巨大 CSV は逐次が基本。
早い段階の絞り込みと短絡
- 軽いフィルタ: 空行・コメント行除去は前段に。
- 短絡: limit や takeWhile を使える場合は早めに止める。検証用のサンプル抽出にも有効。
例外と Optional の線引き
- Optional: 想定内の欠損・不正値を自然に落とす。
- 例外: フォーマット破綻や I/O など「想定外」は UncheckedIOException で伝搬させる。失敗行は別レポートへ。
ヘッダー駆動のスキーマ管理
- 位置解決: ヘッダーから列インデックスを取ると、列順変更に強い。
- 存在検証: 必須列が無い場合は即座にエラー化し、早期に失敗させる。
ライブラリの採用判断
- 自前パーサー: シンプル CSV、速度重視、依存削減。
- ライブラリ: 区切り変更・複雑な引用・改行含むフィールドなどが出るなら、Apache Commons CSV や OpenCSV を検討。Stream へは Parser を Iterator/Iterable として渡し、StreamSupport で包むと宣言的に書ける。
テンプレート(そのまま使える雛形)
ヘッダーあり CSV → レコード
static <T> List<T> readCsv(Path path,
java.util.function.Function<String, List<String>> parse,
java.util.function.Function<List<String>, Optional<T>> mapWithHeader) throws IOException {
try (Stream<String> lines = Files.lines(path)) {
List<String> all = lines.toList();
if (all.isEmpty()) return List.of();
List<String> header = parse.apply(all.get(0));
return all.stream().skip(1)
.map(parse)
.map(cols -> mapWithHeader.apply(cols))
.flatMap(Optional::stream)
.toList();
}
}
Java簡易 CSV パーサー(引用対応)
static List<String> parseCsvLineSimple(String line) {
List<String> out = new ArrayList<>();
StringBuilder sb = new StringBuilder();
boolean quoted = false;
for (int i = 0; i < line.length(); i++) {
char c = line.charAt(i);
if (c == '"') {
if (quoted && i + 1 < line.length() && line.charAt(i + 1) == '"') { sb.append('"'); i++; }
else quoted = !quoted;
} else if (c == ',' && !quoted) {
out.add(sb.toString()); sb.setLength(0);
} else sb.append(c);
}
out.add(sb.toString());
return out;
}
JavaOptional ベースの安全マッピング
static <T> Optional<T> safeMap(List<String> cols,
java.util.function.Function<List<String>, T> f) {
try { return Optional.of(f.apply(cols)); }
catch (RuntimeException e) { return Optional.empty(); }
}
Java逐次 ETL(材質化ゼロ)
static <T> void etl(Path in, java.util.function.Function<String, Optional<T>> parse, java.util.function.Consumer<T> sink) throws IOException {
try (Stream<String> lines = Files.lines(in)) {
lines.skip(1)
.map(String::trim)
.filter(l -> !l.isEmpty())
.map(parse)
.flatMap(Optional::stream)
.forEach(sink);
}
}
Javaまとめ
Stream を使った CSV → オブジェクト変換は、遅延評価で「行を一つずつ処理する」設計が本質です。ヘッダーで列位置を解決し、引用・カンマを安全にパースし、Optional で無効行を自然に落とす。終端は forEach(外部出力)でメモリピークを一定に、バリデーションは前段で早く・軽く。自前で足りなければライブラリを包んで Stream 化する。この作法を押さえれば、ETL は短く、読みやすく、そして巨大 CSV に対しても強くなります。
