Java 逆引き集 | バッチ処理用チャンク分割(partitioning) — メモリ制御

Java Java
スポンサーリンク

バッチ処理用チャンク分割(partitioning) — メモリ制御

大量データを「少しずつ」処理することで、メモリ使用量と失敗リスクを抑えるのがチャンク分割です。固定サイズで区切って読み・変換・書き込みを進めれば、ヒープを圧迫せずに安定運用できます。フレームワークを使わない素朴な実装から、Spring Batch のチャンクモデルまでを初心者向けに整理します。


基本考え方(なぜチャンク分割が効くか)

  • メモリ上限の回避: 100万件を一度に読み込む代わりに、例えば1000件ごとに処理すれば、ピーク使用メモリをほぼチャンクサイズ+αに抑えられる。
  • 失敗時のリカバリ: チャンク単位でコミットすれば、失敗しても巻き戻し範囲が小さい。
  • I/Oの効率化: まとめて書き込む(バルク)ことでストレージやネットワークを効率化できる。
  • フレームワークのモデル: Spring Batch は「Read → Process → Write」をチャンクで繰り返し、信頼性・速度・メモリ効率を高める設計を提供している。

素朴なチャンク分割パターン(フレームワーク無し)

リストを固定サイズに分割

import java.util.*;

public class Partitioner {
    public static <T> List<List<T>> partition(List<T> src, int size) {
        List<List<T>> parts = new ArrayList<>();
        for (int i = 0; i < src.size(); i += size) {
            int end = Math.min(i + size, src.size());
            parts.add(src.subList(i, end)); // ビュー(コピーではない)
        }
        return parts;
    }

    public static void main(String[] args) {
        List<Integer> data = new ArrayList<>();
        for (int i = 1; i <= 23; i++) data.add(i);
        for (List<Integer> chunk : partition(data, 5)) {
            // ここで処理+バルク書き込みなど
            System.out.println(chunk);
        }
    }
}
Java
  • ポイント: subList は元のリストのビュー。処理中に元リストを構造変更しない。必要なら new ArrayList<>(subList) でコピー。

イテレータ駆動(逐次読み込み→チャンク)

import java.util.*;

public class IteratorChunk {
    public static <T> List<T> nextChunk(Iterator<T> it, int size) {
        List<T> chunk = new ArrayList<>(size);
        for (int i = 0; i < size && it.hasNext(); i++) {
            chunk.add(it.next());
        }
        return chunk;
    }

    public static void main(String[] args) {
        Iterator<Integer> it = new ArrayList<>(List.of(1,2,3,4,5,6,7,8,9)).iterator();
        int size = 4;
        List<Integer> chunk;
        while (!(chunk = nextChunk(it, size)).isEmpty()) {
            // チャンク処理&コミット
            System.out.println(chunk);
        }
    }
}
Java
  • ポイント: 入力が巨大でも、必要な分だけ前へ進む。ファイルやDBカーソルに相性が良い。

ストリームをチャンクごとに消費(Collector なしの簡易版)

import java.util.*;
import java.util.stream.*;

public class StreamChunk {
    public static <T> void forEachChunk(Stream<T> stream, int size, java.util.function.Consumer<List<T>> consumer) {
        Iterator<T> it = stream.iterator();
        List<T> buf = new ArrayList<>(size);
        while (it.hasNext()) {
            buf.add(it.next());
            if (buf.size() == size) { consumer.accept(buf); buf = new ArrayList<>(size); }
        }
        if (!buf.isEmpty()) consumer.accept(buf);
    }

    public static void main(String[] args) {
        forEachChunk(IntStream.range(0, 17).boxed(), 6, chunk -> {
            // バルク書き込みやまとめ集計
            System.out.println(chunk);
        });
    }
}
Java
  • ポイント: 遅延読み込み+チャンク単位のアクション。終端で後処理をまとめる。

例題で身につける

例題1: CSVファイルを1000行ずつ読み→DBへバルク挿入

import java.io.*;
import java.util.*;

class CsvBatchInsert {
    static final int CHUNK = 1000;

    public static void main(String[] args) throws Exception {
        try (BufferedReader br = new BufferedReader(new FileReader("data.csv"))) {
            String line;
            List<String[]> batch = new ArrayList<>(CHUNK);
            while ((line = br.readLine()) != null) {
                batch.add(line.split(","));
                if (batch.size() == CHUNK) {
                    bulkInsert(batch);
                    batch.clear();
                }
            }
            if (!batch.isEmpty()) bulkInsert(batch);
        }
    }

    static void bulkInsert(List<String[]> rows) {
        // まとめてDB書き込み(疑似)
        // 実際はPreparedStatementのaddBatch/executeBatch など
        System.out.println("insert " + rows.size());
    }
}
Java
  • ポイント: 読み込みも書き込みもチャンク化して、ピークメモリを抑える。

