Python | 自動化:await / async

Python
スポンサーリンク

概要(async / await は「同時進行のための文法」)

async / await は、「Python に“ここは待ち時間だから、その間ほかの仕事をしていいよ”と伝えるための文法」です。

  • async def で「この関数は“同時進行できる特別な関数”だよ」と宣言する
  • await で「ここで一旦手を止めていいから、その間に他の async 処理を進めて」とイベントループに渡す

という役割です。

CPU を速くする魔法ではなく、「待ってる時間をムダにしない」ための仕組みだと思ってください。


まずはイメージから(人間の仕事に例える)

同期処理(いつもの Python)は「1人の事務員」

ふつうの Python コードは、1人の事務員が順番に仕事をこなすイメージです。

1つの API を呼ぶ(結果が返ってくるまでじっと待つ)

終わってから次の API を呼ぶ(また待つ)

…を繰り返す。

待ち時間のあいだ、その人は何もしません。
コード的には「関数呼び出し→終わるまでブロック」です。

async / await は「仕事の区切りで、自分から譲る事務員」

async / await を使うと、その事務員はこう振る舞います。

API を呼ぶ

「結果が返ってくるまで時間かかりそうだから、その間に他の仕事やっていいよ」と await する

結果が返ってきたタイミングで、また自分の仕事に戻る

この「自分から切り替えポイントを明示する」のが await です。
「await できる特別な関数」を async def で宣言します。


async / await の最低限ルール(ここを深掘り)

ルール1:async def の中でしか await は使えない

Python は、

  • 普通の関数:def func(): ...
  • 非同期関数:async def func(): ...

という2種類を持ちます。

await は、必ず async def の中でしか使えません。
外で書くと SyntaxError です。

async def my_sleep():
    await asyncio.sleep(1)   # OK

def normal():
    await asyncio.sleep(1)   # NG(構文エラー)
Python

「待ち時間の譲り合い」ができるのは、非同期関数(コルーチン)だけ、というイメージです。

ルール2:async def を呼んでも「即実行されない」

ここが一番つまずきポイントです。

普通の関数は、func() と呼べばその場で実行されます。
でも、async def で定義した関数を呼ぶと、「コルーチンオブジェクト」が返ってくるだけで、まだ動いていません。

async def f():
    print("実行された")

coro = f()       # ここでは何も表示されない
Python

この coro は「実行予定のタスクの箱」です。
実行するには、

  • await coro とする
  • asyncio.create_task(coro) としてイベントループに渡す

必要があります。

ここが理解できていないと、

「async 関数を呼んだのに動かない」
「警告が出る(coroutine was never awaited)」

という状態になります。

ルール3:await できるのは「await 可能なオブジェクト」だけ

await の右側には、コルーチンや「await 可能なオブジェクト」しか置けません。

代表的なものは、

  • async def で作られた関数を呼んだ結果(コルーチン)
  • asyncio.sleep() などの asyncio 製の非同期関数
  • aiohttp の session.get(...) のような非同期 I/O

です。

例えば、

await 1         # ダメ
await func()    # func が async def で定義されていれば OK
Python

いちばん小さい async / await の動く例

asyncio.sleep だけで「同時進行」を体感する

まだネットワークやファイルは使わず、「待つだけ」で動きを感じてみます。

import asyncio

async def worker(name, delay):
    print(f"{name} 開始")
    await asyncio.sleep(delay)
    print(f"{name} 終了({delay}秒待ち)")

async def main():
    task1 = asyncio.create_task(worker("A", 2))
    task2 = asyncio.create_task(worker("B", 3))

    await task1
    await task2

asyncio.run(main())
Python

流れを丁寧に追います。

async def worker(...)
この関数は「非同期関数」です。中で await を使えます。

await asyncio.sleep(delay)
「delay秒間、何もしないで待つ」処理ですが、普通の time.sleep() と違って、
この間に「他の async タスク」が動けます。

asyncio.create_task(worker("A", 2))
worker(“A”, 2) という仕事をイベントループに登録し、「Task オブジェクト」を返します。
Task になった時点で、イベントループがタイミングを見て実行を始めます。

await task1
task1 が終わるまで待ちます。
ただし、task1 が内部で await している間は、他のタスク(task2 など)が動き続けます。

asyncio.run(main())
イベントループを起動し、main() を走らせるおまじないです。
Python スクリプトの入り口では、基本的にこう書くと思ってOKです。

実行してみると、「A 開始」「B 開始」がすぐ出て、その後「A 終了」「B 終了」と続きます。
A が待っている 2 秒の間に B が進行しているのがポイントです。


実用に近づける:同期 vs 非同期 HTTP を比較する

同期版(requests)での複数リクエスト

まず普通の requests から。

import time
import requests

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]

def fetch(url):
    resp = requests.get(url, timeout=10)
    return url, resp.status_code

def main_sync():
    start = time.time()
    results = []
    for url in URLS:
        results.append(fetch(url))
    elapsed = time.time() - start
    print("結果:", results)
    print(f"同期処理 合計: {elapsed:.2f}秒")

if __name__ == "__main__":
    main_sync()
Python

delay/1, 2, 3 を順に呼ぶので、だいたい 1+2+3 秒 ≒ 6 秒くらいかかります。

非同期版(aiohttp)+ async / await で一気に投げる

今度は async / await を使った非同期版。

import asyncio
import time
import aiohttp

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]

async def fetch_async(session, url):
    async with session.get(url, timeout=10) as resp:
        await resp.text()  # 中身は捨てる想定
        return url, resp.status

async def main_async():
    start = time.time()
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch_async(session, url)) for url in URLS]
        results = await asyncio.gather(*tasks)
    elapsed = time.time() - start
    print("結果:", results)
    print(f"非同期処理 合計: {elapsed:.2f}秒")

