バッチ処理用チャンク分割(partitioning) — メモリ制御
大量データを「少しずつ」処理することで、メモリ使用量と失敗リスクを抑えるのがチャンク分割です。固定サイズで区切って読み・変換・書き込みを進めれば、ヒープを圧迫せずに安定運用できます。フレームワークを使わない素朴な実装から、Spring Batch のチャンクモデルまでを初心者向けに整理します。
基本考え方(なぜチャンク分割が効くか)
- メモリ上限の回避: 100万件を一度に読み込む代わりに、例えば1000件ごとに処理すれば、ピーク使用メモリをほぼチャンクサイズ+αに抑えられる。
- 失敗時のリカバリ: チャンク単位でコミットすれば、失敗しても巻き戻し範囲が小さい。
- I/Oの効率化: まとめて書き込む(バルク)ことでストレージやネットワークを効率化できる。
- フレームワークのモデル: Spring Batch は「Read → Process → Write」をチャンクで繰り返し、信頼性・速度・メモリ効率を高める設計を提供している。
素朴なチャンク分割パターン(フレームワーク無し)
リストを固定サイズに分割
import java.util.*;
public class Partitioner {
public static <T> List<List<T>> partition(List<T> src, int size) {
List<List<T>> parts = new ArrayList<>();
for (int i = 0; i < src.size(); i += size) {
int end = Math.min(i + size, src.size());
parts.add(src.subList(i, end)); // ビュー(コピーではない)
}
return parts;
}
public static void main(String[] args) {
List<Integer> data = new ArrayList<>();
for (int i = 1; i <= 23; i++) data.add(i);
for (List<Integer> chunk : partition(data, 5)) {
// ここで処理+バルク書き込みなど
System.out.println(chunk);
}
}
}
Java- ポイント: subList は元のリストのビュー。処理中に元リストを構造変更しない。必要なら
new ArrayList<>(subList)でコピー。
イテレータ駆動(逐次読み込み→チャンク)
import java.util.*;
public class IteratorChunk {
public static <T> List<T> nextChunk(Iterator<T> it, int size) {
List<T> chunk = new ArrayList<>(size);
for (int i = 0; i < size && it.hasNext(); i++) {
chunk.add(it.next());
}
return chunk;
}
public static void main(String[] args) {
Iterator<Integer> it = new ArrayList<>(List.of(1,2,3,4,5,6,7,8,9)).iterator();
int size = 4;
List<Integer> chunk;
while (!(chunk = nextChunk(it, size)).isEmpty()) {
// チャンク処理&コミット
System.out.println(chunk);
}
}
}
Java- ポイント: 入力が巨大でも、必要な分だけ前へ進む。ファイルやDBカーソルに相性が良い。
ストリームをチャンクごとに消費(Collector なしの簡易版)
import java.util.*;
import java.util.stream.*;
public class StreamChunk {
public static <T> void forEachChunk(Stream<T> stream, int size, java.util.function.Consumer<List<T>> consumer) {
Iterator<T> it = stream.iterator();
List<T> buf = new ArrayList<>(size);
while (it.hasNext()) {
buf.add(it.next());
if (buf.size() == size) { consumer.accept(buf); buf = new ArrayList<>(size); }
}
if (!buf.isEmpty()) consumer.accept(buf);
}
public static void main(String[] args) {
forEachChunk(IntStream.range(0, 17).boxed(), 6, chunk -> {
// バルク書き込みやまとめ集計
System.out.println(chunk);
});
}
}
Java- ポイント: 遅延読み込み+チャンク単位のアクション。終端で後処理をまとめる。
例題で身につける
例題1: CSVファイルを1000行ずつ読み→DBへバルク挿入
import java.io.*;
import java.util.*;
class CsvBatchInsert {
static final int CHUNK = 1000;
public static void main(String[] args) throws Exception {
try (BufferedReader br = new BufferedReader(new FileReader("data.csv"))) {
String line;
List<String[]> batch = new ArrayList<>(CHUNK);
while ((line = br.readLine()) != null) {
batch.add(line.split(","));
if (batch.size() == CHUNK) {
bulkInsert(batch);
batch.clear();
}
}
if (!batch.isEmpty()) bulkInsert(batch);
}
}
static void bulkInsert(List<String[]> rows) {
// まとめてDB書き込み(疑似)
// 実際はPreparedStatementのaddBatch/executeBatch など
System.out.println("insert " + rows.size());
}
}
Java- ポイント: 読み込みも書き込みもチャンク化して、ピークメモリを抑える。
例題2: API からページング取得→チャンク処理
import java.util.*;
class PagedApi {
static List<String> fetchPage(int page, int size) {
// 実際はHTTPで取得。ここではダミー。
int start = page * size, end = Math.min(start + size, 10500);
if (start >= end) return List.of();
List<String> res = new ArrayList<>();
for (int i = start; i < end; i++) res.add("item-" + i);
return res;
}
public static void main(String[] args) {
int page = 0, size = 500;
while (true) {
List<String> items = fetchPage(page++, size);
if (items.isEmpty()) break;
process(items); // チャンク単位で処理
}
}
static void process(List<String> items) {
// バルクの検証・保存など
System.out.println("process " + items.size());
}
}
Java- ポイント: 外部ソースがページング対応なら、そのままチャンク分割に使える。
例題3: スライディングウィンドウで集計負荷を一定に
import java.util.*;
class SlidingAvg {
private final Deque<Integer> dq = new ArrayDeque<>();
private final int max;
private long sum = 0;
SlidingAvg(int max) { this.max = max; }
void add(int x) {
dq.addLast(x); sum += x;
if (dq.size() > max) sum -= dq.pollFirst();
}
double avg() { return dq.isEmpty() ? 0.0 : (double) sum / dq.size(); }
public static void main(String[] args) {
SlidingAvg s = new SlidingAvg(1000);
for (int i = 1; i <= 10_000; i++) { s.add(i); }
System.out.println(s.avg());
}
}
Java- ポイント: 窓サイズでメモリと計算量を制御。
Spring Batch のチャンクモデル(実務フレームワーク)
- 仕組み: 大量データを「ItemReader → ItemProcessor → ItemWriter」でチャンク単位に処理し、コミット境界を制御する構成。失敗時のリトライやリスタート、ログ・監視の仕組みを備え、信頼性の高いバッチ実装が可能。
- 利点: チャンクサイズに応じたトランザクション管理、スキップ・リトライ戦略、監視用のリスナー(ChunkListener)などを標準提供。
- 導入の目安: スケジューリングや再実行、業務的なエラーハンドリングが重要なら、素朴実装よりフレームワークの採用が保守性・安全性で有利。
テンプレート集(そのまま使える)
- 単純リスト分割
List<List<T>> parts = new ArrayList<>();
for (int i = 0; i < src.size(); i += size) {
parts.add(src.subList(i, Math.min(i + size, src.size())));
}
Java- 逐次チャンク読み(イテレータ)
List<T> chunk = new ArrayList<>(size);
while (it.hasNext()) {
chunk.add(it.next());
if (chunk.size() == size) { consume(chunk); chunk.clear(); }
}
if (!chunk.isEmpty()) consume(chunk);
Java- ファイル読み+バルク書き
try (var br = new java.io.BufferedReader(new java.io.FileReader(path))) {
List<Row> batch = new ArrayList<>(N);
for (String line; (line = br.readLine()) != null; ) {
batch.add(parse(line));
if (batch.size() == N) { writeBulk(batch); batch.clear(); }
}
if (!batch.isEmpty()) writeBulk(batch);
}
Java- Spring Batch(概念)
// Reader -> Processor -> Writer をchunk(N)で構成し、commit境界を設定
// 失敗時はスキップ/リトライ、JobRepositoryに履歴保存(フレームワーク機能)
Java落とし穴と回避策
- subListのビュー誤用: 元リストを構造変更するとビューが壊れる。必要ならコピーして扱う。
- 巨大バッファの一括処理: チャンクサイズを現場のヒープ余裕に合わせて調整(測定必須)。
- コミット境界の不適合: DBは小さすぎると遅く、大きすぎるとロックやメモリに影響。適正値をベンチで決める。
- 例外時の中途半端な状態: 素朴実装では再実行・スキップ戦略が難しい。要求が厳しいならフレームワークで管理。 Qiita
- I/Oボトルネックの見落とし: CPUよりストレージ/ネットワークが律速なことが多い。書き込みのバッチ化・接続再利用・並行度調整を検討。
まとめ
- チャンク分割は「メモリと信頼性」を両立するための基本戦略。リスト分割、イテレータ駆動、ページング、バルクI/Oで実装できる。
- 再実行・監視・トランザクション管理が重要なら、Spring Batch のチャンクモデルを使うと安全・保守的に運用できる。
- チャンクサイズとコミット境界は測定して最適化し、I/Oの実情に合わせて調整するのがコツです。
