協(xié)程與任務?

本節(jié)將簡述用于協(xié)程與任務的高層級 API。

協(xié)程?

Coroutines declared with the async/await syntax is the preferred way of writing asyncio applications. For example, the following snippet of code prints "hello", waits 1 second, and then prints "world":

>>>
>>> import asyncio

>>> async def main():
...     print('hello')
...     await asyncio.sleep(1)
...     print('world')

>>> asyncio.run(main())
hello
world

注意:簡單地調(diào)用一個協(xié)程并不會使其被調(diào)度執(zhí)行

>>>
>>> main()
<coroutine object main at 0x1053bb7c8>

要真正運行一個協(xié)程,asyncio 提供了三種主要機制:

  • asyncio.run() 函數(shù)用來運行最高層級的入口點 "main()" 函數(shù) (參見上面的示例。)

  • 等待一個協(xié)程。以下代碼段會在等待 1 秒后打印 "hello",然后 再次 等待 2 秒后打印 "world":

    import asyncio
    import time
    
    async def say_after(delay, what):
        await asyncio.sleep(delay)
        print(what)
    
    async def main():
        print(f"started at {time.strftime('%X')}")
    
        await say_after(1, 'hello')
        await say_after(2, 'world')
    
        print(f"finished at {time.strftime('%X')}")
    
    asyncio.run(main())
    

    預期的輸出:

    started at 17:13:52
    hello
    world
    finished at 17:13:55
    
  • asyncio.create_task() 函數(shù)用來并發(fā)運行作為 asyncio 任務 的多個協(xié)程。

    讓我們修改以上示例,并發(fā) 運行兩個 say_after 協(xié)程:

    async def main():
        task1 = asyncio.create_task(
            say_after(1, 'hello'))
    
        task2 = asyncio.create_task(
            say_after(2, 'world'))
    
        print(f"started at {time.strftime('%X')}")
    
        # Wait until both tasks are completed (should take
        # around 2 seconds.)
        await task1
        await task2
    
        print(f"finished at {time.strftime('%X')}")
    

    注意,預期的輸出顯示代碼段的運行時間比之前快了 1 秒:

    started at 17:14:32
    hello
    world
    finished at 17:14:34
    

可等待對象?

如果一個對象可以在 await 語句中使用,那么它就是 可等待 對象。許多 asyncio API 都被設計為接受可等待對象。

可等待 對象有三種主要類型: 協(xié)程, 任務Future.

協(xié)程

Python 協(xié)程屬于 可等待 對象,因此可以在其他協(xié)程中被等待:

import asyncio

async def nested():
    return 42

async def main():
    # Nothing happens if we just call "nested()".
    # A coroutine object is created but not awaited,
    # so it *won't run at all*.
    nested()

    # Let's do it differently now and await it:
    print(await nested())  # will print "42".

asyncio.run(main())

重要

在本文檔中 "協(xié)程" 可用來表示兩個緊密關聯(lián)的概念:

  • 協(xié)程函數(shù): 定義形式為 async def 的函數(shù);

  • 協(xié)程對象: 調(diào)用 協(xié)程函數(shù) 所返回的對象。

任務

任務 被用來“并行的”調(diào)度協(xié)程

當一個協(xié)程通過 asyncio.create_task() 等函數(shù)被封裝為一個 任務,該協(xié)程會被自動調(diào)度執(zhí)行:

import asyncio

async def nested():
    return 42

async def main():
    # Schedule nested() to run soon concurrently
    # with "main()".
    task = asyncio.create_task(nested())

    # "task" can now be used to cancel "nested()", or
    # can simply be awaited to wait until it is complete:
    await task

asyncio.run(main())

Futures

Future 是一種特殊的 低層級 可等待對象,表示一個異步操作的 最終結(jié)果。

當一個 Future 對象 被等待,這意味著協(xié)程將保持等待直到該 Future 對象在其他地方操作完畢。

