Python | 自動化:aiohttp

Python
スポンサーリンク

概要(aiohttp は「非同期版 requests」だと思うと入りやすい)

aiohttp は、
「asyncio と一緒に使うための HTTP クライアントライブラリ」です。

雑に言うと、

  • requests …… 普通の(同期)HTTP クライアント
  • aiohttp …… 非同期(asyncio用)HTTP クライアント

という位置づけです。

だから、用途もイメージも requests とほぼ同じです。
違うのは「await で待つ前提になっている」ことと、「大量のリクエストを同時にさばける力が強い」ことです。

ここでは、

  • aiohttp の基本の書き方
  • 非同期ならではの「同時リクエスト」のメリット
  • 実務で大事になる「セッション」「エラー処理」「同時接続数の制御」

を、順番にかみ砕いていきます。


基本の使い方(requests との違いを対比で掴む)

requests の超基本と、aiohttp 版の対応関係

まず頭に requests の形を置いておくと、aiohttp を理解しやすくなります。

requests のシンプルな GET はこうです。

import requests

resp = requests.get("https://httpbin.org/get", timeout=10)
print(resp.status_code)
print(resp.text)
Python

これを aiohttp で書き直すと、こうなります。

import aiohttp
import asyncio

async def main():
    async with aiohttp.ClientSession() as session:
        async with session.get("https://httpbin.org/get", timeout=10) as resp:
            print(resp.status)
            text = await resp.text()
            print(text)

asyncio.run(main())
Python

重要な違いを整理します。

一つ目は、「関数が async def になっていること」です。
aiohttp は基本的に全部「await 前提」なので、async 関数の中で使います。

二つ目は、ClientSession を async with で作っていることです。
requests でも Session はありますが、aiohttp ではセッションが特に重要になります。
「接続を再利用する」「Cookie やヘッダーをまとめて持つ」などの役割です。

三つ目は、レスポンスの中身を取るときに await resp.text() のように await が必要なことです。
レスポンスボディの読み込みも非同期 I/O だからです。

この3つさえまず押さえれば、あとはパターンのバリエーションです。


単発リクエストから「大量同時リクエスト」までの流れ

まずは「1件だけ取る」をちゃんと書いてみる

超ミニマムなサンプルをもう一度、コメント付きで。

import aiohttp
import asyncio

async def fetch_once():
    url = "https://httpbin.org/get"

    async with aiohttp.ClientSession() as session:
        async with session.get(url, timeout=10) as resp:
            print("ステータス:", resp.status)
            text = await resp.text()
            print("内容の一部:", text[:100])

asyncio.run(fetch_once())
Python

ここでのポイントは、

  • ClientSession() で「HTTP の窓口」を開く
  • session.get(...) でリクエストを出す
  • await resp.text() でボディを非同期に読む

という流れです。「requests とほぼ同じだけど、async/await にした」とイメージしてください。

複数 URL を「順番に」叩く非同期コード(あえて非効率)

次に、あえて「非同期だけど順番」のコードを書きます。

import aiohttp
import asyncio
import time

URLS = [
    "https://httpbin.org/delay/1",
    "https://httpbin.org/delay/2",
    "https://httpbin.org/delay/3",
]

async def fetch(session, url):
    async with session.get(url, timeout=10) as resp:
        await resp.text()
        return url, resp.status

async def main_sequential():
    start = time.time()

    async with aiohttp.ClientSession() as session:
        results = []
        for url in URLS:
            result = await fetch(session, url)
            results.append(result)

    elapsed = time.time() - start
    print("結果:", results)
    print(f"順番実行 合計: {elapsed:.2f}秒")

asyncio.run(main_sequential())
Python

delay/1, 2, 3 を順番に叩いているので、合計時間は単純に 1+2+3 程度になります。
これは「非同期だけど、await で毎回待ち切ってから次に行っている」からです。

本領発揮:複数 URL を「同時に」叩く

ここでいよいよ、非同期の強みを使います。

async def main_concurrent():
    start = time.time()

    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, url)) for url in URLS]
        results = await asyncio.gather(*tasks)

    elapsed = time.time() - start
    print("結果:", results)
    print(f"同時実行 合計: {elapsed:.2f}秒")

