概要(バルク処理は「チマチマやらずにまとめてやる」技)
バルク処理(bulk processing)は、ざっくり言うと
「1件ずつチマチマ処理するのではなく、まとめてガッと処理することで
速く・安定して・無駄なく動かすやり方」
です。
自動化の現場だと、例えば次のような場面で効いてきます。
同じ API を 1万件分呼ぶ。
1万行ある CSV を DB に入れる。
大量ファイルに同じ変換処理をかける。
こういうときに「1件ずつ for で回してそのたびに I/O する」か、
「ある程度まとめて一括でI/Oする」かで、性能も安定性も大きく変わります。
ここでは、初心者でも腑に落ちるように
- バルク処理の考え方(なぜ速くなるのか)
- ファイルやリストを「チャンク」に分けるパターン
- DB・Web API でのバルク処理のイメージ
- 実務で重要な「サイズ設計」と「エラー時の扱い」
を、例題を交えながらかみ砕いて説明します。
バルク処理の本質(「ループの中の I/O を減らす」)
なぜ「まとめてやる」と速くなるのか
コンピュータ処理の時間はだいたい
計算(CPU)にかかる時間
+ ネットワーク・ディスク・DBなどの I/O にかかる時間
の合計です。
特に自動化では、I/O(ファイル読み書き、HTTP、DB)がボトルネックになることが多いです。
ここがポイントで、
1行ずつファイルに書く、1件ずつ DB に INSERT、1件ずつ API を呼ぶ
といったやり方は、「I/Oをやたら多く呼び出す」ことになります。
バルク処理の本質はここにあります。
I/O の回数を減らすために
「データをある程度まとめて(チャンク単位で)処理・転送する」
これだけです。
同じ量のデータを扱うなら、
1件ごとに 10000 回 I/O するより
100件ずつ 100 回 I/O するほうが
圧倒的に速く・安定しやすくなります。
「1件処理」と「バルク処理」の対比イメージ
例えば 10000 行の CSV を DB に入れるとします。
1件ずつ INSERT する場合は、
ループで 10000 回 INSERT 文を実行
→ DB 側とのやりとり(ネットワーク・プロトコル)が 10000 回
バルク INSERT(まとめて挿入)の場合は、
100行ずつのバルク INSERT を 100 回
→ 通信のオーバーヘッドが 100 分の1
になります。
Python の処理はほぼ同じでも、外側で行われる I/O は全然違う。
これがバルク処理が効く理由です。
Python での「チャンク分割」の基本パターン
大量データを「小分けのかたまり」にする発想
バルク処理の第一歩は、
大きなリストやイテレータを「適当なサイズの小分け(チャンク)」にする
習慣をつけることです。
シンプルな例として、リストを n 個ずつに分ける関数を作ってみましょう。
def chunked(iterable, size):
"""iterable を size 個ずつのチャンクに分割して返すジェネレータ"""
chunk = []
for item in iterable:
chunk.append(item)
if len(chunk) >= size:
yield chunk
chunk = []
if chunk:
yield chunk
Python使い方のイメージは次のようになります。
items = list(range(1, 21))
for chunk in chunked(items, 5):
print(chunk)
Pythonこれで [1,2,3,4,5], [6,7,8,9,10] … のように小分けにできます。
この型を覚えておくと、
100 件ずつまとめて DB に投げる
50 件ずつまとめて API に投げる
1000 行ずつまとめて CSV に書く
など、「バルク単位」で処理するコードが書きやすくなります。
ファイルを「全部メモリに乗せない」ためのチャンク処理
CSV などの巨大ファイルを扱うとき、全部 read() したり、全部 list() にしてしまうとメモリが足りなくなります。
この場合も、「一定行数ずつ読みながら処理する」=バルク処理が使えます。
from pathlib import Path
def read_in_chunks(path: Path, chunk_size: int = 1000):
with path.open("r", encoding="utf-8") as f:
chunk = []
for line in f:
chunk.append(line.rstrip("\n"))
if len(chunk) >= chunk_size:
yield chunk
chunk = []
if chunk:
yield chunk
Pythonイメージとしては、
ファイルを開く
→ 行を 1000 行ずつのかたまりにして渡す
→ 呼び出し側が、その 1000 行をまとめて処理する
という流れです。
これもループの中で「1行ずつ DB に INSERT」のように書きがちなところを、
「1000行=1バルク」で効率よく処理する形に変えるための下地になります。
DB や Web API でのバルク処理のイメージ
DB のバルク INSERT(例:psycopg2 / SQLAlchemy など)
データベースでは、ほとんどのドライバが「まとめて INSERT」する手段を持っています。
単純化した擬似コードでイメージを掴んでください。
1件ずつ INSERT
for row in rows: # rows は dict のリストとする
cursor.execute(
"INSERT INTO users(id, name, age) VALUES (%s, %s, %s)",
(row["id"], row["name"], row["age"])
)
Pythonバルク INSERT(複数行まとめて)
params = [(r["id"], r["name"], r["age"]) for r in rows]
cursor.executemany(
"INSERT INTO users(id, name, age) VALUES (%s, %s, %s)",
params
)
Pythonさらに DB によっては、「1つの SQL 文で複数行を一気に VALUES で並べる」ような方法もあります。
どの方法にせよ、共通しているのは
複数件ぶんのデータをまとめて DB に渡す
→ DB とのラウンドトリップ(往復)の回数を減らせる
という点です。
これを chunked(rows, 1000) と組み合わせると、
1000件単位で executemany
→ 全体 100万件の INSERT も現実的な時間で終わる
といった形に持っていけます。
Web API のバルク処理(バッチエンドポイント)
多くの Web API は、「1件ずつの操作」と「複数件をまとめて扱うバルクエンドポイント」の両方を持っています。
例えば、
POST /users → 1ユーザー分の登録POST /users/bulk → 複数ユーザーをまとめて JSON で渡す
のような形です。
バルクエンドポイントを持つ API では、
ユーザー1件ごとに毎回 POST
→ 1万件なら 1万回 HTTP リクエスト
よりも
100件ぶんを1JSONにして /users/bulk に POST
→ 1万件なら 100回 POST
のほうが、圧倒的に速く・サーバー負荷も低くなります。
Python 側では、先ほどの chunked を使って、
import requests
import json
def send_users_bulk(users, bulk_size=100):
for batch in chunked(users, bulk_size):
payload = {"users": batch}
resp = requests.post("https://api.example.com/users/bulk", json=payload, timeout=10)
resp.raise_for_status()
Pythonという形にします。
発想としてはいつも同じです。
データの単位を「1件」ではなく「バルク」にする。
I/O の粒度を大きくすることで、回数を減らす。
ここを意識するだけで、処理速度が何桁も変わることが普通にあります。
エラー時の扱い(バルク処理の「嫌なところ」をどう設計するか)
バルク処理の弱点:「一部だけ失敗したときどうするか」
バルク処理には明確な弱点があります。
1件ずつ処理している場合、
3件目でエラー
→ 3件目だけエラー処理して、4件目以降は普通に続けられる
といった細かい制御がしやすい。
一方、100件まとめて DB に投げた場合、
その 100件のうち 1件だけ制約違反で落ちた
このときに
残り 99件はどうする?
どのデータがダメだった?
という問題が一気に難しくなります。
ここは設計で決めるしかありません。
バルク単位を「運命共同体」と見なして、1件でもダメなら全部ロールバックするのか。
エラーメッセージの内容から「失敗したレコードだけを特定」して再処理するのか。
最初から「重要度が低いデータだから、バルク単位で一つでもダメなら丸ごと捨てる」のか。
バルク処理を導入するときは、
「一部失敗」のときにどうしたいかを、事前に言葉で決めることが本当に大事です。
リトライと組み合わせる(回線やAPIの揺らぎを吸収する)
バルク処理では、リクエスト1回が「まとまった件数」を抱えています。
その1回がタイムアウトしたり、503を返したりすると、「そのバルク全部がおじゃん」になります。
ここで効いてくるのが「リトライ処理」です。
例えば、
同じバルクを最大3回まで再送
→ それでもダメならこのバルクは諦めてログに残す
といったルールをリトライロジックとして実装します。
実装イメージは、
chunked でバルクを取り出す
→ そのバルクを送る関数を、タイムアウト+リトライ付きでラップ
→ 成功バルク/失敗バルクの件数を最後にまとめてレポート
という感じです。
ここでもポイントは「粒度」です。
どの単位で諦めるか。バルク単位なのか、全体なのか。
この線引きを先に決めておけば、コードはそれに従って素直に書くだけで済みます。
バルクサイズの決め方(ここが一番“センス”が要る)
「大きければいい」わけではない
バルクサイズは、1000件、100件、10件…と選び放題ですが、
「大きければ大きいほど良い」という単純な話ではありません。
サイズが大きすぎると、
1回あたりの処理時間が長くなりすぎて、タイムアウトしやすくなる
メモリ負荷が大きくなる
一部の失敗で巻き添えになる件数が増える
といった問題が出てきます。
逆に小さすぎると、「バルクにした意味が薄れる」=I/O回数があまり減らない状態になります。
現実的な探し方
現場では、結局こうやって決めます。
まずは直感で「これくらいなら安全そう」というサイズを決める(例:100件)。
テスト環境や小さめの本番データで実際に測る。
処理時間・メモリ使用量・エラー率を見て、倍にしてみる、半分にしてみる、を繰り返す。
例えば、DB に対して
100件単位 → 平均 0.2 秒/バルク
500件単位 → 平均 0.35 秒/バルク(ちょっと遅い)
1000件単位 → たまにタイムアウト
みたいな結果が出ることがあります。
この場合なら、「500件以下で運用しよう」「300件くらいが無難かも」といった判断ができます。
バルクサイズを決めるのは「経験」と「実測」です。
最初から正解サイズを当てにいく必要はなく、測って調整すればいい、というスタンスで十分です。
まとめ(バルク処理は「I/Oをまとめて、賢く失敗する」技術)
バルク処理を、自動化の視点から整理するとこうなります。
ループの中で 1件ずつ I/O するのではなく、チャンク(バルク)単位で I/O することで、I/O 回数を減らして高速化・安定化を狙う。
Python ではまず「チャンク分割関数」を身につけ、それを API 呼び出し・DB 操作・ファイル処理に適用する。
バルク処理は、一部の失敗時の扱いが難しくなるので、「バルク単位でのエラー方針(ロールバック・再送・捨てる)」を先に決めておく。
タイムアウトとリトライと組み合わせて、「バルクごとにどこまで粘るか・どこで諦めるか」をきちんと設計する。
バルクサイズは「直感 → 実測 → 調整」で決めるもので、最初から完璧な数字を当てようとしなくていい。
バルク処理の感覚が一度身につくと、
「これ、for で1件ずつやってるけど、まとめれば何倍も速くなるよな?」
と自然に気づけるようになります。