在 asyncio 中需要 Future 對象以便允許通過 async/await 使用基于回調(diào)的代碼。

通常情況下 沒有必要 在應用層級的代碼中創(chuàng)建 Future 對象。

Future 對象有時會由庫和某些 asyncio API 暴露給用戶,用作可等待對象:

async def main():
    await function_that_returns_a_future_object()

    # this is also valid:
    await asyncio.gather(
        function_that_returns_a_future_object(),
        some_python_coroutine()
    )

一個很好的返回對象的低層級函數(shù)的示例是 loop.run_in_executor()

創(chuàng)建任務?

asyncio.create_task(coro, *, name=None, context=None)?

coro 協(xié)程 封裝為一個 Task 并調(diào)度其執(zhí)行。返回 Task 對象。

name 不為 None,它將使用 Task.set_name() 來設為任務的名稱。

An optional keyword-only context argument allows specifying a custom contextvars.Context for the coro to run in. The current context copy is created when no context is provided.

該任務會在 get_running_loop() 返回的循環(huán)中執(zhí)行,如果當前線程沒有在運行的循環(huán)則會引發(fā) RuntimeError。

重要

Save a reference to the result of this function, to avoid a task disappearing mid execution.

3.7 新版功能.

在 3.8 版更改: Added the name parameter.

在 3.11 版更改: Added the context parameter.

休眠?

coroutine asyncio.sleep(delay, result=None)?

阻塞 delay 指定的秒數(shù)。

如果指定了 result,則當協(xié)程完成時將其返回給調(diào)用者。

sleep() 總是會掛起當前任務,以允許其他任務運行。

將 delay 設為 0 將提供一個經(jīng)優(yōu)化的路徑以允許其他任務運行。 這可供長期間運行的函數(shù)使用以避免在函數(shù)調(diào)用的全過程中阻塞事件循環(huán)。

以下協(xié)程示例運行 5 秒,每秒顯示一次當前日期:

import asyncio
import datetime

async def display_date():
    loop = asyncio.get_running_loop()
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

asyncio.run(display_date())

在 3.10 版更改: Removed the loop parameter.

并發(fā)運行任務?

awaitable asyncio.gather(*aws, return_exceptions=False)?

并發(fā) 運行 aws 序列中的 可等待對象

如果 aws 中的某個可等待對象為協(xié)程,它將自動被作為一個任務調(diào)度。

如果所有可等待對象都成功完成,結(jié)果將是一個由所有返回值聚合而成的列表。結(jié)果值的順序與 aws 中可等待對象的順序一致。

如果 return_exceptionsFalse (默認),所引發(fā)的首個異常會立即傳播給等待 gather() 的任務。aws 序列中的其他可等待對象 不會被取消 并將繼續(xù)運行。

如果 return_exceptionsTrue,異常會和成功的結(jié)果一樣處理,并聚合至結(jié)果列表。

如果 gather() 被取消,所有被提交 (尚未完成) 的可等待對象也會 被取消。

如果 aws 序列中的任一 Task 或 Future 對象 被取消,它將被當作引發(fā)了 CancelledError 一樣處理 -- 在此情況下 gather() 調(diào)用 不會 被取消。這是為了防止一個已提交的 Task/Future 被取消導致其他 Tasks/Future 也被取消。

在 3.10 版更改: Removed the loop parameter.

示例:

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number + 1):
        print(f"Task {name}: Compute factorial({number}), currently i={i}...")
        await asyncio.sleep(1)
        f *= i
    print(f"Task {name}: factorial({number}) = {f}")
    return f

async def main():
    # Schedule three calls *concurrently*:
    L = await asyncio.gather(
        factorial("A", 2),
        factorial("B", 3),
        factorial("C", 4),
    )
    print(L)

asyncio.run(main())