if __name__ == "__main__":
    asyncio.run(main_async())
Python

ここで async / await まわりを整理します。

async def fetch_async(...)
HTTP リクエストという「待ち時間だらけ」の処理を非同期関数にする。

async with session.get(...) as resp
非同期コンテキストマネージャ。with の中で接続を使い、出たら自動で後片付け(接続を解放)してくれます。

await resp.text()
レスポンスボディを非同期に読み込みます。ここが「待ちポイント」なので、読み込み中は他のタスクが動けます。

tasks = [asyncio.create_task(...)]
それぞれの URL に対してタスクを作り、「全部同時に処理していいよ」とイベントループに登録する。

await asyncio.gather(*tasks)
全部のタスクが終わるまで待ち、結果をリストで受け取ります。
待ち時間は「一番遅い URL に合わせた時間」にかなり近づきます。

これが async / await を「何のために使うか」の具体像です。


もう一段深く:async / await を設計するときの考え方

ポイント1:async 関数は「ひとまとまりの非同期タスク」にする

良い形の async 関数は、

  • 引数として必要な情報を受け取り
  • 中で await すべきところを await し
  • 結果を戻り値で返す

という「純粋な処理」です。

例えば、

async def fetch_user(session, user_id):
    url = f"https://api.example.com/users/{user_id}"
    async with session.get(url) as resp:
        resp.raise_for_status()
        return await resp.json()
Python

のように、「1ユーザー情報を取得する」という意味を持ったブロックにします。

その上で、「多人数ぶんを並列に投げる」のは呼び出し側の責任にします。

async def fetch_all_users(session, user_ids):
    tasks = [asyncio.create_task(fetch_user(session, uid)) for uid in user_ids]
    return await asyncio.gather(*tasks)
Python

この分け方を意識すると、コードの見通しが一気によくなります。

ポイント2:非同期の中で「同期のブロッキング処理」を使わない

async def の中で、

  • time.sleep()
  • requests.get()
  • 重い計算で CPU を長時間占有

をそのまま呼ぶと、その間イベントループが止まってしまい、「非同期の意味がなくなる」状態になります。

やるなら、

  • 待ち:await asyncio.sleep() に置き換える
  • HTTP:aiohttp や他の async 対応ライブラリに変える
  • どうしても同期処理が必要:await asyncio.to_thread(sync_func, ...) で別スレッドに逃がす

という風に、「イベントループを止めない」設計をします。

import asyncio
import time

def blocking():
    time.sleep(3)
    return "done"

async def main():
    print("start")
    result = await asyncio.to_thread(blocking)
    print("result:", result)

asyncio.run(main())
Python

「全部 async にする」ことよりも、「イベントループを長時間ブロックしない」ことを優先して考えてください。

ポイント3:エラーも非同期で飛んでくる(gather / task の扱い)

await some_async_func() で例外が起きると、その場で普通に例外として飛んできます。
問題は、asyncio.gathercreate_task を使ったときです。

gather の中のどれかのタスクがエラーになると、デフォルトでは gather 全体が例外を投げます。
他のタスクもキャンセルされるのが基本挙動です。

失敗だけをログに出して処理を続けたいときは、return_exceptions=True を指定します。

results = await asyncio.gather(*tasks, return_exceptions=True)
for r in results:
    if isinstance(r, Exception):
        print("失敗:", r)
    else:
        print("成功:", r)
Python

create_task で作ったタスクは、「どこかで await しないと例外が見えない」問題もあります。
作った Task は必ずどこかで await するか、gather で束ねて await する。
「作りっぱなしにしない」が大事です。


どんなときに async / await を使うべきか

向いている場面

例えば、次のような状況なら async / await が強く効きます。

  • Web API を何十回・何百回も呼ぶ
  • 外部サービスに対する I/O 待ちがボトルネックになっている
  • WebSocket やチャットのように「接続したまま待ち続ける」タイプの処理
  • 同一プロセス内で、たくさんの I/O を効率よく回したい

ここでは、マルチスレッドよりも「どこで待つか」がコードに明示される分、
設計がしやすいことが多いです。

まだ使わなくていい場面

逆に、こういうときはいったん落ち着いてください。

  • HTTP リクエストはせいぜい数回だけ
  • 処理時間のほとんどが CPU 計算や pandas の処理
  • まずはシンプルに動くものを書きたい段階

この場合は、まず同期コード(requests や単純な for ループ)+必要なら concurrent.futures(スレッド/プロセスプール)で十分です。

「本当に I/O の待ち時間がネックになっている」と感じたときが、async / await に進むタイミングとしてちょうどいいです。


まとめ(async / await を「待ち時間の位置をマークする道具」として捉える)

最後に、頭の中に残しておいてほしいポイントを整理します。

async def は「この関数の中には待ちポイント(await)があるよ」と宣言する。
await は「ここで一旦ストップしていいから、その間に他の async 処理を進めて」とイベントループに伝える。
async def を呼んでも即実行されず、コルーチンオブジェクトが返るので、await するか asyncio.create_task でタスクにする。
非同期関数の中では、time.sleeprequests.get のようなブロッキング処理をそのまま呼ばない(イベントループが止まる)。
async / await は「待ち時間が多い I/O 処理を同時進行させるための設計思想」であって、CPU を速くする魔法ではない。

ここを押さえながら、自分の処理を見て、

「どこが“待ち”なのか?」
「そこを async + await にすると、どれだけ得をしそうか?」

を一緒に考えていくと、async / await が「難しい文法」から「賢い待ち方」に変わって見えてきます。

タイトルとURLをコピーしました