Python | 自動化:concurrent.futures

Python
スポンサーリンク

概要(concurrent.futures は「並列処理をいい感じにラップしてくれる道具」)

concurrent.futures は、
「マルチスレッド」「マルチプロセス」を、初心者でも扱いやすい形にしてくれる標準ライブラリです。

やっていることはシンプルで、

  • 並列に走らせたい「仕事」を関数として書く
  • それを ThreadPoolExecutorProcessPoolExecutor に投げる
  • 終わった順/投げた順に結果を受け取る

という形に統一してくれます。

重要なのは、「低レベルな threadingmultiprocessing を直接いじらなくても、多くのケースはこれだけで間に合う」という点です。


2つの Executor(スレッド用とプロセス用)をどう使い分けるか

ThreadPoolExecutor と ProcessPoolExecutor の違い

concurrent.futures には、主に2種類の “Executor(実行者)” がいます。

ThreadPoolExecutor は「スレッドプール」で、I/O が多い処理向き。
ProcessPoolExecutor は「プロセスプール」で、CPU を使いまくる重い処理向き。

ここは、マルチスレッド/マルチプロセスの話と直結します。

ThreadPoolExecutor は、
Web API 呼び出し、スクレイピング、ファイル読み書きなど、「待ち時間が長い処理」の並列実行に向いています。

ProcessPoolExecutor は、
大量の計算・画像解析・重い集計など、「CPU がボトルネック」の処理を速くしたいときに使います。

思考の流れとしては、

「この処理は、待ち時間が多い(ネットワーク・ディスク)か、計算そのものが重いか」
それによって使う Executor を切り替える、というイメージを持っておいてください。


基本パターン1:map で「リストに一気に投げて、一気に結果を受け取る」

例:複数 URL を並列に requests する(ThreadPoolExecutor)

I/O 系の典型例からいきます。
10 個の URL に対して requests.get を投げ、レスポンスコードを取得する処理を考えます。

まずは、順番に処理するシングル版。

import time
import requests

URLS = [
    "https://example.com",
    "https://httpbin.org/delay/1",
] * 5

def fetch_status(url: str) -> int:
    resp = requests.get(url, timeout=5)
    return resp.status_code

def single_thread():
    start = time.time()
    results = [fetch_status(url) for url in URLS]
    elapsed = time.time() - start
    print("結果:", results)
    print(f"シングル合計: {elapsed:.2f}秒")
Python

これを、ThreadPoolExecutor で並列化します。

from concurrent.futures import ThreadPoolExecutor

def multi_thread_map(max_workers=5):
    start = time.time()
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        results = list(executor.map(fetch_status, URLS))
    elapsed = time.time() - start
    print("結果:", results)
    print(f"マルチ(スレッド)合計: {elapsed:.2f}秒")
Python

ここで深掘りしておきたいポイントは3つあります。

1つ目は、executor.map(fetch_status, URLS) の形です。
普通の map(fetch_status, URLS) とほぼ同じ感覚で使えますが、中身は「スレッドプールを使って並列実行」になっています。

2つ目は、戻り値の順番です。
map は「投げた順に結果が返ってくる」ことが保証されます。
URLS の順番と results の順番が対応しているので、後処理がシンプルです。

3つ目は、with ブロックです。
with ThreadPoolExecutor(...) as executor: と書くことで、
使い終わったスレッドをきちんとクリーンアップしてくれます。
終了しないまま残り続ける心配がありません。


基本パターン2:submit + as_completed で「終わった順に処理する」

なぜ「終わった順」が必要になるのか

map は「引数のリストの順番どおり」に結果が返ってきますが、
実際には、処理時間がURLごとにバラバラなことが多いです。

例えば、

https://httpbin.org/delay/5
https://httpbin.org/delay/1

の2つを投げたとき、
先に速いほうの結果を処理したい、という場面があります。

こういうときに使うのが、submitas_completed です。

例:終わった順にログを出す

from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests

URLS = [
    "https://httpbin.org/delay/3",
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
]

def fetch_with_time(url: str) -> tuple[str, float]:
    start = time.time()
    resp = requests.get(url, timeout=10)
    elapsed = time.time() - start
    return url, elapsed

def multi_thread_submit(max_workers=3):
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_to_url = {
            executor.submit(fetch_with_time, url): url for url in URLS
        }

        for future in as_completed(future_to_url):
            url, elapsed = future.result()
            print(f"完了: {url} -> {elapsed:.2f}秒")

if __name__ == "__main__":
    multi_thread_submit()
Python

ここで押さえるべきポイントは次の通りです。

executor.submit(fetch_with_time, url) は、「この関数を別スレッドで実行してくれ」という依頼を出し、その結果を表す Future オブジェクトを返します。
future_to_url という dict で、「どの Future がどの URL に対応しているか」を覚えておきます。
as_completed(future_to_url) は、「どの Future が先に終わるかわからないけど、終わった順に yield してくれる」イテレータです。

これによって、「遅い URL を待たずに、速く終わったものから順にログ出しや後処理ができる」ようになります。


ProcessPoolExecutor で CPU を使う重い処理を並列化する

例:heavy_sum をマルチプロセス化する

CPU がボトルネックの重い処理は、ProcessPoolExecutor を使います。
例として、「0〜N までの二乗の総和」を計算する heavy_sum を使いましょう。

import time

def heavy_sum(n: int) -> int:
    s = 0
    for i in range(n):
        s += i * i
    return s
Python

