概要(concurrent.futures は「並列処理をいい感じにラップしてくれる道具」)
concurrent.futures は、
「マルチスレッド」「マルチプロセス」を、初心者でも扱いやすい形にしてくれる標準ライブラリです。
やっていることはシンプルで、
- 並列に走らせたい「仕事」を関数として書く
- それを
ThreadPoolExecutorかProcessPoolExecutorに投げる - 終わった順/投げた順に結果を受け取る
という形に統一してくれます。
重要なのは、「低レベルな threading や multiprocessing を直接いじらなくても、多くのケースはこれだけで間に合う」という点です。
2つの Executor(スレッド用とプロセス用)をどう使い分けるか
ThreadPoolExecutor と ProcessPoolExecutor の違い
concurrent.futures には、主に2種類の “Executor(実行者)” がいます。
ThreadPoolExecutor は「スレッドプール」で、I/O が多い処理向き。ProcessPoolExecutor は「プロセスプール」で、CPU を使いまくる重い処理向き。
ここは、マルチスレッド/マルチプロセスの話と直結します。
ThreadPoolExecutor は、
Web API 呼び出し、スクレイピング、ファイル読み書きなど、「待ち時間が長い処理」の並列実行に向いています。
ProcessPoolExecutor は、
大量の計算・画像解析・重い集計など、「CPU がボトルネック」の処理を速くしたいときに使います。
思考の流れとしては、
「この処理は、待ち時間が多い(ネットワーク・ディスク)か、計算そのものが重いか」
それによって使う Executor を切り替える、というイメージを持っておいてください。
基本パターン1:map で「リストに一気に投げて、一気に結果を受け取る」
例:複数 URL を並列に requests する(ThreadPoolExecutor)
I/O 系の典型例からいきます。
10 個の URL に対して requests.get を投げ、レスポンスコードを取得する処理を考えます。
まずは、順番に処理するシングル版。
import time
import requests
URLS = [
"https://example.com",
"https://httpbin.org/delay/1",
] * 5
def fetch_status(url: str) -> int:
resp = requests.get(url, timeout=5)
return resp.status_code
def single_thread():
start = time.time()
results = [fetch_status(url) for url in URLS]
elapsed = time.time() - start
print("結果:", results)
print(f"シングル合計: {elapsed:.2f}秒")
Pythonこれを、ThreadPoolExecutor で並列化します。
from concurrent.futures import ThreadPoolExecutor
def multi_thread_map(max_workers=5):
start = time.time()
with ThreadPoolExecutor(max_workers=max_workers) as executor:
results = list(executor.map(fetch_status, URLS))
elapsed = time.time() - start
print("結果:", results)
print(f"マルチ(スレッド)合計: {elapsed:.2f}秒")
Pythonここで深掘りしておきたいポイントは3つあります。
1つ目は、executor.map(fetch_status, URLS) の形です。
普通の map(fetch_status, URLS) とほぼ同じ感覚で使えますが、中身は「スレッドプールを使って並列実行」になっています。
2つ目は、戻り値の順番です。
map は「投げた順に結果が返ってくる」ことが保証されます。
URLS の順番と results の順番が対応しているので、後処理がシンプルです。
3つ目は、with ブロックです。with ThreadPoolExecutor(...) as executor: と書くことで、
使い終わったスレッドをきちんとクリーンアップしてくれます。
終了しないまま残り続ける心配がありません。
基本パターン2:submit + as_completed で「終わった順に処理する」
なぜ「終わった順」が必要になるのか
map は「引数のリストの順番どおり」に結果が返ってきますが、
実際には、処理時間がURLごとにバラバラなことが多いです。
例えば、
https://httpbin.org/delay/5
https://httpbin.org/delay/1
の2つを投げたとき、
先に速いほうの結果を処理したい、という場面があります。
こういうときに使うのが、submit + as_completed です。
例:終わった順にログを出す
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import requests
URLS = [
"https://httpbin.org/delay/3",
"https://httpbin.org/delay/1",
"https://httpbin.org/delay/2",
]
def fetch_with_time(url: str) -> tuple[str, float]:
start = time.time()
resp = requests.get(url, timeout=10)
elapsed = time.time() - start
return url, elapsed
def multi_thread_submit(max_workers=3):
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_to_url = {
executor.submit(fetch_with_time, url): url for url in URLS
}
for future in as_completed(future_to_url):
url, elapsed = future.result()
print(f"完了: {url} -> {elapsed:.2f}秒")
if __name__ == "__main__":
multi_thread_submit()
Pythonここで押さえるべきポイントは次の通りです。
executor.submit(fetch_with_time, url) は、「この関数を別スレッドで実行してくれ」という依頼を出し、その結果を表す Future オブジェクトを返します。future_to_url という dict で、「どの Future がどの URL に対応しているか」を覚えておきます。as_completed(future_to_url) は、「どの Future が先に終わるかわからないけど、終わった順に yield してくれる」イテレータです。
これによって、「遅い URL を待たずに、速く終わったものから順にログ出しや後処理ができる」ようになります。
ProcessPoolExecutor で CPU を使う重い処理を並列化する
例:heavy_sum をマルチプロセス化する
CPU がボトルネックの重い処理は、ProcessPoolExecutor を使います。
例として、「0〜N までの二乗の総和」を計算する heavy_sum を使いましょう。
import time
def heavy_sum(n: int) -> int:
s = 0
for i in range(n):
s += i * i
return s
Pythonこれをマルチプロセスで並列実行します。
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count
def multi_process_map():
nums = [10_000_00, 10_000_00, 10_000_00, 10_000_00]
workers = min(len(nums), cpu_count())
start = time.time()
with ProcessPoolExecutor(max_workers=workers) as executor:
results = list(executor.map(heavy_sum, nums))
elapsed = time.time() - start
print("結果:", results)
print(f"マルチ(プロセス)合計: {elapsed:.2f}秒")
if __name__ == "__main__":
multi_process_map()
Python重要なポイントは次の通りです。
関数 heavy_sum は「モジュールのトップレベル」に定義されている必要があります。
ProcessPoolExecutor は内部的に multiprocessing を使っており、プロセス間で関数を渡すために pickle します。
ネスト関数や lambda はそのままでは渡せません。
Windows では、必ず if __name__ == "__main__": ガードの中で Executor を使うようにします。
これは multiprocessing と同じ理由です。
max_workers には cpu_count() をベースにした値を使います。
コア数より極端に大きな値を指定すると、逆に遅くなることがあります。
初心者がハマりがちなポイントを深掘りする
共有変数を書き換えない設計にする
スレッドでもプロセスでも、「共有変数に対する同時書き込み」は地雷です。
concurrent.futures を使うときは、基本的にこう考えてください。
実行関数(fetch_status や heavy_sum)は、
「引数だけを読み」「戻り値として結果を返す」純粋な関数にする。
結果の集約は、Executor を抜けたメインスレッド/メインプロセスで行う。
例えば、悪い例と良い例を比べてみます。
悪い例(グローバルなリスト results に各スレッドから append)
良い例(各スレッド/プロセスの戻り値を executor.map でリストとして受け取り、その結果を元にメイン側で合計などを計算)
この「状態を外に持たない」「戻り値で返す」設計を徹底すると、
ロックやキューに手を出さなくても、かなり安全に並列処理が書けます。
例外処理:Future.result() で例外が再スローされることを知っておく
実行関数の中で例外が起きた場合、その例外は Future の中にキャプチャされ、future.result() を呼んだタイミングで再スローされます。
例えば、次のような関数があるとします。
def may_fail(n: int) -> int:
if n == 3:
raise ValueError("3はダメ")
return n * 2
Pythonこれを executor.map や submit で投げると、3 に対する処理で例外が発生します。
executor.map の場合、map のイテレーション中に例外が飛んできて止まります。submit + as_completed の場合、future.result() を try/except で囲んでおけば、
「どのタスクが失敗したか」を把握しつつ、他のタスクの結果処理を続けられます。
from concurrent.futures import ThreadPoolExecutor, as_completed
def run_may_fail():
nums = [1, 2, 3, 4]
with ThreadPoolExecutor(max_workers=4) as executor:
futures = {executor.submit(may_fail, n): n for n in nums}
for future in as_completed(futures):
n = futures[future]
try:
result = future.result()
print(f"{n} -> {result}")
except Exception as e:
print(f"{n} でエラー: {e}")
Pythonこのように、submit + as_completed のスタイルは、「一部失敗しても全体を回し続けたい」ようなバッチで特に威力を発揮します。
プールサイズ(max_workers)をどう決めるか
ThreadPoolExecutor の場合は、「外部サービスやネットワークに優しい値」を選ぶ意識が重要です。
CPU のコア数より少し多い程度から始めて、
「相手の API のレート制限」「サーバー負荷」「自分の回線」を見ながら調整します。
ProcessPoolExecutor の場合は、「コア数を超えない」ことが基本です。max_workers=cpu_count() をベースにして、ほどよく調整します。
自動化でどう活かすか(典型パターンの整理)
Web API / スクレイピングの同時実行
ThreadPoolExecutor で URL リストを map に流し込み、
レスポンスを同時並行で取得しつつ、
終わった順に as_completed で結果を処理していく。
このパターンは、あなたがこれまで学んできた「データ収集 BOT」「Web API 集計 BOT」と相性抜群です。
例えば、100 ユーザー分のステータスを API で取るとき、
ユーザーIDリストを ThreadPoolExecutor で回すだけで、全体時間をかなり短縮できます。
重い集計や変換の並列化
大量のファイルやデータチャンクを ProcessPoolExecutor に投げて、
各プロセスで重い集計や前処理をさせる。
例えば、大きな CSV を日付ごとに分割し、
「1日ごとに重い集計」をプロセスプールに投げて同時並行で処理する、
といった構成が典型です。
ここでも、「1チャンクの処理=引数と戻り値だけで完結する関数」にしておくことが、設計の鍵になります。
まとめ(「関数を投げて、Futureから結果を受け取る」感覚を身につける)
concurrent.futures を使うとき、頭の中に置いておいてほしい軸はこうです。
ThreadPoolExecutor は「I/O 待ちの多い処理を並行に」、ProcessPoolExecutor は「CPU を使う重い処理を並列に」。
やりたい仕事は、引数と戻り値だけで完結する“素直な関数”として書き、共有状態を極力持たない。
Executor.map は「投げた順に結果が欲しい」とき、submit+as_completed は「終わった順に処理したい/エラーを個別に扱いたい」ときに使い分ける。
例外は Future の result() 呼び出し時に再スローされるので、必要なら try/except で包んで処理する。
この感覚さえ掴んでしまえば、
これまで書いてきた「シングルスレッドの自動化スクリプト」を、
必要なところだけ気持ちよく並列化できるようになります。
