用 asyncio 開發(fā)?
異步編程與傳統(tǒng)的“順序”編程不同。
本頁列出常見的錯誤和陷阱,并解釋如何避免它們。
Debug 模式?
默認(rèn)情況下,asyncio以生產(chǎn)模式運(yùn)行。為了簡化開發(fā),asyncio還有一種*debug 模式* 。
有幾種方法可以啟用異步調(diào)試模式:
將
PYTHONASYNCIODEBUG
環(huán)境變量設(shè)置為1
。使用 Python 開發(fā)模式。
將
debug=True
傳遞給asyncio.run()
。調(diào)用
loop.set_debug()
。
除了啟用調(diào)試模式外,還要考慮:
將 asyncio logger 的日志級別設(shè)置為
logging.DEBUG
,例如,下面的代碼片段可以在應(yīng)用程序啟動時運(yùn)行:logging.basicConfig(level=logging.DEBUG)
配置
warnings
模塊以顯示ResourceWarning
警告。一種方法是使用-W
default
命令行選項。
啟用調(diào)試模式時:
asyncio 檢查 未被等待的協(xié)程 并記錄他們;這將消除“被遺忘的等待”問題。
許多非線程安全的異步 APIs (例如
loop.call_soon()
和loop.call_at()
方法),如果從錯誤的線程調(diào)用,則會引發(fā)異常。如果執(zhí)行I/O操作花費(fèi)的時間太長,則記錄I/O選擇器的執(zhí)行時間。
Callbacks taking longer than 100 milliseconds are logged. The
loop.slow_callback_duration
attribute can be used to set the minimum execution duration in seconds that is considered "slow".
并發(fā)性和多線程?
事件循環(huán)在線程中運(yùn)行(通常是主線程),并在其線程中執(zhí)行所有回調(diào)和任務(wù)。當(dāng)一個任務(wù)在事件循環(huán)中運(yùn)行時,沒有其他任務(wù)可以在同一個線程中運(yùn)行。當(dāng)一個任務(wù)執(zhí)行一個 await
表達(dá)式時,正在運(yùn)行的任務(wù)被掛起,事件循環(huán)執(zhí)行下一個任務(wù)。
要調(diào)度來自另一 OS 線程的 callback,應(yīng)該使用 loop.call_soon_threadsafe()
方法。 例如:
loop.call_soon_threadsafe(callback, *args)
幾乎所有異步對象都不是線程安全的,這通常不是問題,除非在任務(wù)或回調(diào)函數(shù)之外有代碼可以使用它們。如果需要這樣的代碼來調(diào)用低級異步API,應(yīng)該使用 loop.call_soon_threadsafe()
方法,例如:
loop.call_soon_threadsafe(fut.cancel)
要從不同的OS線程調(diào)度一個協(xié)程對象,應(yīng)該使用 run_coroutine_threadsafe()
函數(shù)。它返回一個 concurrent.futures.Future
。查詢結(jié)果:
async def coro_func():
return await asyncio.sleep(1, 42)
# Later in another OS thread:
future = asyncio.run_coroutine_threadsafe(coro_func(), loop)
# Wait for the result:
result = future.result()
為了能夠處理信號和執(zhí)行子進(jìn)程,事件循環(huán)必須運(yùn)行于主線程中。
方法 loop.run_in_executor()
可以和 concurrent.futures.ThreadPoolExecutor
一起使用,用于在一個不同的操作系統(tǒng)線程中執(zhí)行阻塞代碼,并避免阻塞運(yùn)行事件循環(huán)的那個操作系統(tǒng)線程。
目前沒有什么辦法能直接從另一個進(jìn)程 (例如通過 multiprocessing
啟動的進(jìn)程) 安排協(xié)程或回調(diào)。 事件循環(huán)方法 小節(jié)列出了可以從管道讀取并監(jiān)視文件描述符而不會阻塞事件循環(huán)的 API。 此外,asyncio 的 子進(jìn)程 API 提供了一種啟動進(jìn)程并從事件循環(huán)與其通信的辦法。 最后,之前提到的 loop.run_in_executor()
方法也可配合 concurrent.futures.ProcessPoolExecutor
使用以在另一個進(jìn)程中執(zhí)行代碼。
運(yùn)行阻塞的代碼?
不應(yīng)該直接調(diào)用阻塞( CPU 綁定)代碼。例如,如果一個函數(shù)執(zhí)行1秒的 CPU 密集型計算,那么所有并發(fā)異步任務(wù)和 IO 操作都將延遲1秒。
可以用執(zhí)行器在不同的線程甚至不同的進(jìn)程中運(yùn)行任務(wù),以避免使用事件循環(huán)阻塞 OS 線程。 請參閱 loop.run_in_executor()
方法了解詳情。
日志記錄?
asyncio使用 logging
模塊,所有日志記錄都是通過 "asyncio"
logger執(zhí)行的。
默認(rèn)日志級別是 logging.INFO
??梢院苋菀椎卣{(diào)整:
logging.getLogger("asyncio").setLevel(logging.WARNING)
檢測 never-awaited 協(xié)同程序?
當(dāng)協(xié)程函數(shù)被調(diào)用而不是被等待時 (即執(zhí)行 coro()
而不是 await coro()
) 或者協(xié)程沒有通過 asyncio.create_task()
被排入計劃日程,asyncio 將會發(fā)出一條 RuntimeWarning
:
import asyncio
async def test():
print("never scheduled")
async def main():
test()
asyncio.run(main())
輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
test()
調(diào)試模式的輸出:
test.py:7: RuntimeWarning: coroutine 'test' was never awaited
Coroutine created at (most recent call last)
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
File "../t.py", line 7, in main
test()
test()
通常的修復(fù)方法是等待協(xié)程或者調(diào)用 asyncio.create_task()
函數(shù):
async def main():
await test()
檢測就再也沒異常?
如果調(diào)用 Future.set_exception()
,但不等待 Future 對象,將異常傳播到用戶代碼。在這種情況下,當(dāng) Future 對象被垃圾收集時,asyncio將發(fā)出一條日志消息。
未處理異常的例子:
import asyncio
async def bug():
raise Exception("not consumed")
async def main():
asyncio.create_task(bug())
asyncio.run(main())
輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed')>
Traceback (most recent call last):
File "test.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed
激活調(diào)試模式 以獲取任務(wù)創(chuàng)建處的跟蹤信息:
asyncio.run(main(), debug=True)
調(diào)試模式的輸出:
Task exception was never retrieved
future: <Task finished coro=<bug() done, defined at test.py:3>
exception=Exception('not consumed') created at asyncio/tasks.py:321>
source_traceback: Object created at (most recent call last):
File "../t.py", line 9, in <module>
asyncio.run(main(), debug=True)
< .. >
Traceback (most recent call last):
File "../t.py", line 4, in bug
raise Exception("not consumed")
Exception: not consumed