asyncio.run(main_concurrent())
Python

考え方としては、

  • fetch は「URLごとの仕事」を表す async 関数
  • asyncio.create_task(fetch(...)) で、「この仕事を並列メニューに登録」
  • asyncio.gather(*tasks) で、登録済みの全部が終わるまで待ち、結果をまとめて受け取る

delay/1, 2, 3 が「ほぼ同じタイミングで投げられる」ので、合計時間は最大の遅延(3秒)+ちょっと、くらいになります。

これが aiohttp を使う最大の理由です。
大量の API を「合計時間をほぼ最大遅延に近づける形」で取れる。


JSON API を想定した、実務寄りの例

JSON レスポンスを扱う基本形

多くの Web API は JSON を返します。
aiohttp では await resp.json() で dict/list に変換できます。

import aiohttp
import asyncio

async def fetch_json(session, url):
    async with session.get(url, timeout=10) as resp:
        resp.raise_for_status()
        data = await resp.json()
        return data

async def main():
    url = "https://httpbin.org/json"
    async with aiohttp.ClientSession() as session:
        data = await fetch_json(session, url)
        print(type(data))
        print(list(data.keys()))

asyncio.run(main())
Python

ここで重要なのは resp.raise_for_status() です。
ステータスコードが 4xx / 5xx の場合に例外を投げ、エラーとして扱えます。
大量リクエストを飛ばす BOT を作るときには、「失敗も見逃さない」設計がかなり大切になってきます。

パラメータ付きの GET と、ヘッダー・認証情報

requests と同様に、クエリパラメータやヘッダー、トークンも渡せます。

async def fetch_with_params(session, base_url, params, token):
    headers = {"Authorization": f"Bearer {token}"}
    async with session.get(base_url, params=params, headers=headers, timeout=10) as resp:
        resp.raise_for_status()
        return await resp.json()
Python

このようにしておくと、

  • URL は同じだけど、日付だけ変えたい
  • 同じ API をユーザーごとに別トークンで叩きたい

といった場合に、「引数の違いだけでタスクを量産」できます。
それを asyncio.create_task で投げ、gather で待つ、というのが aiohttp を使った API BOT の基本パターンです。


重要ポイントの深掘り1:ClientSession と接続の再利用

なぜ毎回 ClientSession() を作らない方がいいのか

間違った使い方としてよく見かけるのが、
fetch の中で毎回 aiohttp.ClientSession() を作ってしまうパターンです。