# Expected output:
#
#     Task A: Compute factorial(2), currently i=2...
#     Task B: Compute factorial(3), currently i=2...
#     Task C: Compute factorial(4), currently i=2...
#     Task A: factorial(2) = 2
#     Task B: Compute factorial(3), currently i=3...
#     Task C: Compute factorial(4), currently i=3...
#     Task B: factorial(3) = 6
#     Task C: Compute factorial(4), currently i=4...
#     Task C: factorial(4) = 24
#     [2, 6, 24]

備注

如果 return_exceptions 為 False,則在 gather() 被標記為已完成后取消它將不會取消任何已提交的可等待對象。 例如,在將一個異常傳播給調(diào)用者之后,gather 可被標記為已完成,因此,在從 gather 捕獲一個(由可等待對象所引發(fā)的)異常之后調(diào)用 gather.cancel() 將不會取消任何其他可等待對象。

在 3.7 版更改: 如果 gather 本身被取消,則無論 return_exceptions 取值為何,消息都會被傳播。

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果未提供位置參數(shù)或者并非所有位置參數(shù)均為 Future 類對象并且沒有正在運行的事件循環(huán)則會發(fā)出棄用警告。

屏蔽取消操作?

awaitable asyncio.shield(aw)?

保護一個 可等待對象 防止其被 取消。

如果 aw 是一個協(xié)程,它將自動被作為任務調(diào)度。

以下語句:

res = await shield(something())

相當于:

res = await something()

不同之處 在于如果包含它的協(xié)程被取消,在 something() 中運行的任務不會被取消。從 something() 的角度看來,取消操作并沒有發(fā)生。然而其調(diào)用者已被取消,因此 "await" 表達式仍然會引發(fā) CancelledError。

如果通過其他方式取消 something() (例如在其內(nèi)部操作) 則 shield() 也會取消。

如果希望完全忽略取消操作 (不推薦) 則 shield() 函數(shù)需要配合一個 try/except 代碼段,如下所示:

try:
    res = await shield(something())
except CancelledError:
    res = None

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果 aw 不是 Future 類對象并且沒有正在運行的事件循環(huán)則會發(fā)出棄用警告。

超時?

coroutine asyncio.wait_for(aw, timeout)?

等待 aw 可等待對象 完成,指定 timeout 秒數(shù)后超時。

如果 aw 是一個協(xié)程,它將自動被作為任務調(diào)度。

timeout 可以為 None,也可以為 float 或 int 型數(shù)值表示的等待秒數(shù)。如果 timeoutNone,則等待直到完成。

If a timeout occurs, it cancels the task and raises TimeoutError.

要避免任務 取消,可以加上 shield()。

此函數(shù)將等待直到 Future 確實被取消,所以總等待時間可能超過 timeout。 如果在取消期間發(fā)生了異常,異常將會被傳播。

如果等待被取消,則 aw 指定的對象也會被取消。

在 3.10 版更改: Removed the loop parameter.

示例:

async def eternity():
    # Sleep for one hour
    await asyncio.sleep(3600)
    print('yay!')

async def main():
    # Wait for at most 1 second
    try:
        await asyncio.wait_for(eternity(), timeout=1.0)
    except TimeoutError:
        print('timeout!')

asyncio.run(main())

# Expected output:
#
#     timeout!

在 3.7 版更改: When aw is cancelled due to a timeout, wait_for waits for aw to be cancelled. Previously, it raised TimeoutError immediately.

在 3.10 版更改: Removed the loop parameter.

簡單等待?

coroutine asyncio.wait(aws, *, timeout=None, return_when=ALL_COMPLETED)?

Run Future and Task instances in the aws iterable concurrently and block until the condition specified by return_when.

aws 可迭代對象必須不為空。

返回兩個 Task/Future 集合: (done, pending)。

用法:

done, pending = await asyncio.wait(aws)

如指定 timeout (float 或 int 類型) 則它將被用于控制返回之前等待的最長秒數(shù)。

Note that this function does not raise TimeoutError. Futures or Tasks that aren't done when the timeout occurs are simply returned in the second set.

return_when 指定此函數(shù)應在何時返回。它必須為以下常數(shù)之一:

