Python | 自動化:マルチスレッド

Python
スポンサーリンク
  1. 概要(マルチスレッドは「待ち時間のあいだに、別の仕事を回す」仕組み)
  2. 基本イメージ(1人作業 vs 複数人作業)
    1. シングルスレッドは「1人で全部こなす」
    2. マルチスレッドは「複数人に仕事を分ける」
  3. 最小例(ThreadPoolExecutor で複数URLを同時に叩く)
    1. まずは「順番に実行する」バージョン
    2. ThreadPoolExecutor を使ったマルチスレッド版
  4. 「何に効くか/何に効かないか」(GIL と I/O バウンド vs CPU バウンド)
    1. GIL のざっくりイメージ
    2. 逆に「待ち時間が多い処理」はマルチスレッドの得意分野
  5. 典型的な自動化パターンの例(ファイル複数個の処理を並列化)
    1. 例:大量の CSV ファイルを順番に処理する
    2. ThreadPoolExecutor でファイル処理を並列化
  6. 重要ポイント:スレッド安全性(同じものを同時に触ると壊れる)
    1. 「共有データを同時に書き換える」のが一番危ない
    2. Lock(排他ロック)で「同じ場所に入れるのは1人まで」にする
    3. 「スレッド毎に自分用の箱を持たせる」発想
  7. どこまでマルチスレッドを使うか(初心者のためのガイドライン)
    1. まずは「ThreadPoolExecutor だけ」から始める
    2. 「やらないほうが良い」マルチスレッドの使い方
    3. 「本当にマルチスレッドが必要か」を一度立ち止まって考える
  8. まとめ(「待ち時間を埋めるための並行処理」として使う)

概要(マルチスレッドは「待ち時間のあいだに、別の仕事を回す」仕組み)

Python のマルチスレッドは、
「1つのプログラムの中で、複数の“流れ(スレッド)”を同時進行させる」仕組みです。

自動化の世界だと、

  • Web API をたくさん叩く
  • 複数サイトからスクレイピングする
  • 複数ファイルを読み書きする

といった「待ち時間(ネットワーク・ディスク)が多い処理」を、並行して進めたいときに強いです。

まずはここを押さえてください。

CPU を全力で使う計算処理の高速化は、Python の標準的なマルチスレッドではあまり得意ではない(GIL の話)。
ネット通信やファイル I/O のような「待ち」が多い処理を同時に走らせるときに、マルチスレッドはとても効果的。

この前提を踏まえた上で、「どんなときに使うか」「どう書けば安全か」をかみ砕いていきます。


基本イメージ(1人作業 vs 複数人作業)

シングルスレッドは「1人で全部こなす」

普通の Python スクリプトは、1本のスレッドで順番に処理を進めます。

1つ目の API を叩く(レスポンスが返ってくるまで待つ)
返ってきたら次の API を叩く(また待つ)
…という具合に、「待ち」のあいだは CPU はほぼ何もしていません。

これが人間だったら、「お客さんの返事を待つあいだに、別の仕事を進めたい」と思いますよね。
マルチスレッドはまさにそれをコードでやります。

マルチスレッドは「複数人に仕事を分ける」

例えば、10 個の URL に対してリクエストを送る場合。

シングルスレッドだと、
1番目を送って待つ → 終わったら 2番目 → …と 10 回繰り返す。

マルチスレッドだと、
スレッド A・B・C… に「この URL やっといて」と配って、
それぞれの待ち時間中に他の URL の処理を進めます。

重要なのは、「CPU が10倍速くなる」わけではなく、
「待ち時間を有効に使える」ことでトータル時間が短くなる、という点です。


最小例(ThreadPoolExecutor で複数URLを同時に叩く)

まずは「順番に実行する」バージョン

とりあえず、10 個のURLに対して順番に requests.get するコードを考えてみます。

import time
import requests

URLS = [
    "https://example.com",
    "https://httpbin.org/delay/1",  # 応答に1秒かかるURL(テスト用)
] * 5  # 2種類×5回 = 10件

def fetch(url):
    start = time.time()
    resp = requests.get(url, timeout=5)
    elapsed = time.time() - start
    print(f"{url} -> {resp.status_code} ({elapsed:.2f}秒)")
    return elapsed

def single_thread():
    total_start = time.time()
    for url in URLS:
        fetch(url)
    total_elapsed = time.time() - total_start
    print(f"シングルスレッド合計: {total_elapsed:.2f}秒")

if __name__ == "__main__":
    single_thread()
Python

