Python | DB・SQL:バルク処理

Python
スポンサーリンク

バルク処理って何?まずはイメージから

バルク処理(バルク INSERT / バルク UPDATE など)は、「大量のデータをまとめて一気に処理する」やり方のことです。
1 行ずつチマチマ INSERT するのではなく、「1000 行まとめて 1 回の INSERT にする」といったイメージです。

なぜそんなことをするかというと、データベースにとって「1 回のクエリを実行するコスト」が意外と重いからです。
1 万件を 1 件ずつ INSERT すると「1 万回の往復」になりますが、1000 件ずつ 10 回にまとめれば「10 回の往復」で済みます。
この「往復回数を減らす」ことが、バルク処理の一番大きな狙いです。


SQL レベルでのバルク INSERT の基本

1 行ずつ INSERT する場合

まず、普通の INSERT を思い出します。

INSERT INTO users (name, email) VALUES ('Taro', 'taro@example.com');
INSERT INTO users (name, email) VALUES ('Hanako', 'hanako@example.com');
INSERT INTO users (name, email) VALUES ('Ken', 'ken@example.com');
SQL

この書き方だと、3 回クエリを投げています。
10000 人分なら 10000 回です。
クエリの回数が増えるほど、ネットワークや DB のオーバーヘッドが積み重なって遅くなります。

複数行をまとめて INSERT する(バルク INSERT)

同じことをバルク INSERT で書くと、こうなります。

INSERT INTO users (name, email) VALUES
  ('Taro',   'taro@example.com'),
  ('Hanako', 'hanako@example.com'),
  ('Ken',    'ken@example.com');
SQL

クエリは 1 回だけです。
これが「バルク INSERT」の最もシンプルな形です。

大量データになるほど、この差が効いてきます。
実測でも、1 件ずつよりバルクの方が何倍も速くなることが多いです。


Python(sqlite3)でのバルク INSERT

1 件ずつ INSERT するコード

まずは、素直に 1 件ずつ INSERT するパターンです。

import sqlite3

conn = sqlite3.connect("app.db")
cur = conn.cursor()

cur.execute(
    """
    CREATE TABLE IF NOT EXISTS users (
        id     INTEGER PRIMARY KEY AUTOINCREMENT,
        name   TEXT NOT NULL,
        email  TEXT UNIQUE NOT NULL
    )
    """
)
conn.commit()

users = [
    ("Taro", "taro@example.com"),
    ("Hanako", "hanako@example.com"),
    ("Ken", "ken@example.com"),
]

for name, email in users:
    cur.execute(
        "INSERT INTO users (name, email) VALUES (?, ?)",
        (name, email),
    )

conn.commit()
conn.close()
Python

ループのたびに execute を呼んでいるので、ユーザー数が増えるほど遅くなります。
10000 人なら 10000 回 execute です。

executemany を使ったバルク INSERT

同じことを、executemany でまとめて投げるとこうなります。

import sqlite3

conn = sqlite3.connect("app.db")
cur = conn.cursor()

cur.execute(
    """
    CREATE TABLE IF NOT EXISTS users (
        id     INTEGER PRIMARY KEY AUTOINCREMENT,
        name   TEXT NOT NULL,
        email  TEXT UNIQUE NOT NULL
    )
    """
)
conn.commit()

users = [
    ("Taro", "taro@example.com"),
    ("Hanako", "hanako@example.com"),
    ("Ken", "ken@example.com"),
]

cur.executemany(
    "INSERT INTO users (name, email) VALUES (?, ?)",
    users,
)

conn.commit()
conn.close()
Python

executemany は、「同じ SQL に対して、パラメータだけを変えながらまとめて実行する」ためのメソッドです。
DB ドライバ側が最適化してくれるので、1 件ずつループするよりずっと効率的になります。

大量データを扱うときは、
「ループの中で毎回 execute しない」
「できるだけ executemany でまとめる」
というのが、Python でのバルク処理の基本パターンです。


バッチサイズという考え方(いきなり全部まとめない)

一度に全部投げるのが正解とは限らない

「じゃあ 100 万件を 1 回の executemany で投げれば最強では?」と思うかもしれませんが、
現実にはメモリやタイムアウト、ロック時間などの問題が出てきます。

そこで出てくるのが「バッチサイズ」という考え方です。
例えば、100 万件を 1000 件ずつに分けて、1000 回のバッチで処理する、というやり方です。

Python でのバッチ処理の例

def chunked(iterable, size):
    for i in range(0, len(iterable), size):
        yield iterable[i:i + size]

import sqlite3

conn = sqlite3.connect("app.db")
cur = conn.cursor()