常量

描述

FIRST_COMPLETED

函數(shù)將在任意可等待對象結(jié)束或取消時返回。

FIRST_EXCEPTION

函數(shù)將在任意可等待對象因引發(fā)異常而結(jié)束時返回。當沒有引發(fā)任何異常時它就相當于 ALL_COMPLETED。

ALL_COMPLETED

函數(shù)將在所有可等待對象結(jié)束或取消時返回。

wait_for() 不同,wait() 在超時發(fā)生時不會取消可等待對象。

在 3.10 版更改: Removed the loop parameter.

在 3.11 版更改: Passing coroutine objects to wait() directly is forbidden.

asyncio.as_completed(aws, *, timeout=None)?

并發(fā)地運行 aws 可迭代對象中的 可等待對象。 返回一個協(xié)程的迭代器。 所返回的每個協(xié)程可被等待以從剩余的可等待對象的可迭代對象中獲得最早的下一個結(jié)果。

Raises TimeoutError if the timeout occurs before all Futures are done.

在 3.10 版更改: Removed the loop parameter.

示例:

for coro in as_completed(aws):
    earliest_result = await coro
    # ...

在 3.10 版更改: Removed the loop parameter.

3.10 版后已移除: 如果 aws 可迭代對象中的可等待對象不全為 Future 類對象并且沒有正在運行的事件循環(huán)則會發(fā)出棄用警告。

在線程中運行?

coroutine asyncio.to_thread(func, /, *args, **kwargs)?

在不同的線程中異步地運行函數(shù) func

向此函數(shù)提供的任何 *args 和 **kwargs 會被直接傳給 func。 并且,當前 contextvars.Context 會被傳播,允許在不同的線程中訪問來自事件循環(huán)的上下文變量。

返回一個可被等待以獲取 func 的最終結(jié)果的協(xié)程。

This coroutine function is primarily intended to be used for executing IO-bound functions/methods that would otherwise block the event loop if they were run in the main thread. For example:

def blocking_io():
    print(f"start blocking_io at {time.strftime('%X')}")
    # Note that time.sleep() can be replaced with any blocking
    # IO-bound operation, such as file operations.
    time.sleep(1)
    print(f"blocking_io complete at {time.strftime('%X')}")

async def main():
    print(f"started main at {time.strftime('%X')}")

    await asyncio.gather(
        asyncio.to_thread(blocking_io),
        asyncio.sleep(1))

    print(f"finished main at {time.strftime('%X')}")


asyncio.run(main())

# Expected output:
#
# started main at 19:50:53
# start blocking_io at 19:50:53
# blocking_io complete at 19:50:54
# finished main at 19:50:54

在任何協(xié)程中直接調(diào)用 blocking_io() 將會在調(diào)用期間阻塞事件循環(huán),導致額外的 1 秒運行時間。 而通過改用 asyncio.to_thread(),我們可以在不同的線程中運行它從而不會阻塞事件循環(huán)。

備注

由于 GIL 的存在,asyncio.to_thread() 通常只能被用來將 IO 密集型函數(shù)變?yōu)榉亲枞摹?但是,對于會釋放 GIL 的擴展模塊或無此限制的替代性 Python 實現(xiàn)來說,asyncio.to_thread() 也可被用于 CPU 密集型函數(shù)。

3.9 新版功能.

跨線程調(diào)度?

asyncio.run_coroutine_threadsafe(coro, loop)?

向指定事件循環(huán)提交一個協(xié)程。(線程安全)

返回一個 concurrent.futures.Future 以等待來自其他 OS 線程的結(jié)果。

此函數(shù)應該從另一個 OS 線程中調(diào)用,而非事件循環(huán)運行所在線程。示例:

# Create a coroutine
coro = asyncio.sleep(1, result=3)

# Submit the coroutine to a given loop
future = asyncio.run_coroutine_threadsafe(coro, loop)

# Wait for the result with an optional timeout argument
assert future.result(timeout) == 3

