Python | 自動化:バルク処理

Python
スポンサーリンク

概要(バルク処理は「チマチマやらずにまとめてやる」技)

バルク処理(bulk processing)は、ざっくり言うと

「1件ずつチマチマ処理するのではなく、まとめてガッと処理することで
速く・安定して・無駄なく動かすやり方」

です。

自動化の現場だと、例えば次のような場面で効いてきます。

同じ API を 1万件分呼ぶ。
1万行ある CSV を DB に入れる。
大量ファイルに同じ変換処理をかける。

こういうときに「1件ずつ for で回してそのたびに I/O する」か、
「ある程度まとめて一括でI/Oする」かで、性能も安定性も大きく変わります。

ここでは、初心者でも腑に落ちるように

  1. バルク処理の考え方(なぜ速くなるのか)
  2. ファイルやリストを「チャンク」に分けるパターン
  3. DB・Web API でのバルク処理のイメージ
  4. 実務で重要な「サイズ設計」と「エラー時の扱い」

を、例題を交えながらかみ砕いて説明します。


バルク処理の本質(「ループの中の I/O を減らす」)

なぜ「まとめてやる」と速くなるのか

コンピュータ処理の時間はだいたい

計算(CPU)にかかる時間
+ ネットワーク・ディスク・DBなどの I/O にかかる時間

の合計です。

特に自動化では、I/O(ファイル読み書き、HTTP、DB)がボトルネックになることが多いです。
ここがポイントで、

1行ずつファイルに書く、1件ずつ DB に INSERT、1件ずつ API を呼ぶ
といったやり方は、「I/Oをやたら多く呼び出す」ことになります。

バルク処理の本質はここにあります。

I/O の回数を減らすために
「データをある程度まとめて(チャンク単位で)処理・転送する」

これだけです。

同じ量のデータを扱うなら、

1件ごとに 10000 回 I/O するより
100件ずつ 100 回 I/O するほうが
圧倒的に速く・安定しやすくなります。

「1件処理」と「バルク処理」の対比イメージ

例えば 10000 行の CSV を DB に入れるとします。

1件ずつ INSERT する場合は、

ループで 10000 回 INSERT 文を実行
→ DB 側とのやりとり(ネットワーク・プロトコル)が 10000 回

バルク INSERT(まとめて挿入)の場合は、

100行ずつのバルク INSERT を 100 回
→ 通信のオーバーヘッドが 100 分の1

になります。

Python の処理はほぼ同じでも、外側で行われる I/O は全然違う。
これがバルク処理が効く理由です。


Python での「チャンク分割」の基本パターン

大量データを「小分けのかたまり」にする発想

バルク処理の第一歩は、

大きなリストやイテレータを「適当なサイズの小分け(チャンク)」にする

習慣をつけることです。

シンプルな例として、リストを n 個ずつに分ける関数を作ってみましょう。

def chunked(iterable, size):
    """iterable を size 個ずつのチャンクに分割して返すジェネレータ"""
    chunk = []
    for item in iterable:
        chunk.append(item)
        if len(chunk) >= size:
            yield chunk
            chunk = []
    if chunk:
        yield chunk
Python

使い方のイメージは次のようになります。

items = list(range(1, 21))

for chunk in chunked(items, 5):
    print(chunk)
Python

これで [1,2,3,4,5], [6,7,8,9,10] … のように小分けにできます。

この型を覚えておくと、

100 件ずつまとめて DB に投げる
50 件ずつまとめて API に投げる
1000 行ずつまとめて CSV に書く

など、「バルク単位」で処理するコードが書きやすくなります。

ファイルを「全部メモリに乗せない」ためのチャンク処理

CSV などの巨大ファイルを扱うとき、全部 read() したり、全部 list() にしてしまうとメモリが足りなくなります。

この場合も、「一定行数ずつ読みながら処理する」=バルク処理が使えます。

from pathlib import Path

def read_in_chunks(path: Path, chunk_size: int = 1000):
    with path.open("r", encoding="utf-8") as f:
        chunk = []
        for line in f:
            chunk.append(line.rstrip("\n"))
            if len(chunk) >= chunk_size:
                yield chunk
                chunk = []
        if chunk:
            yield chunk
Python

イメージとしては、

ファイルを開く
→ 行を 1000 行ずつのかたまりにして渡す
→ 呼び出し側が、その 1000 行をまとめて処理する

という流れです。

これもループの中で「1行ずつ DB に INSERT」のように書きがちなところを、
「1000行=1バルク」で効率よく処理する形に変えるための下地になります。


DB や Web API でのバルク処理のイメージ

DB のバルク INSERT(例:psycopg2 / SQLAlchemy など)

データベースでは、ほとんどのドライバが「まとめて INSERT」する手段を持っています。

単純化した擬似コードでイメージを掴んでください。

1件ずつ INSERT

for row in rows:  # rows は dict のリストとする
    cursor.execute(
        "INSERT INTO users(id, name, age) VALUES (%s, %s, %s)",
        (row["id"], row["name"], row["age"])
    )
Python

バルク INSERT(複数行まとめて)

params = [(r["id"], r["name"], r["age"]) for r in rows]
cursor.executemany(
    "INSERT INTO users(id, name, age) VALUES (%s, %s, %s)",
    params
)
Python

さらに DB によっては、「1つの SQL 文で複数行を一気に VALUES で並べる」ような方法もあります。

どの方法にせよ、共通しているのは

複数件ぶんのデータをまとめて DB に渡す
→ DB とのラウンドトリップ(往復)の回数を減らせる

という点です。

これを chunked(rows, 1000) と組み合わせると、

