Python 3.11で asyncio.Timeout コンテキストマネージャーが追加されました。その使い方と追加された経緯を追ってみます。


この記事は JSL (日本システム技研) | Advent Calendar 2022 1日目 の記事です。

Python 3.11 asyncioの新機能

Python 3.11が2022年10月24日にリリースされました。

Python 3.11は「Faster CPython」プロジェクトが始まり、高速化が話題のバージョンです。(高速化の詳細は↓を参照してください)

大好きな非同期IOの標準ライブラリ、asyncioでもいくつかのアップデートがあります。高レベルAPIの新機能がいくつか追加になっています。気になったのは次のとおりです。

この中で今回は asyncio.Timeout を追ってみたいと思います。

動作確認は2022年12月4日時点で最新のPython 3.11.0です。

asyncio.Timeout とは

ドキュメントを見てみましょう。まずは What’s Newです。
タイムアウトを設定できるAPIのようです。従来のAPIである wait_for() よりこちらが推奨とのこと。

Added timeout(), an asynchronous context manager for setting a timeout on asynchronous operations. For new code this is recommended over using wait_for() directly. (Contributed by Andrew Svetlov in gh-90927.)

次に公式ドキュメントCoroutines and Tasks — Python 3.11.0 documentationです。
delay でタイムアウトの時間を設定ができるようです。

coroutine asyncio.timeout(delay)
An asynchronous context manager that can be used to limit the amount of time spent waiting on something.

delay can either be None, or a float/int number of seconds to wait. If delay is None, no time limit will be applied; this can be useful if the delay is unknown when the context manager is created.

Exampleを動かしてみる

Exampleのコードを動かしてみましょう。

(Example)

async def main():
    try:
        async with asyncio.timeout(10):
            await long_running_task()
    except TimeoutError:
        print("The long operation timed out, but we've handled it.")

    print("This statement will run regardless.")

ちょっと改造します。
long_running_task() コルーチンが定義されていないので、適当に定義してみます。タイムアウトをさせたいので、長く動作するコルーチンにします。 出力結果を見てみましょう。

import asyncio


async def long_running_task() -> bool:
    print("長いコルーチン 開始")
    await asyncio.sleep(36000)
    print("長いコルーチン 終了")
    return True



async def main():
    try:
        async with asyncio.timeout(1):
            await long_running_task()
    except TimeoutError as err:
        print(f"{err=}")
        print("長時間の操作はタイムアウトになったが、対応した。")

    print("この文は関係なく実行されます。")


asyncio.run(main())

処理が途中で中断され、TimeoutError例外が送出されています。

$ python3.11 example_timeout.py
長いコルーチン 開始
err=TimeoutError()
長時間の操作はタイムアウトになったが、対応した。
この文は関係なく実行されます。

例外処理とキャンセル

asyncioを使う動機は「複数の処理を同時に実行して高速化したい!」だと思います。その場合、つきまとう問題が 同時に発生するエラーの補足と他のタスクのキャンセル です。いわばasyncioを利用する上で、必須の機能ではないかと思っています。

Python 3.11では asyncio.Taskgroup という新機能も追加になっており、この 同時に発生するエラーの補足と他のタスクのキャンセル を機能として持っています。 また、この課題の解決に合わせ、新構文「Except*」も追加になっています。
詳細は以下の記事をご参考ください。

asyncio.Taskgroup と同じバージョンである3.11に追加された asyncio.Timeout にもその機能は備わっているのでしょうか。試してみましょう。

asyncio.Taskgroup の場合

まずは asyncio.Taskgroup での例外の捕捉とキャンセルを見てみます。複数のタスクを同時に実行し、同時に発生した例外を新構文「Except*」で捕捉します。

import asyncio

async def coro_success():
    return "成功"

async def coro_value_err():
    raise ValueError

async def coro_type_err():
    raise TypeError

async def long_running_task():
    print("長いコルーチン 開始")
    await asyncio.sleep(3)
    print("完了していないタスクが出力しています")
    return "成功?"

async def main():
    try:
        async with asyncio.TaskGroup() as g:
            task1 = g.create_task(coro_success())
            task2 = g.create_task(coro_value_err())
            task3 = g.create_task(coro_type_err())
            task4 = g.create_task(long_running_task(), name="残るコルーチン")
        results = [task1.result(), task2.result(), task3.result()]
        print(f"{results=}")
    except* Exception as err:
        print(f"{err.exceptions=}")

    print(f"完了していないタスク {task1._state=}")
    print(f"完了していないタスク {task2._state=}")
    print(f"完了していないタスク {task3._state=}")
    print(f"完了していないタスク {task4._state=}")
    await asyncio.sleep(5)  # long_running_taskのprintが出力されないことを確認する
    print("完了")


asyncio.run(main())

これを実行してみましょう。タスクは3つのステータスを持っています。

以下の出力結果から2つのことがわかります。

$ python3.11 sample_taskgroup.py 
長いコルーチン 開始
err.exceptions=(ValueError(), TypeError())
完了していないタスク task1._state='FINISHED'
完了していないタスク task2._state='FINISHED'
完了していないタスク task3._state='FINISHED'
完了していないタスク task4._state='CANCELLED'
完了

asyncio.Timeout の場合

では続いて、 asyncio.Timeout を試してみましょう。 main() を次のように修正します。

async def main():
    try:
        task1 = asyncio.create_task(coro_success())
        task2 = asyncio.create_task(coro_value_err())
        task3 = asyncio.create_task(coro_type_err())
        task4 = asyncio.create_task(long_running_task())
        async with asyncio.timeout(1):
            await task1
            await task2
            await task3
            await task4
    except* Exception as err:
        print(f"{err.exceptions=}")

    print(f"タスクの状態 {task1._state=}")
    print(f"タスクの状態 {task2._state=}")
    print(f"タスクの状態 {task3._state=}")
    print(f"タスクの状態 {task4._state=}")
    await asyncio.sleep(5)

これを実行してみます。

「PENDING」(待機中)のタスクが残り、また同時に発生するエラーが捕捉されていないことが分かります。 機能として、 asyncio.Timeout はキャンセルや複数の例外の捕捉は持っていないようです。

$ python3.11 sample_timeout.py 
長いコルーチン 開始
err.exceptions=(ValueError(),)
タスクの状態 task1._state='FINISHED'
タスクの状態 task2._state='FINISHED'
タスクの状態 task3._state='FINISHED'
タスクの状態 task4._state='PENDING'
完了していないタスクが出力しています
Task exception was never retrieved
future: <Task finished name='Task-4' coro=<coro_type_err() done, defined at ... /sample_timeout.py:10> exception=TypeError()>
Traceback (most recent call last):
  File " ... /sample_timeout.py", line 11, in coro_type_err
    raise TypeError
TypeError

アドベントカレンダーの次の回にて、 asyncio.Timeout の利用方法を考えてみたいと思います。

まとめ

さて、連載は続くのでしょうか。
次回は、なぜ asyncio.Timeout が生まれたのか。既存のAPI1にどんな課題があったのか、を追ってみたいと思います。
どういう使い方を想定しているのか、までたどりつきたいですね。


  1. asyncio.wait_for()asyncio.wait() でもタイムアウトの設定が可能 ↩︎