如果在協(xié)程內(nèi)產(chǎn)生了異常,將會通知返回的 Future 對象。它也可被用來取消事件循環(huán)中的任務:

try:
    result = future.result(timeout)
except TimeoutError:
    print('The coroutine took too long, cancelling the task...')
    future.cancel()
except Exception as exc:
    print(f'The coroutine raised an exception: {exc!r}')
else:
    print(f'The coroutine returned: {result!r}')

參見 concurrency and multithreading 部分的文檔。

不同與其他 asyncio 函數(shù),此函數(shù)要求顯式地傳入 loop 參數(shù)。

3.5.1 新版功能.

內(nèi)省?

asyncio.current_task(loop=None)?

返回當前運行的 Task 實例,如果沒有正在運行的任務則返回 None。

如果 loopNone 則會使用 get_running_loop() 獲取當前事件循環(huán)。

3.7 新版功能.

asyncio.all_tasks(loop=None)?

返回事件循環(huán)所運行的未完成的 Task 對象的集合。

如果 loopNone,則會使用 get_running_loop() 獲取當前事件循環(huán)。

3.7 新版功能.

Task 對象?

class asyncio.Task(coro, *, loop=None, name=None)?

一個與 Future 類似 的對象,可運行 Python 協(xié)程。非線程安全。

Task 對象被用來在事件循環(huán)中運行協(xié)程。如果一個協(xié)程在等待一個 Future 對象,Task 對象會掛起該協(xié)程的執(zhí)行并等待該 Future 對象完成。當該 Future 對象 完成,被打包的協(xié)程將恢復執(zhí)行。

事件循環(huán)使用協(xié)同日程調(diào)度: 一個事件循環(huán)每次運行一個 Task 對象。而一個 Task 對象會等待一個 Future 對象完成,該事件循環(huán)會運行其他 Task、回調(diào)或執(zhí)行 IO 操作。

使用高層級的 asyncio.create_task() 函數(shù)來創(chuàng)建 Task 對象,也可用低層級的 loop.create_task()ensure_future() 函數(shù)。不建議手動實例化 Task 對象。

要取消一個正在運行的 Task 對象可使用 cancel() 方法。調(diào)用此方法將使該 Task 對象拋出一個 CancelledError 異常給打包的協(xié)程。如果取消期間一個協(xié)程正在等待一個 Future 對象,該 Future 對象也將被取消。

cancelled() 可被用來檢測 Task 對象是否被取消。如果打包的協(xié)程沒有抑制 CancelledError 異常并且確實被取消,該方法將返回 True。

asyncio.TaskFuture 繼承了其除 Future.set_result()Future.set_exception() 以外的所有 API。

Task 對象支持 contextvars 模塊。當一個 Task 對象被創(chuàng)建,它將復制當前上下文,然后在復制的上下文中運行其協(xié)程。

在 3.7 版更改: 加入對 contextvars 模塊的支持。

在 3.8 版更改: Added the name parameter.

3.10 版后已移除: 如果未指定 loop 并且沒有正在運行的事件循環(huán)則會發(fā)出棄用警告。

cancel(msg=None)?

請求取消 Task 對象。

這將安排在下一輪事件循環(huán)中拋出一個 CancelledError 異常給被封包的協(xié)程。

協(xié)程在之后有機會進行清理甚至使用 try ... ... except CancelledError ... finally 代碼塊抑制異常來拒絕請求。不同于 Future.cancel(),Task.cancel() 不保證 Task 會被取消,雖然抑制完全取消并不常見,也很不鼓勵這樣做。

在 3.9 版更改: Added the msg parameter.

Deprecated since version 3.11, will be removed in version 3.14: msg parameter is ambiguous when multiple cancel() are called with different cancellation messages. The argument will be removed.

以下示例演示了協(xié)程是如何偵聽取消請求的:

