同步原語(yǔ)?
源代碼: Lib/asyncio/locks.py
asyncio 同步原語(yǔ)被設(shè)計(jì)為與 threading
模塊的類似,但有兩個(gè)關(guān)鍵注意事項(xiàng):
asyncio 原語(yǔ)不是線程安全的,因此它們不應(yīng)被用于 OS 線程同步 (而應(yīng)當(dāng)使用
threading
);這些同步原語(yǔ)的方法不接受 timeout 參數(shù);請(qǐng)使用
asyncio.wait_for()
函數(shù)來(lái)執(zhí)行帶有超時(shí)的操作。
asyncio 具有下列基本同步原語(yǔ):
Lock?
- class asyncio.Lock?
實(shí)現(xiàn)一個(gè)用于 asyncio 任務(wù)的互斥鎖。 非線程安全。
asyncio 鎖可被用來(lái)保證對(duì)共享資源的獨(dú)占訪問(wèn)。
使用 Lock 的推薦方式是通過(guò)
async with
語(yǔ)句:lock = asyncio.Lock() # ... later async with lock: # access shared state
這等價(jià)于:
lock = asyncio.Lock() # ... later await lock.acquire() try: # access shared state finally: lock.release()
在 3.10 版更改: Removed the loop parameter.
- coroutine acquire()?
獲取鎖。
此方法會(huì)等待直至鎖為 unlocked,將其設(shè)為 locked 并返回
True
。當(dāng)有一個(gè)以上的協(xié)程在
acquire()
中被阻塞則會(huì)等待解鎖,最終只有一個(gè)協(xié)程會(huì)被執(zhí)行。鎖的獲取是 公平的: 被執(zhí)行的協(xié)程將是第一個(gè)開(kāi)始等待鎖的協(xié)程。
- release()?
釋放鎖。
當(dāng)鎖為 locked 時(shí),將其設(shè)為 unlocked 并返回。
如果鎖為 unlocked,則會(huì)引發(fā)
RuntimeError
。
- locked()?
如果鎖為 locked 則返回
True
。
事件?
- class asyncio.Event?
事件對(duì)象。 該對(duì)象不是線程安全的。
asyncio 事件可被用來(lái)通知多個(gè) asyncio 任務(wù)已經(jīng)有事件發(fā)生。
Event 對(duì)象會(huì)管理一個(gè)內(nèi)部旗標(biāo),可通過(guò)
set()
方法將其設(shè)為 true 并通過(guò)clear()
方法將其重設(shè)為 false。wait()
方法會(huì)阻塞直至該旗標(biāo)被設(shè)為 true。 該旗標(biāo)初始時(shí)會(huì)被設(shè)為 false。在 3.10 版更改: Removed the loop parameter.
示例:
async def waiter(event): print('waiting for it ...') await event.wait() print('... got it!') async def main(): # Create an Event object. event = asyncio.Event() # Spawn a Task to wait until 'event' is set. waiter_task = asyncio.create_task(waiter(event)) # Sleep for 1 second and set the event. await asyncio.sleep(1) event.set() # Wait until the waiter task is finished. await waiter_task asyncio.run(main())
- set()?
設(shè)置事件。
所有等待事件被設(shè)置的任務(wù)將被立即喚醒。
- is_set()?
如果事件已被設(shè)置則返回
True
。
Condition?
- class asyncio.Condition(lock=None)?
條件對(duì)象。 該對(duì)象不是線程安全的。
asyncio 條件原語(yǔ)可被任務(wù)用于等待某個(gè)事件發(fā)生,然后獲取對(duì)共享資源的獨(dú)占訪問(wèn)。
在本質(zhì)上,Condition 對(duì)象合并了
Event
和Lock
的功能。 多個(gè) Condition 對(duì)象有可能共享一個(gè) Lock,這允許關(guān)注于共享資源的特定狀態(tài)的不同任務(wù)實(shí)現(xiàn)對(duì)共享資源的協(xié)同獨(dú)占訪問(wèn)。可選的 lock 參數(shù)必須為
Lock
對(duì)象或None
。 在后一種情況下會(huì)自動(dòng)創(chuàng)建一個(gè)新的 Lock 對(duì)象。在 3.10 版更改: Removed the loop parameter.
使用 Condition 的推薦方式是通過(guò)
async with
語(yǔ)句:cond = asyncio.Condition() # ... later async with cond: await cond.wait()
這等價(jià)于:
cond = asyncio.Condition() # ... later await cond.acquire() try: await cond.wait() finally: cond.release()
- coroutine acquire()?
獲取下層的鎖。
此方法會(huì)等待直至下層的鎖為 unlocked,將其設(shè)為 locked 并返回 returns
True
。
- notify(n=1)?
喚醒最多 n 個(gè)正在等待此條件的任務(wù)(默認(rèn)為 1 個(gè))。 如果沒(méi)有任務(wù)正在等待則此方法為空操作。
鎖必須在此方法被調(diào)用前被獲取并在隨后被快速釋放。 如果通過(guò)一個(gè) unlocked 鎖調(diào)用則會(huì)引發(fā)
RuntimeError
。
- locked()?
如果下層的鎖已被獲取則返回
True
。
- notify_all()?
喚醒所有正在等待此條件的任務(wù)。
此方法的行為類似于
notify()
,但會(huì)喚醒所有正在等待的任務(wù)。鎖必須在此方法被調(diào)用前被獲取并在隨后被快速釋放。 如果通過(guò)一個(gè) unlocked 鎖調(diào)用則會(huì)引發(fā)
RuntimeError
。
- release()?
釋放下層的鎖。
當(dāng)在未鎖定的鎖上發(fā)起調(diào)用時(shí),會(huì)引發(fā)
RuntimeError
。
- coroutine wait()?
等待直至收到通知。
當(dāng)此方法被調(diào)用時(shí)如果調(diào)用方任務(wù)未獲得鎖,則會(huì)引發(fā)
RuntimeError
。這個(gè)方法會(huì)釋放下層的鎖,然后保持阻塞直到被
notify()
或notify_all()
調(diào)用所喚醒。 一旦被喚醒,Condition 會(huì)重新獲取它的鎖并且此方法將返回True
。
- coroutine wait_for(predicate)?
等待直到目標(biāo)值變?yōu)?true。
目標(biāo)必須為一個(gè)可調(diào)用對(duì)象,其結(jié)果將被解讀為一個(gè)布爾值。 最終的值將為返回值。
Semaphore?
- class asyncio.Semaphore(value=1)?
信號(hào)量對(duì)象。 該對(duì)象不是線程安全的。
信號(hào)量會(huì)管理一個(gè)內(nèi)部計(jì)數(shù)器,該計(jì)數(shù)器會(huì)隨每次
acquire()
調(diào)用遞減并隨每次release()
調(diào)用遞增。 計(jì)數(shù)器的值永遠(yuǎn)不會(huì)降到零以下;當(dāng)acquire()
發(fā)現(xiàn)其值為零時(shí),它將保持阻塞直到有某個(gè)任務(wù)調(diào)用了release()
。可選的 value 參數(shù)用來(lái)為內(nèi)部計(jì)數(shù)器賦初始值 (默認(rèn)值為
1
)。 如果給定的值小于0
則會(huì)引發(fā)ValueError
。在 3.10 版更改: Removed the loop parameter.
使用 Semaphore 的推薦方式是通過(guò)
async with
語(yǔ)句。:sem = asyncio.Semaphore(10) # ... later async with sem: # work with shared resource
這等價(jià)于:
sem = asyncio.Semaphore(10) # ... later await sem.acquire() try: # work with shared resource finally: sem.release()
- coroutine acquire()?
獲取一個(gè)信號(hào)量。
如果內(nèi)部計(jì)數(shù)器的值大于零,則將其減一并立即返回
True
。 如果其值為零,則會(huì)等待直到release()
并調(diào)用并返回True
。
- locked()?
如果信號(hào)量對(duì)象無(wú)法被立即獲取則返回
True
。
- release()?
釋放一個(gè)信號(hào)量對(duì)象,將內(nèi)部計(jì)數(shù)器的值加一。 可以喚醒一個(gè)正在等待獲取信號(hào)量對(duì)象的任務(wù)。
不同于
BoundedSemaphore
,Semaphore
允許執(zhí)行的release()
調(diào)用多于acquire()
調(diào)用。
BoundedSemaphore?
- class asyncio.BoundedSemaphore(value=1)?
綁定的信號(hào)量對(duì)象。 該對(duì)象不是線程安全的。
BoundedSemaphore 是特殊版本的
Semaphore
,如果在release()
中內(nèi)部計(jì)數(shù)器值增加到初始 value 以上它將引發(fā)一個(gè)ValueError
。在 3.10 版更改: Removed the loop parameter.
Barrier?
- class asyncio.Barrier(parties, action=None)?
A barrier object. Not thread-safe.
A barrier is a simple synchronization primitive that allows to block until parties number of tasks are waiting on it. Tasks can wait on the
wait()
method and would be blocked until the specified number of tasks end up waiting onwait()
. At that point all of the waiting tasks would unblock simultaneously.async with
can be used as an alternative to awaiting onwait()
.The barrier can be reused any number of times.
示例:
async def example_barrier(): # barrier with 3 parties b = asyncio.Barrier(3) # create 2 new waiting tasks asyncio.create_task(b.wait()) asyncio.create_task(b.wait()) await asyncio.sleep(0) print(b) # The third .wait() call passes the barrier await b.wait() print(b) print("barrier passed") await asyncio.sleep(0) print(b) asyncio.run(example_barrier())
Result of this example is:
<asyncio.locks.Barrier object at 0x... [filling, waiters:2/3]> <asyncio.locks.Barrier object at 0x... [draining, waiters:0/3]> barrier passed <asyncio.locks.Barrier object at 0x... [filling, waiters:0/3]>
3.11 新版功能.
- coroutine wait()?
Pass the barrier. When all the tasks party to the barrier have called this function, they are all unblocked simultaneously.
When a waiting or blocked task in the barrier is cancelled, this task exits the barrier which stays in the same state. If the state of the barrier is "filling", the number of waiting task decreases by 1.
The return value is an integer in the range of 0 to
parties-1
, different for each task. This can be used to select a task to do some special housekeeping, e.g.:... async with barrier as position: if position == 0: # Only one task print this print('End of *draining phasis*')
This method may raise a
BrokenBarrierError
exception if the barrier is broken or reset while a task is waiting. It could raise aCancelledError
if a task is cancelled.
- coroutine reset()?
Return the barrier to the default, empty state. Any tasks waiting on it will receive the
BrokenBarrierError
exception.If a barrier is broken it may be better to just leave it and create a new one.
- coroutine abort()?
Put the barrier into a broken state. This causes any active or future calls to
wait()
to fail with theBrokenBarrierError
. Use this for example if one of the taks needs to abort, to avoid infinite waiting tasks.
- parties?
The number of tasks required to pass the barrier.
- n_waiting?
The number of tasks currently waiting in the barrier while filling.
- broken?
A boolean that is
True
if the barrier is in the broken state.
- exception asyncio.BrokenBarrierError?
This exception, a subclass of
RuntimeError
, is raised when theBarrier
object is reset or broken.
在 3.9 版更改: 使用 await lock
或 yield from lock
以及/或者 with
語(yǔ)句 (with await lock
, with (yield from lock)
) 來(lái)獲取鎖的操作已被移除。 請(qǐng)改用 async with lock
。