users = [
    (f"User{i}", f"user{i}@example.com")
    for i in range(100000)
]

batch_size = 1000

for batch in chunked(users, batch_size):
    cur.executemany(
        "INSERT INTO users (name, email) VALUES (?, ?)",
        batch,
    )
    conn.commit()

conn.close()
Python

ここでは、100000 件を 1000 件ずつ 100 回に分けて INSERT しています。
一度に全部メモリに載せて巨大なクエリを投げるより、
「適度なサイズで分割して、でも 1 件ずつではなくバルクで」というバランスが大事です。


トランザクションとバルク処理の関係

1 件ずつ commit すると遅くなる

バルク処理でよくある「やってはいけないパターン」が、
「ループの中で毎回 commit する」ことです。

for user in users:
    cur.execute("INSERT ...", user)
    conn.commit()  # これはかなり遅くなる
Python

commit はトランザクションを確定する重い操作なので、
これを何万回も繰り返すと、バルク処理のメリットがほぼ消えます。

「まとめて commit」が基本

バルク処理では、

ある程度の件数をまとめて INSERT
その塊ごとに 1 回だけ commit

という形にするのが基本です。

batch_size = 1000

for batch in chunked(users, batch_size):
    cur.executemany("INSERT INTO users (name, email) VALUES (?, ?)", batch)
    conn.commit()
Python

これで、「バッチ単位でトランザクションを区切る」ことができます。
全部を 1 トランザクションにすると、ロック時間が長くなりすぎることもあるので、
「バッチサイズ」と「トランザクションの粒度」はセットで考えると良いです。


SQLAlchemy でのバルク処理のイメージ

通常の ORM INSERT(1 件ずつ add)

SQLAlchemy の ORM で、普通に INSERT するとこうなります。

session = SessionLocal()

for name, email in users:
    user = User(name=name, email=email)
    session.add(user)

session.commit()
session.close()
Python

これでも内部的にはある程度まとめてくれますが、
大量データになるとオブジェクト生成コストやトラッキングコストが効いてきます。

bulk_save_objects / bulk_insert_mappings を使う

SQLAlchemy には、バルク用の API もあります。

session = SessionLocal()

objs = [User(name=name, email=email) for name, email in users]
session.bulk_save_objects(objs)

session.commit()
session.close()
Python

あるいは、辞書ベースで直接カラムにマッピングする方法もあります。

session = SessionLocal()

mappings = [
    {"name": name, "email": email}
    for name, email in users
]

session.bulk_insert_mappings(User, mappings)

session.commit()
session.close()
Python

これらは「ORM の一部機能(バリデーションやイベントなど)を犠牲にしてでも、とにかく速く入れたい」ときに使うイメージです。
「大量データを一気に流し込むバッチ処理」などで威力を発揮します。


バルク処理で意識しておきたい注意点

エラー時の扱いと再実行戦略

バルク処理は「まとめてやる」ぶん、
途中でエラーが起きたときに「どこまで成功しているか」が分かりにくくなります。

そのため、実務ではよく次のような戦略を取ります。

バッチ単位でトランザクションを区切る(1 バッチ失敗ならそのバッチだけやり直せばよい)
入力データ側に「ID やタイムスタンプ」を持たせておき、再実行時に重複を避けられるようにする

「速さ」と「エラー時の扱いやすさ」はトレードオフなので、
どこまでまとめるかは要件と相談になります。

インデックスや制約が多いテーブルは遅くなりやすい

大量 INSERT のとき、インデックスや外部キー制約が多いテーブルは、
1 行入れるたびにそれらを更新・チェックする必要があるため、どうしても遅くなります。

本当に極端なバルク処理では、

一時的に不要なインデックスを外す
一時テーブルに入れてから本テーブルに移す

といったテクニックも使われますが、
これは本番運用レベルの話なので、最初は「インデックスが多いと挿入が重くなる」くらいの理解で十分です。


まとめ(バルク処理は「大量データを扱うときの必須スキル」)

バルク処理を初心者目線でまとめると、こうなります。

大量データを 1 件ずつ処理するのではなく、「まとめて一気に」処理することで、クエリ回数とトランザクション回数を減らし、圧倒的に速くするテクニックがバルク処理。
SQL レベルでは「複数行の VALUES を 1 つの INSERT にまとめる」、Python では executemany やバッチ処理を使うのが基本パターン。
一度に全部まとめすぎると別の問題(メモリ、ロック、タイムアウト)が出るので、「バッチサイズ」と「トランザクションの粒度」を意識して設計することが重要。

タイトルとURLをコピーしました