async def cancel_me():
    print('cancel_me(): before sleep')

    try:
        # Wait for 1 hour
        await asyncio.sleep(3600)
    except asyncio.CancelledError:
        print('cancel_me(): cancel sleep')
        raise
    finally:
        print('cancel_me(): after sleep')

async def main():
    # Create a "cancel_me" Task
    task = asyncio.create_task(cancel_me())

    # Wait for 1 second
    await asyncio.sleep(1)

    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print("main(): cancel_me is cancelled now")

asyncio.run(main())

# Expected output:
#
#     cancel_me(): before sleep
#     cancel_me(): cancel sleep
#     cancel_me(): after sleep
#     main(): cancel_me is cancelled now
cancelled()?

如果 Task 對象 被取消 則返回 True

當使用 cancel() 發(fā)出取消請求時 Task 會被 取消,其封包的協(xié)程將傳播被拋入的 CancelledError 異常。

done()?

如果 Task 對象 已完成 則返回 True。

當 Task 所封包的協(xié)程返回一個值、引發(fā)一個異?;?Task 本身被取消時,則會被認為 已完成。

result()?

返回 Task 的結(jié)果。

如果 Task 對象 已完成,其封包的協(xié)程的結(jié)果會被返回 (或者當協(xié)程引發(fā)異常時,該異常會被重新引發(fā)。)

如果 Task 對象 被取消,此方法會引發(fā)一個 CancelledError 異常。

如果 Task 對象的結(jié)果還不可用,此方法會引發(fā)一個 InvalidStateError 異常。

exception()?

返回 Task 對象的異常。

如果所封包的協(xié)程引發(fā)了一個異常,該異常將被返回。如果所封包的協(xié)程正常返回則該方法將返回 None。

如果 Task 對象 被取消,此方法會引發(fā)一個 CancelledError 異常。

如果 Task 對象尚未 完成,此方法將引發(fā)一個 InvalidStateError 異常。

add_done_callback(callback, *, context=None)?

添加一個回調(diào),將在 Task 對象 完成 時被運行。

此方法應該僅在低層級的基于回調(diào)的代碼中使用。

要了解更多細節(jié)請查看 Future.add_done_callback() 的文檔。

remove_done_callback(callback)?

從回調(diào)列表中移除 callback 。

此方法應該僅在低層級的基于回調(diào)的代碼中使用。

要了解更多細節(jié)請查看 Future.remove_done_callback() 的文檔。

get_stack(*, limit=None)?

返回此 Task 對象的棧框架列表。

如果所封包的協(xié)程未完成,這將返回其掛起所在的棧。如果協(xié)程已成功完成或被取消,這將返回一個空列表。如果協(xié)程被一個異常終止,這將返回回溯框架列表。

框架總是從按從舊到新排序。

每個被掛起的協(xié)程只返回一個??蚣?。

可選的 limit 參數(shù)指定返回框架的數(shù)量上限;默認返回所有框架。返回列表的順序要看是返回一個棧還是一個回溯:棧返回最新的框架,回溯返回最舊的框架。(這與 traceback 模塊的行為保持一致。)

print_stack(*, limit=None, file=None)?

打印此 Task 對象的?;蚧厮?。

此方法產(chǎn)生的輸出類似于 traceback 模塊通過 get_stack() 所獲取的框架。

limit 參數(shù)會直接傳遞給 get_stack()。

file 參數(shù)是輸出所寫入的 I/O 流;默認情況下輸出會寫入 sys.stderr。

get_coro()?

返回由 Task 包裝的協(xié)程對象。

3.8 新版功能.

get_name()?

返回 Task 的名稱。

如果沒有一個 Task 名稱被顯式地賦值,默認的 asyncio Task 實現(xiàn)會在實例化期間生成一個默認名稱。

3.8 新版功能.

set_name(value)?

設置 Task 的名稱。

value 參數(shù)可以為任意對象,它隨后會被轉(zhuǎn)換為字符串。

在默認的 Task 實現(xiàn)中,名稱將在任務對象的 repr() 輸出中可見。

3.8 新版功能.