例題2: API からページング取得→チャンク処理

import java.util.*;

class PagedApi {
    static List<String> fetchPage(int page, int size) {
        // 実際はHTTPで取得。ここではダミー。
        int start = page * size, end = Math.min(start + size, 10500);
        if (start >= end) return List.of();
        List<String> res = new ArrayList<>();
        for (int i = start; i < end; i++) res.add("item-" + i);
        return res;
    }

    public static void main(String[] args) {
        int page = 0, size = 500;
        while (true) {
            List<String> items = fetchPage(page++, size);
            if (items.isEmpty()) break;
            process(items); // チャンク単位で処理
        }
    }

    static void process(List<String> items) {
        // バルクの検証・保存など
        System.out.println("process " + items.size());
    }
}
Java
  • ポイント: 外部ソースがページング対応なら、そのままチャンク分割に使える。

例題3: スライディングウィンドウで集計負荷を一定に

import java.util.*;

class SlidingAvg {
    private final Deque<Integer> dq = new ArrayDeque<>();
    private final int max;
    private long sum = 0;

    SlidingAvg(int max) { this.max = max; }

    void add(int x) {
        dq.addLast(x); sum += x;
        if (dq.size() > max) sum -= dq.pollFirst();
    }
    double avg() { return dq.isEmpty() ? 0.0 : (double) sum / dq.size(); }

    public static void main(String[] args) {
        SlidingAvg s = new SlidingAvg(1000);
        for (int i = 1; i <= 10_000; i++) { s.add(i); }
        System.out.println(s.avg());
    }
}
Java
  • ポイント: 窓サイズでメモリと計算量を制御。

Spring Batch のチャンクモデル(実務フレームワーク)

  • 仕組み: 大量データを「ItemReader → ItemProcessor → ItemWriter」でチャンク単位に処理し、コミット境界を制御する構成。失敗時のリトライやリスタート、ログ・監視の仕組みを備え、信頼性の高いバッチ実装が可能。
  • 利点: チャンクサイズに応じたトランザクション管理、スキップ・リトライ戦略、監視用のリスナー(ChunkListener)などを標準提供。
  • 導入の目安: スケジューリングや再実行、業務的なエラーハンドリングが重要なら、素朴実装よりフレームワークの採用が保守性・安全性で有利。

テンプレート集(そのまま使える)

  • 単純リスト分割
List<List<T>> parts = new ArrayList<>();
for (int i = 0; i < src.size(); i += size) {
    parts.add(src.subList(i, Math.min(i + size, src.size())));
}
Java
  • 逐次チャンク読み(イテレータ)
List<T> chunk = new ArrayList<>(size);
while (it.hasNext()) {
    chunk.add(it.next());
    if (chunk.size() == size) { consume(chunk); chunk.clear(); }
}
if (!chunk.isEmpty()) consume(chunk);
Java
  • ファイル読み+バルク書き
try (var br = new java.io.BufferedReader(new java.io.FileReader(path))) {
    List<Row> batch = new ArrayList<>(N);
    for (String line; (line = br.readLine()) != null; ) {
        batch.add(parse(line));
        if (batch.size() == N) { writeBulk(batch); batch.clear(); }
    }
    if (!batch.isEmpty()) writeBulk(batch);
}
Java
  • Spring Batch(概念)
// Reader -> Processor -> Writer をchunk(N)で構成し、commit境界を設定
// 失敗時はスキップ/リトライ、JobRepositoryに履歴保存(フレームワーク機能)
Java

落とし穴と回避策

  • subListのビュー誤用: 元リストを構造変更するとビューが壊れる。必要ならコピーして扱う。
  • 巨大バッファの一括処理: チャンクサイズを現場のヒープ余裕に合わせて調整(測定必須)。
  • コミット境界の不適合: DBは小さすぎると遅く、大きすぎるとロックやメモリに影響。適正値をベンチで決める。
  • 例外時の中途半端な状態: 素朴実装では再実行・スキップ戦略が難しい。要求が厳しいならフレームワークで管理。 Qiita
  • I/Oボトルネックの見落とし: CPUよりストレージ/ネットワークが律速なことが多い。書き込みのバッチ化・接続再利用・並行度調整を検討。

まとめ

  • チャンク分割は「メモリと信頼性」を両立するための基本戦略。リスト分割、イテレータ駆動、ページング、バルクI/Oで実装できる。
  • 再実行・監視・トランザクション管理が重要なら、Spring Batch のチャンクモデルを使うと安全・保守的に運用できる。
  • チャンクサイズとコミット境界は測定して最適化し、I/Oの実情に合わせて調整するのがコツです。
タイトルとURLをコピーしました