ねらいと基本方針
データ整形パイプラインは「取り込み → 正規化 → 検証 → 集約/変換 → 出力」の流れを、安全に、短く、遅延評価で組み立てる設計です。Stream を軸にすると、巨大データでもメモリピークを一定に保ちつつ、段階を明確に分離できます。重要ポイントは、上流で絞る(filter)、短絡で止める(limit, findFirst など)、重い処理は後段へ、材質化は最小に、外部出力は終端で逐次行うことです。
パイプラインの骨格(段階分離と再利用)
段階の分離と名前付け
各段階に意味のある名前を付けると、意図が明確になり保守性が上がります。関数化して再利用可能にしましょう。
Function<String, String> trim = String::trim;
Predicate<String> nonBlank = s -> !s.isBlank();
Function<String, Record> toRecord = CsvParser::parse; // 例:CSV→ドメイン
Stream<Record> pipeline(Stream<String> lines) {
return lines
.map(trim)
.filter(nonBlank)
.map(toRecord)
.filter(Validator::valid); // 例:ドメイン検証
}
Java変換と検証を上流に置くことで、後段の重い処理へ進む要素数を減らします。
Supplier で同じ上流を再生成
同じ抽出条件に対して複数の終端(count, toList, 書き出しなど)が必要な場合、ストリームは一回性なので再生成にします。
Supplier<Stream<Record>> src = () -> Files.lines(path)
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(CsvParser::parse)
.filter(Validator::valid);
long n = src.get().count();
List<Record> first100 = src.get().limit(100).toList();
Java入力の取り込みと前処理(軽く絞る・早く止める)
I/O を終端へ寄せる逐次処理
巨大ファイルは行を遅延で供給し、終端で逐次書き出すのが基本です。材質化を避けてピークメモリを一定に保ちます。
try (var w = Files.newBufferedWriter(Paths.get("out.txt"));
Stream<String> lines = Files.lines(Paths.get("in.txt"))) {
lines
.map(String::trim)
.filter(s -> !s.isEmpty())
.map(CsvParser::parse) // 軽いパース(Optional にして失敗行は落とすのも可)
.filter(Validator::valid)
.map(Formatter::toOutputLine)
.forEach(line -> {
try { w.write(line); w.newLine(); }
catch (IOException e) { throw new UncheckedIOException(e); }
});
}
Java短絡で仕事を減らす
検証用に先頭のサンプルだけ見るなど、limit を前段に置いて処理を早く止めます。
List<Record> preview = Files.lines(path)
.map(String::trim).filter(s -> !s.isEmpty())
.map(CsvParser::parse).filter(Validator::valid)
.limit(50)
.toList();
Java変換・検証・集約の設計(安全性と性能)
Optional ベースの安全変換
失敗し得る変換は Optional を返し、flatMap(Optional::stream) で自然に落とします。例外で全体を止めないのがコツです。
static Optional<Record> safeParse(String line) {
try { return Optional.of(CsvParser.parse(line)); }
catch (RuntimeException e) { return Optional.empty(); }
}
List<Record> records = Files.lines(path)
.map(String::trim).filter(s -> !s.isEmpty())
.map(App::safeParse)
.flatMap(Optional::stream)
.filter(Validator::valid)
.toList();
Java集約は結合可能に(counting/summing/reducing)
集計は結合律を満たす演算で記述し、並列にも耐える形にします。
long errorCount = lines.stream()
.map(CsvParser::parse)
.map(Validator::errors) // 例:List<Error>
.mapToLong(List::size)
.sum();
Map<String, Long> freqByType = records.stream()
.collect(Collectors.groupingBy(Record::type, Collectors.counting()));
Javaソートと重い処理は後段へ
sorted や複雑な map は後段に置き、先にフィルタと短絡で対象を絞ることで仕事量を減らします。
List<String> topNames = records.stream()
.filter(r -> r.score() >= 80)
.sorted(Comparator.comparingInt(Record::score).reversed())
.limit(100)
.map(Record::name)
.toList();
Java出力段とエラーレポート(材質化最小・見える化)
複数出力をまとめて行う
終端は複数並列させず、必要なら一度だけ上流を流して分岐出力します。行を見ながら各シンクへ流すとピークは一定です。
try (var wOk = Files.newBufferedWriter(Paths.get("ok.csv"));
var wNg = Files.newBufferedWriter(Paths.get("ng.csv"));
Stream<String> lines = Files.lines(path)) {
lines.forEach(line -> {
Optional<Record> rec = safeParse(line);
if (rec.filter(Validator::valid).isPresent()) {
try { wOk.write(Formatter.toCsv(rec.get())); wOk.newLine(); }
catch (IOException e) { throw new UncheckedIOException(e); }
} else {
try { wNg.write(line); wNg.newLine(); }
catch (IOException e) { throw new UncheckedIOException(e); }
}
});
}
Javaデバッグ用のサンプルと統計
件数とサンプルを同時に取ると、問題の所在が見えやすくなります。
record Debug<T>(List<T> sample, long count) {}
Debug<Record> dbg = records.stream()
.collect(Collectors.teeing(
Collectors.limit(20).collect(Collectors.toList()), // 先頭20
Collectors.counting(),
Debug::new
));
Java実例で学ぶパイプライン設計
例題 1:ユーザ CSV から DTO へ変換し、上位スコアのレポート出力
record User(long id, String name, int score) {}
static Optional<User> parseUser(String line) { /* 安全パース */ }
Path in = Paths.get("users.csv");
try (var w = Files.newBufferedWriter(Paths.get("top_users.txt"));
Stream<String> lines = Files.lines(in)) {
lines.skip(1) // ヘッダー
.map(String::trim).filter(s -> !s.isEmpty())
.map(App::parseUser).flatMap(Optional::stream)
.filter(u -> u.score() >= 80)
.sorted(Comparator.comparingInt(User::score).reversed())
.limit(100)
.map(u -> "%d,%s,%d".formatted(u.id(), u.name(), u.score()))
.forEach(s -> {
try { w.write(s); w.newLine(); }
catch (IOException e) { throw new UncheckedIOException(e); }
});
}
Java短絡と遅延で巨大入力にも耐え、整形は終端で逐次出力します。
例題 2:ログ整形パイプライン(クレンジング→分類→集計)
record Log(String level, String msg) {}
Function<String, Optional<Log>> parseLog = line -> {
String t = line.trim();
if (t.isEmpty()) return Optional.empty();
int sp = t.indexOf(' ');
if (sp <= 0) return Optional.empty();
return Optional.of(new Log(t.substring(0, sp), t.substring(sp + 1)));
};
Path log = Paths.get("app.log");
Map<String, Long> perLevel;
try (Stream<String> lines = Files.lines(log)) {
perLevel = lines
.map(parseLog).flatMap(Optional::stream)
.filter(l -> List.of("INFO","WARN","ERROR").contains(l.level()))
.collect(Collectors.groupingBy(Log::level, Collectors.counting()));
}
Java可視化は集計後に別途行えばよく、パイプライン自体は純粋に保ちます。
深掘り:正しさ・性能・保守性の勘所
遅延評価を前提に「上流で削る・後段で重くする」
終端が処理を動かします。フィルタと短絡で対象を絞り、重い map や sorted は可能な限り後段へ。これだけで無駄が劇的に減ります。
副作用は終端へ限定し、中間は純粋に
中間操作に外部状態の更新を入れると、短絡や並列で壊れます。外部への書き出しやメトリクス送信は forEach の終端に限定します。
並列は計算が重いときだけ、結合可能な集約で
並列は CPU バウンドの純粋計算に効きます。groupingByConcurrent、counting、summing のような結合可能なコレクタを使い、順序に依存しない設計にします。I/O 主体は並列化の恩恵が薄いことを理解して選択します。
スキーマ変化に強い設計(ヘッダー駆動・正規化)
ヘッダーから列位置を決める、ID/キーの正規化(trim, lower-case)を前段で行うと、データ整形パイプラインはスキーマ変更や揺れに強くなります。
エラーは Optional で落とすか、境界で例外化
想定内の欠損や軽微な不正は Optional で落とす。致命的なフォーマット破綻や I/O は UncheckedIOException で境界から外へ伝える。この線引きでパイプラインは強靭になります。
テンプレート(そのまま使える雛形)
段階分離の基本パイプライン
Stream<T> pipeline(Stream<S> in,
Function<S, Optional<T>> parse,
Predicate<T> valid,
UnaryOperator<Stream<T>> transform) {
return in
.map(s -> parse.apply(s))
.flatMap(Optional::stream)
.filter(valid)
.let(transform); // ※ Java には let はないので適宜置き換え
}
Java安全変換と逐次出力
static <S,T> void etl(Stream<S> in,
Function<S, Optional<T>> parse,
Predicate<T> valid,
Consumer<T> sink) {
in.map(parse).flatMap(Optional::stream)
.filter(valid)
.forEach(sink);
}
Java正規化→検証→トップN→整形
List<R> topN = src.stream()
.map(Normalizer::apply)
.filter(Validator::valid)
.sorted(Comparator.comparingInt(Domain::score).reversed())
.limit(N)
.map(Formatter::toView)
.toList();
Java集計テンプレート(件数・合計)
long count = src.stream()
.filter(Validator::valid)
.count();
int sum = src.stream()
.filter(Validator::valid)
.mapToInt(Domain::value)
.sum();
Javaまとめ
Stream を用いたデータ整形パイプラインの本質は、段階を分離し、上流で削り、短絡で止め、重い処理は後段へ、出力は終端で逐次にすること。Optional を使って失敗を自然に落とし、結合可能な集約で正しさと性能を両立。スキーマ変化に強いヘッダー駆動と正規化、エラーの線引きを整えれば、巨大データでも読みやすく、速く、壊れないパイプラインが組めます。
