隊(duì)列集?
asyncio 隊(duì)列被設(shè)計(jì)成與 queue
模塊類似。盡管 asyncio隊(duì)列不是線程安全的,但是他們是被設(shè)計(jì)專用于 async/await 代碼。
注意asyncio 的隊(duì)列沒有 timeout 形參;請使用 asyncio.wait_for()
函數(shù)為隊(duì)列添加超時操作。
參見下面的 Examples 部分。
Queue?
- class asyncio.Queue(maxsize=0)?
先進(jìn),先出(FIFO)隊(duì)列
如果 maxsize 小于等于零,則隊(duì)列尺寸是無限的。如果是大于
0
的整數(shù),則當(dāng)隊(duì)列達(dá)到 maxsize 時,await put()
將阻塞至某個元素被get()
取出。不像標(biāo)準(zhǔn)庫中的并發(fā)型
queue
,隊(duì)列的尺寸一直是已知的,可以通過調(diào)用qsize()
方法返回。在 3.10 版更改: Removed the loop parameter.
這個類不是線程安全的(not thread safe)。
- maxsize?
隊(duì)列中可存放的元素數(shù)量。
- empty()?
如果隊(duì)列為空返回
True
,否則返回False
。
- full()?
如果有
maxsize
個條目在隊(duì)列中,則返回True
。如果隊(duì)列用
maxsize=0
(默認(rèn))初始化,則full()
永遠(yuǎn)不會返回True
。
- coroutine get()?
從隊(duì)列中刪除并返回一個元素。如果隊(duì)列為空,則等待,直到隊(duì)列中有元素。
- get_nowait()?
立即返回一個隊(duì)列中的元素,如果隊(duì)列內(nèi)有值,否則引發(fā)異常
QueueEmpty
。
- coroutine join()?
阻塞至隊(duì)列中所有的元素都被接收和處理完畢。
當(dāng)條目添加到隊(duì)列的時候,未完成任務(wù)的計(jì)數(shù)就會增加。每當(dāng)消費(fèi)協(xié)程調(diào)用
task_done()
表示這個條目已經(jīng)被回收,該條目所有工作已經(jīng)完成,未完成計(jì)數(shù)就會減少。當(dāng)未完成計(jì)數(shù)降到零的時候,join()
阻塞被解除。
- coroutine put(item)?
添加一個元素進(jìn)隊(duì)列。如果隊(duì)列滿了,在添加元素之前,會一直等待空閑插槽可用。
- qsize()?
返回隊(duì)列用的元素數(shù)量。
- task_done()?
表明前面排隊(duì)的任務(wù)已經(jīng)完成,即get出來的元素相關(guān)操作已經(jīng)完成。
由隊(duì)列使用者控制。每個
get()
用于獲取一個任務(wù),任務(wù)最后調(diào)用task_done()
告訴隊(duì)列,這個任務(wù)已經(jīng)完成。如果
join()
當(dāng)前正在阻塞,在所有條目都被處理后,將解除阻塞(意味著每個put()
進(jìn)隊(duì)列的條目的task_done()
都被收到)。如果被調(diào)用的次數(shù)多于放入隊(duì)列中的項(xiàng)目數(shù)量,將引發(fā)
ValueError
。
優(yōu)先級隊(duì)列?
后進(jìn)先出隊(duì)列?
異常?
- exception asyncio.QueueEmpty?
當(dāng)隊(duì)列為空的時候,調(diào)用
get_nowait()
方法而引發(fā)這個異常。
- exception asyncio.QueueFull?
當(dāng)隊(duì)列中條目數(shù)量已經(jīng)達(dá)到它的 maxsize 的時候,調(diào)用
put_nowait()
方法而引發(fā)的異常。
例子?
隊(duì)列能被用于多個的并發(fā)任務(wù)的工作量分配:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())