# 悪い例
async def bad_fetch(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            ...
Python

これだと、リクエストごとにセッション・接続を作って→閉じて、を繰り返します。
HTTP 的には毎回フルコストで接続しにいくので遅くなりますし、サーバーにも優しくありません。

良いパターンは、「外側で1つのセッションを作り、その中で複数のリクエストを投げる」形です。

async def good_main(urls):
    async with aiohttp.ClientSession() as session:
        tasks = [asyncio.create_task(fetch(session, url)) for url in urls]
        results = await asyncio.gather(*tasks)
        return results
Python

セッションは「TCP 接続をプーリングして再利用する」「Cookie や認証情報を持つ」役割があるので、使い回したほうが効率的です。

タイムアウトとコネクション数の制御(負荷のコントロール)

大量にリクエストを飛ばすときに大事なのが、「一度に何本まで接続するか」です。
何百・何千と同時接続すると、相手にも自分にも負担になります。

aiohttp の TCPConnector を使うと、セッション生成時に接続数を制限できます。

conn = aiohttp.TCPConnector(limit=10)  # 同時接続最大10
async with aiohttp.ClientSession(connector=conn) as session:
    ...
Python

さらに厳密に制御したい場合は、asyncio.Semaphore で「同時実行タスク数」を制限するやり方もあります。

sem = asyncio.Semaphore(10)

async def limited_fetch(session, url):
    async with sem:
        return await fetch(session, url)
Python

こうすることで、「全体のタスクは100件あっても、同時に動くのは10件まで」というような制御ができます。
データ収集 BOT や Web API 集計 BOT を本番で回すなら、ここは意識しておきたいポイントです。


重要ポイントの深掘り2:エラー処理と再試行(リトライ)

一時的な失敗にどう対応するか

ネットワークは必ず落ちます。必ずです。
一時的なタイムアウトや 5xx エラーは普通に起こるので、リトライを考えておく必要があります。

シンプルなリトライ付き fetch の例を見てみます。

import asyncio
import aiohttp

async def fetch_with_retry(session, url, retries=3, delay=2):
    for i in range(1, retries + 1):
        try:
            async with session.get(url, timeout=10) as resp:
                resp.raise_for_status()
                return await resp.text()
        except (aiohttp.ClientError, asyncio.TimeoutError) as e:
            print(f"[{i}/{retries}] エラー: {e} ({url})")
            if i == retries:
                raise
            await asyncio.sleep(delay)
Python

ここで重要なのは、

  • aiohttp.ClientErrorasyncio.TimeoutError をまとめて扱っている
  • 最後の試行でも失敗したら例外をそのまま投げる(→ gather 側で検知できる)
  • リトライ間隔に await asyncio.sleep() を使っている(time.sleep ではない)

という点です。

これを大量タスクに埋め込むことで、「たまたまネットワークが不安定だっただけで全体の BOT が落ちる」現象を減らせます。

gather とエラーの組み合わせ(どこまで止めるか)

asyncio.gather は、デフォルトだと「中の1つが例外を出したときに、その例外を投げて止まる」動作をします。

他のタスクの結果も知りたい場合や、「失敗したURLだけログしておきたい」場合は、return_exceptions=True を指定します。

results = await asyncio.gather(*tasks, return_exceptions=True)
for url, result in zip(URLS, results):
    if isinstance(result, Exception):
        print(f"失敗: {url} -> {result}")
    else:
        print(f"成功: {url} -> {result[:50]}")
Python

エラーと成功を一緒に扱うか、エラーが出た時点で全体を落とすかは、
システムの性質や運用方針に依存するので、ここは最初に決めておくと設計がブレません。


aiohttp を使うべき場面、使わなくていい場面

aiohttp が「ドンピシャ」でハマるケース

例えば次のような状況なら、aiohttp に踏み込む価値が大きいです。

  • Web API を何百/何千回も呼ぶ必要がある
  • レスポンスを待っている時間が長く、CPU はほとんど使っていない
  • 並行でリクエストを投げないと、処理が現実的な時間に収まらない
  • サーバー側のレート制限を意識しながら、効率よくリクエストしたい

ここでは「async + aiohttp」が、マルチスレッドよりもコードの見通しが良くなりやすいです。

逆に「とりあえず requests で十分」なケース

一方、次のような場面では、無理に aiohttp にしなくてもいいことが多いです。

  • せいぜい 5〜10 回くらいしか HTTP リクエストしない
  • 人が手でトリガーするツールで、多少の待ち時間はそこまで問題にならない
  • 非同期プログラミング自体がまだ慣れていなくて、まずはシンプルに書きたい

この場合、まずは requests でシンプルなコードを書き、「本当に遅くて困る」と感じてから aiohttp と asyncio に進むほうが、学びの順序として健全です。


まとめ(aiohttp は「asyncio のための HTTP エンジン」)

最後に、aiohttp を自分のものにするうえで大事なポイントを整理します。

aiohttp は「非同期(asyncio)前提の HTTP クライアント」で、「非同期版 requests」と捉えると理解しやすい。
async with ClientSession() の中でリクエストを投げ、await resp.text()await resp.json() でボディを読む。
大量のリクエストを扱うときは、asyncio.create_taskasyncio.gather で「同時に投げて同時に待つ」スタイルが真価を発揮する。
セッションの再利用・接続数の制限・タイムアウト・リトライ・エラー処理を意識すると、実務で通用する「Web API BOT」の土台になる。
「本当に非同期が必要な場面か?」を考えたうえで、requests からのステップアップとして採用するのが良い。

タイトルとURLをコピーしました