これをマルチプロセスで並列実行します。

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def multi_process_map():
    nums = [10_000_00, 10_000_00, 10_000_00, 10_000_00]
    workers = min(len(nums), cpu_count())

    start = time.time()
    with ProcessPoolExecutor(max_workers=workers) as executor:
        results = list(executor.map(heavy_sum, nums))
    elapsed = time.time() - start

    print("結果:", results)
    print(f"マルチ(プロセス)合計: {elapsed:.2f}秒")

if __name__ == "__main__":
    multi_process_map()
Python

重要なポイントは次の通りです。

関数 heavy_sum は「モジュールのトップレベル」に定義されている必要があります。
ProcessPoolExecutor は内部的に multiprocessing を使っており、プロセス間で関数を渡すために pickle します。
ネスト関数や lambda はそのままでは渡せません。

Windows では、必ず if __name__ == "__main__": ガードの中で Executor を使うようにします。
これは multiprocessing と同じ理由です。

max_workers には cpu_count() をベースにした値を使います。
コア数より極端に大きな値を指定すると、逆に遅くなることがあります。


初心者がハマりがちなポイントを深掘りする

共有変数を書き換えない設計にする

スレッドでもプロセスでも、「共有変数に対する同時書き込み」は地雷です。

concurrent.futures を使うときは、基本的にこう考えてください。

実行関数(fetch_status や heavy_sum)は、
「引数だけを読み」「戻り値として結果を返す」純粋な関数にする。
結果の集約は、Executor を抜けたメインスレッド/メインプロセスで行う。

例えば、悪い例と良い例を比べてみます。

悪い例(グローバルなリスト results に各スレッドから append)
良い例(各スレッド/プロセスの戻り値を executor.map でリストとして受け取り、その結果を元にメイン側で合計などを計算)

この「状態を外に持たない」「戻り値で返す」設計を徹底すると、
ロックやキューに手を出さなくても、かなり安全に並列処理が書けます。

例外処理:Future.result() で例外が再スローされることを知っておく

実行関数の中で例外が起きた場合、その例外は Future の中にキャプチャされ、future.result() を呼んだタイミングで再スローされます。

例えば、次のような関数があるとします。

def may_fail(n: int) -> int:
    if n == 3:
        raise ValueError("3はダメ")
    return n * 2
Python

これを executor.mapsubmit で投げると、3 に対する処理で例外が発生します。

executor.map の場合、map のイテレーション中に例外が飛んできて止まります。
submit + as_completed の場合、future.result() を try/except で囲んでおけば、
「どのタスクが失敗したか」を把握しつつ、他のタスクの結果処理を続けられます。

from concurrent.futures import ThreadPoolExecutor, as_completed

def run_may_fail():
    nums = [1, 2, 3, 4]
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = {executor.submit(may_fail, n): n for n in nums}
        for future in as_completed(futures):
            n = futures[future]
            try:
                result = future.result()
                print(f"{n} -> {result}")
            except Exception as e:
                print(f"{n} でエラー: {e}")
Python

このように、submit + as_completed のスタイルは、「一部失敗しても全体を回し続けたい」ようなバッチで特に威力を発揮します。

プールサイズ(max_workers)をどう決めるか

ThreadPoolExecutor の場合は、「外部サービスやネットワークに優しい値」を選ぶ意識が重要です。

CPU のコア数より少し多い程度から始めて、
「相手の API のレート制限」「サーバー負荷」「自分の回線」を見ながら調整します。

ProcessPoolExecutor の場合は、「コア数を超えない」ことが基本です。
max_workers=cpu_count() をベースにして、ほどよく調整します。


自動化でどう活かすか(典型パターンの整理)

Web API / スクレイピングの同時実行

ThreadPoolExecutor で URL リストを map に流し込み、
レスポンスを同時並行で取得しつつ、
終わった順に as_completed で結果を処理していく。

このパターンは、あなたがこれまで学んできた「データ収集 BOT」「Web API 集計 BOT」と相性抜群です。

例えば、100 ユーザー分のステータスを API で取るとき、
ユーザーIDリストを ThreadPoolExecutor で回すだけで、全体時間をかなり短縮できます。

重い集計や変換の並列化

大量のファイルやデータチャンクを ProcessPoolExecutor に投げて、
各プロセスで重い集計や前処理をさせる。

例えば、大きな CSV を日付ごとに分割し、
「1日ごとに重い集計」をプロセスプールに投げて同時並行で処理する、
といった構成が典型です。

ここでも、「1チャンクの処理=引数と戻り値だけで完結する関数」にしておくことが、設計の鍵になります。


まとめ(「関数を投げて、Futureから結果を受け取る」感覚を身につける)

concurrent.futures を使うとき、頭の中に置いておいてほしい軸はこうです。

ThreadPoolExecutor は「I/O 待ちの多い処理を並行に」、ProcessPoolExecutor は「CPU を使う重い処理を並列に」。
やりたい仕事は、引数と戻り値だけで完結する“素直な関数”として書き、共有状態を極力持たない。
Executor.map は「投げた順に結果が欲しい」とき、submit+as_completed は「終わった順に処理したい/エラーを個別に扱いたい」ときに使い分ける。
例外は Future の result() 呼び出し時に再スローされるので、必要なら try/except で包んで処理する。

この感覚さえ掴んでしまえば、
これまで書いてきた「シングルスレッドの自動化スクリプト」を、
必要なところだけ気持ちよく並列化できるようになります。

タイトルとURLをコピーしました