delay/1 があるので、単純に考えると 10 秒前後かかるはずです(正確ではありませんがイメージとして)。

ThreadPoolExecutor を使ったマルチスレッド版

今度は、同じ fetch を「スレッドプール」で並行実行してみます。

from concurrent.futures import ThreadPoolExecutor, as_completed

def multi_thread(max_workers=5):
    total_start = time.time()

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(fetch, url) for url in URLS]

        for future in as_completed(futures):
            _ = future.result()

    total_elapsed = time.time() - total_start
    print(f"マルチスレッド合計: {total_elapsed:.2f}秒")

if __name__ == "__main__":
    single_thread()
    multi_thread(max_workers=5)
Python

ポイントを整理します。

ThreadPoolExecutor(max_workers=5)
同時に動かせるスレッド数を 5 にしています。
10件のフェッチを「5人の作業員」で回すイメージです。

executor.submit(fetch, url)
fetch(url) をスレッドに渡し、「終わったら result() で中身を受け取れる Future」を返します。

as_completed(futures)
どのスレッドが先に終わっても、終わった順に結果を処理できます。
ここでは単に result() を呼んでいるだけですが、本来は結果をまとめるなどの後処理をします。

この例は I/O(ネットワーク待ち)が主なので、マルチスレッドにすることで合計時間がかなり短くなります。


「何に効くか/何に効かないか」(GIL と I/O バウンド vs CPU バウンド)

GIL のざっくりイメージ

Python(CPython)には GIL(グローバルインタプリタロック)という仕組みがあり、
「1つのプロセスの中で、“純粋な Python コード”を同時に複数のスレッドでガンガン実行させる」のは実は苦手です。

乱暴に言うと、「CPU をぶん回す計算処理は、スレッドを増やしてもあまり速くならない」ことが多いです。

例:巨大なリストに対してガンガン for 文で計算するだけ、みたいな処理。

こういうときは、マルチスレッドではなくマルチプロセス(multiprocessing)を使うほうが本筋になります。

逆に「待ち時間が多い処理」はマルチスレッドの得意分野

I/O バウンドな処理(=CPU ではなく「待ち」がボトルネック)には、マルチスレッドはとてもよく効きます。

典型例は、

  • HTTPアクセス(requests, Selenium)
  • ファイル読み書き(大量のファイルを順番に処理するなど)
  • データベースアクセス(クエリの待ち時間が長い)

こういう処理は、1つ1つは「待ち」が長いけれど、
その待ち時間のあいだに他のスレッドの処理を進められるため、全体のスループットが上がります。

自動化の文脈では、「データ収集 BOT」「Web API 集計 BOT」「スクレイピング」「ファイル変換」などがまさにここに当たります。


典型的な自動化パターンの例(ファイル複数個の処理を並列化)

例:大量の CSV ファイルを順番に処理する

まずはシングルスレッド版をイメージします。

from pathlib import Path
import time

DATA_DIR = Path("data")

def process_file(path: Path):
    print(f"処理開始: {path.name}")
    time.sleep(1)  # ここに本来は重めの処理(読み込み+変換など)
    print(f"処理終了: {path.name}")

def single_thread_files():
    start = time.time()
    for path in DATA_DIR.glob("*.csv"):
        process_file(path)
    elapsed = time.time() - start
    print(f"シングルスレッド合計: {elapsed:.2f}秒")
Python

ファイルが 20 個あるなら、単純に 20 秒くらいかかる想定です。

ThreadPoolExecutor でファイル処理を並列化

from concurrent.futures import ThreadPoolExecutor, as_completed

def multi_thread_files(max_workers=5):
    start = time.time()
    paths = list(DATA_DIR.glob("*.csv"))
    print(f"対象ファイル数: {len(paths)}")

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, p) for p in paths]
        for future in as_completed(futures):
            future.result()

    elapsed = time.time() - start
    print(f"マルチスレッド合計: {elapsed:.2f}秒")
Python

ここでは process_file の中身を time.sleep(1) で代用していますが、
実際には「読み込み→変換→書き出し」といった I/O を伴う処理になることが多いです。

重要なのは、「1ファイルの処理内容は、他のファイルとは独立している」点です。
こういうときは、マルチスレッドに乗せやすく、バグりにくい。


重要ポイント:スレッド安全性(同じものを同時に触ると壊れる)

「共有データを同時に書き換える」のが一番危ない

マルチスレッドの難しさは、まさにここです。

2つ以上のスレッドが、同じ変数・同じリスト・同じファイルなどを同時に書き換えると、
結果がぐちゃぐちゃになったり、たまにおかしい値になったりします。