1000件単位で executemany
→ 全体 100万件の INSERT も現実的な時間で終わる

といった形に持っていけます。

Web API のバルク処理(バッチエンドポイント)

多くの Web API は、「1件ずつの操作」と「複数件をまとめて扱うバルクエンドポイント」の両方を持っています。

例えば、

POST /users → 1ユーザー分の登録
POST /users/bulk → 複数ユーザーをまとめて JSON で渡す

のような形です。

バルクエンドポイントを持つ API では、

ユーザー1件ごとに毎回 POST
→ 1万件なら 1万回 HTTP リクエスト

よりも

100件ぶんを1JSONにして /users/bulk に POST
→ 1万件なら 100回 POST

のほうが、圧倒的に速く・サーバー負荷も低くなります。

Python 側では、先ほどの chunked を使って、

import requests
import json

def send_users_bulk(users, bulk_size=100):
    for batch in chunked(users, bulk_size):
        payload = {"users": batch}
        resp = requests.post("https://api.example.com/users/bulk", json=payload, timeout=10)
        resp.raise_for_status()
Python

という形にします。

発想としてはいつも同じです。

データの単位を「1件」ではなく「バルク」にする。
I/O の粒度を大きくすることで、回数を減らす。

ここを意識するだけで、処理速度が何桁も変わることが普通にあります。


エラー時の扱い(バルク処理の「嫌なところ」をどう設計するか)

バルク処理の弱点:「一部だけ失敗したときどうするか」

バルク処理には明確な弱点があります。

1件ずつ処理している場合、

3件目でエラー
→ 3件目だけエラー処理して、4件目以降は普通に続けられる

といった細かい制御がしやすい。

一方、100件まとめて DB に投げた場合、

その 100件のうち 1件だけ制約違反で落ちた

このときに

残り 99件はどうする?
どのデータがダメだった?

という問題が一気に難しくなります。

ここは設計で決めるしかありません。

バルク単位を「運命共同体」と見なして、1件でもダメなら全部ロールバックするのか。
エラーメッセージの内容から「失敗したレコードだけを特定」して再処理するのか。
最初から「重要度が低いデータだから、バルク単位で一つでもダメなら丸ごと捨てる」のか。

バルク処理を導入するときは、
「一部失敗」のときにどうしたいかを、事前に言葉で決めることが本当に大事です。

リトライと組み合わせる(回線やAPIの揺らぎを吸収する)

バルク処理では、リクエスト1回が「まとまった件数」を抱えています。
その1回がタイムアウトしたり、503を返したりすると、「そのバルク全部がおじゃん」になります。

ここで効いてくるのが「リトライ処理」です。

例えば、

同じバルクを最大3回まで再送
→ それでもダメならこのバルクは諦めてログに残す

といったルールをリトライロジックとして実装します。

実装イメージは、

chunked でバルクを取り出す
→ そのバルクを送る関数を、タイムアウト+リトライ付きでラップ
→ 成功バルク/失敗バルクの件数を最後にまとめてレポート

という感じです。

ここでもポイントは「粒度」です。

どの単位で諦めるか。バルク単位なのか、全体なのか。
この線引きを先に決めておけば、コードはそれに従って素直に書くだけで済みます。


バルクサイズの決め方(ここが一番“センス”が要る)

「大きければいい」わけではない

バルクサイズは、1000件、100件、10件…と選び放題ですが、
「大きければ大きいほど良い」という単純な話ではありません。

サイズが大きすぎると、

1回あたりの処理時間が長くなりすぎて、タイムアウトしやすくなる
メモリ負荷が大きくなる
一部の失敗で巻き添えになる件数が増える

といった問題が出てきます。

逆に小さすぎると、「バルクにした意味が薄れる」=I/O回数があまり減らない状態になります。

現実的な探し方

現場では、結局こうやって決めます。

まずは直感で「これくらいなら安全そう」というサイズを決める(例:100件)。
テスト環境や小さめの本番データで実際に測る。
処理時間・メモリ使用量・エラー率を見て、倍にしてみる、半分にしてみる、を繰り返す。

例えば、DB に対して

100件単位 → 平均 0.2 秒/バルク
500件単位 → 平均 0.35 秒/バルク(ちょっと遅い)
1000件単位 → たまにタイムアウト

みたいな結果が出ることがあります。

この場合なら、「500件以下で運用しよう」「300件くらいが無難かも」といった判断ができます。

バルクサイズを決めるのは「経験」と「実測」です。
最初から正解サイズを当てにいく必要はなく、測って調整すればいい、というスタンスで十分です。


まとめ(バルク処理は「I/Oをまとめて、賢く失敗する」技術)

バルク処理を、自動化の視点から整理するとこうなります。

ループの中で 1件ずつ I/O するのではなく、チャンク(バルク)単位で I/O することで、I/O 回数を減らして高速化・安定化を狙う。
Python ではまず「チャンク分割関数」を身につけ、それを API 呼び出し・DB 操作・ファイル処理に適用する。
バルク処理は、一部の失敗時の扱いが難しくなるので、「バルク単位でのエラー方針(ロールバック・再送・捨てる)」を先に決めておく。
タイムアウトとリトライと組み合わせて、「バルクごとにどこまで粘るか・どこで諦めるか」をきちんと設計する。
バルクサイズは「直感 → 実測 → 調整」で決めるもので、最初から完璧な数字を当てようとしなくていい。

バルク処理の感覚が一度身につくと、

「これ、for で1件ずつやってるけど、まとめれば何倍も速くなるよな?」

と自然に気づけるようになります。

タイトルとURLをコピーしました