これがいわゆる「競合状態(レースコンディション)」です。

極端に単純化した例を見てみます。

import threading

counter = 0

def increment():
    global counter
    for _ in range(100000):
        counter += 1

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()

print(counter)
Python

期待値は 200000 ですが、実際にはそれより小さくなることがあります。
理由は、counter += 1 が「読み込み → 加算 → 書き込み」という複数ステップの処理だからです。
2つのスレッドが同時にこの操作をしてしまうと、「片方の書き込みがもう片方の結果を踏みつぶす」ことが起きます。

Lock(排他ロック)で「同じ場所に入れるのは1人まで」にする

こういう競合を防ぐ基本手段が Lock です。

import threading

counter = 0
lock = threading.Lock()

def increment():
    global counter
    for _ in range(100000):
        with lock:
            counter += 1

threads = [threading.Thread(target=increment) for _ in range(2)]
for t in threads: t.start()
for t in threads: t.join()

print(counter)
Python

with lock: のブロックは「同時に入れるのは1スレッドだけ」です。
他のスレッドは、先に入ったスレッドが抜けるまで待ちます。

ただし、ロックを多用しすぎると、

  • コードが読みにくくなる
  • デッドロック(お互いが相手のロックを待ち続けて停止)を起こしやすくなる

などの問題が出てきます。

初心者のうちは、こう考えてください。

共有データを書き換える必要がある設計を避ける。
各スレッドは「自分の仕事の結果(リストや辞書)」だけ返すようにし、最終的なマージはメインスレッドでやる。

これだけで、かなり安全なマルチスレッド運用ができます。

「スレッド毎に自分用の箱を持たせる」発想

さっきの URL 例でいうと、

悪いパターン:全スレッドが同じグローバルリスト results に append する。
良いパターン:各スレッドの結果は return で Future 経由で受け取り、メイン側で results に追加する。

ThreadPoolExecutor を使えば、「結果を Future で受け取る」という形に自然になってくれるので、
グローバル変数に書き込まない設計がしやすくなります。


どこまでマルチスレッドを使うか(初心者のためのガイドライン)

まずは「ThreadPoolExecutor だけ」から始める

低レベルな threading.Thread から書き始めると、
join / lock / 例外処理 などで簡単に混乱します。

初心者向けには、

  • ThreadPoolExecutor でスレッドを管理してもらう
  • submit で関数と引数を投げる
  • result() で結果を受け取る
  • 共有変数を書き換えず、戻り値でやりとりする

というスタイルに統一するのがおすすめです。

「やらないほうが良い」マルチスレッドの使い方

次のようなものは、少なくとも最初は手を出さないほうが安全です。

GUI(Tkinter など)を、スレッドから直接いじる
データベースへの書き込みを、複数スレッドで同じコネクションに対して行う
複雑な状態を持つオブジェクト(クラス)を、複数スレッドから書き換える

これらは一気に難度が跳ね上がります。
まずは「1タスク=1関数」で、外部との接点をファイルや戻り値に限定する形で練習していくと良いです。

「本当にマルチスレッドが必要か」を一度立ち止まって考える

最後に、あえて冷静な話もしておきます。

多くの自動化の現場では、素直なシングルスレッドでも十分間に合うケースが多いです。
無理にマルチスレッドにしてコードを複雑にするより、

  • I/O を減らす工夫(まとめて取得する)
  • キャッシュを使う
  • 処理の順番を見直す

などで解決できることもあります。

「1時間かかっていた処理が、マルチスレッドで 10 分になる」ならやる価値があります。
「5分が 4分半になる」程度なら、シンプルさを優先してもいい、という判断も大事です。


まとめ(「待ち時間を埋めるための並行処理」として使う)

Python のマルチスレッドは、

  • CPU パワーを増やす魔法ではなく、「待ち時間のあいだに別の仕事を進める」仕組み。
  • Web API 呼び出し、スクレイピング、ファイル処理など、I/O がボトルネックの自動化で特に効果を発揮する。
  • ThreadPoolExecutor を使えば、「関数を投げて結果を受け取る」だけのシンプルな形で並行処理を書ける。
  • 共有変数の書き換えを避け、“結果は戻り値で受け取ってメインでまとめる”という設計にすることで、初心者でも比較的安全に扱える。

この軸を忘れずに、「ここは待ち時間が長いからスレッドで回してみようか」と
少しずつ適用範囲を広げていくのがおすすめです。

タイトルとURLをコピーしました