傳輸和協(xié)議?
前言
傳輸和協(xié)議會被像 loop.create_connection()
這類 底層 事件循環(huán)接口使用。它們使用基于回調(diào)的編程風(fēng)格支持網(wǎng)絡(luò)或IPC協(xié)議(如HTTP)的高性能實(shí)現(xiàn)。
基本上,傳輸和協(xié)議應(yīng)只在庫和框架上使用,而不應(yīng)該在高層的異步應(yīng)用中使用它們。
本文檔包含 Transports 和 Protocols 。
概述
在最頂層,傳輸只關(guān)心 怎樣 傳送字節(jié)內(nèi)容,而協(xié)議決定傳送 哪些 字節(jié)內(nèi)容(還要在一定程度上考慮何時)。
也可以這樣說:從傳輸?shù)慕嵌葋砜?,傳輸是套接?或類似的I/O終端)的抽象,而協(xié)議是應(yīng)用程序的抽象。
換另一種說法,傳輸和協(xié)議一起定義網(wǎng)絡(luò)I/0和進(jìn)程間I/O的抽象接口。
傳輸對象和協(xié)議對象總是一對一關(guān)系:協(xié)議調(diào)用傳輸方法來發(fā)送數(shù)據(jù),而傳輸在接收到數(shù)據(jù)時調(diào)用協(xié)議方法傳遞數(shù)據(jù)。
大部分面向連接的事件循環(huán)方法(如 loop.create_connection()
) 通常接受 protocol_factory 參數(shù)為接收到的鏈接創(chuàng)建 協(xié)議 對象,并用 傳輸 對象來表示。這些方法一般會返回 (transport, protocol)
元組。
目錄
本文檔包含下列小節(jié):
傳輸 部分記載異步IO
BaseTransport
、ReadTransport
、WriteTransport
、Transport
、DatagramTransport
和SubprocessTransport
等類。The Protocols section documents asyncio
BaseProtocol
,Protocol
,BufferedProtocol
,DatagramProtocol
, andSubprocessProtocol
classes.例子 部分展示怎樣使用傳輸、協(xié)議和底層事件循環(huán)接口。
傳輸?
傳輸屬于 asyncio
模塊中的類,用來抽象各種通信通道。
傳輸對象總是由 異步IO事件循環(huán) 實(shí)例化。
異步IO實(shí)現(xiàn)TCP、UDP、SSL和子進(jìn)程管道的傳輸。傳輸上可用的方法由傳輸?shù)念愋蜎Q定。
傳輸類屬于 線程不安全 。
傳輸層級?
- class asyncio.BaseTransport?
所有傳輸?shù)幕?。包含所有異步IO傳輸共用的方法。
- class asyncio.WriteTransport(BaseTransport)?
只寫鏈接的基礎(chǔ)傳輸。
WriteTransport 類的實(shí)例由
loop.connect_write_pipe()
事件循環(huán)方法返回,也被子進(jìn)程相關(guān)的方法如loop.subprocess_exec()
使用。
- class asyncio.ReadTransport(BaseTransport)?
只讀鏈接的基礎(chǔ)傳輸。
ReadTransport 類的實(shí)例由
loop.connect_read_pipe()
事件循環(huán)方法返回,也被子進(jìn)程相關(guān)的方法如loop.subprocess_exec()
使用。
- class asyncio.Transport(WriteTransport, ReadTransport)?
接口代表一個雙向傳輸,如TCP鏈接。
用戶不用直接實(shí)例化傳輸;調(diào)用一個功能函數(shù),給它傳遞協(xié)議工廠和其它需要的信息就可以創(chuàng)建傳輸和協(xié)議。
傳輸 類實(shí)例由如
loop.create_connection()
、loop.create_unix_connection()
、loop.create_server()
、loop.sendfile()
等這類事件循環(huán)方法使用或返回。
- class asyncio.DatagramTransport(BaseTransport)?
數(shù)據(jù)報(UDP)傳輸鏈接。
DatagramTransport 類實(shí)例由事件循環(huán)方法
loop.create_datagram_endpoint()
返回。
- class asyncio.SubprocessTransport(BaseTransport)?
表示父進(jìn)程和子進(jìn)程之間連接的抽象。
SubprocessTransport 類的實(shí)例由事件循環(huán)方法
loop.subprocess_shell()
和loop.subprocess_exec()
返回。
基礎(chǔ)傳輸?
- BaseTransport.close()?
關(guān)閉傳輸。
如果傳輸具有發(fā)送數(shù)據(jù)緩沖區(qū),將會異步發(fā)送已緩存的數(shù)據(jù)。在所有已緩存的數(shù)據(jù)都已處理后,就會將
None
作為協(xié)議protocol.connection_lost()
方法的參數(shù)并進(jìn)行調(diào)用。在這之后,傳輸不再接收任何數(shù)據(jù)。
- BaseTransport.is_closing()?
返回
True
,如果傳輸正在關(guān)閉或已經(jīng)關(guān)閉。
- BaseTransport.get_extra_info(name, default=None)?
返回 傳輸或它使用的相關(guān)資源信息。
name 是表示要獲取傳輸特定信息的字符串。
default 是在信息不可用或傳輸不支持第三方事件循環(huán)實(shí)現(xiàn)或當(dāng)前平臺查詢時返回的值。
例如下面代碼嘗試獲取傳輸相關(guān)套接字對象:
sock = transport.get_extra_info('socket') if sock is not None: print(sock.getsockopt(...))
傳輸可查詢信息類別:
套接字:
'peername'
: 套接字鏈接時的遠(yuǎn)端地址,socket.socket.getpeername()
方法的結(jié)果 (出錯時為None
)'socket'
:socket.socket
實(shí)例'sockname'
: 套接字本地地址,socket.socket.getsockname()
方法的結(jié)果
SSL套接字
'compression'
: 用字符串指定壓縮算法,或者鏈接沒有壓縮時為None
;ssl.SSLSocket.compression()
的結(jié)果。'cipher'
: 一個三值的元組,包含使用密碼的名稱,定義使用的SSL協(xié)議的版本,使用的加密位數(shù)。ssl.SSLSocket.cipher()
的結(jié)果。'peercert'
: 遠(yuǎn)端憑證;ssl.SSLSocket.getpeercert()
結(jié)果。'sslcontext'
:ssl.SSLContext
實(shí)例'ssl_object'
:ssl.SSLObject
或ssl.SSLSocket
實(shí)例
管道:
'pipe'
: 管道對象
子進(jìn)程:
'subprocess'
:subprocess.Popen
實(shí)例
- BaseTransport.set_protocol(protocol)?
設(shè)置一個新協(xié)議。
只有兩種協(xié)議都寫明支持切換才能完成切換協(xié)議。
- BaseTransport.get_protocol()?
返回當(dāng)前協(xié)議。
只讀傳輸?
- ReadTransport.is_reading()?
如果傳輸接收到新數(shù)據(jù)時返回
True
。3.7 新版功能.
- ReadTransport.pause_reading()?
暫停傳輸?shù)慕邮斩恕?a title="asyncio.Protocol.data_received" class="reference internal" href="#asyncio.Protocol.data_received">
protocol.data_received()
方法將不會收到數(shù)據(jù),除非resume_reading()
被調(diào)用。在 3.7 版更改: 這個方法冪等的, 它可以在傳輸已經(jīng)暫?;蜿P(guān)閉時調(diào)用。
- ReadTransport.resume_reading()?
恢復(fù)接收端。如果有數(shù)據(jù)可讀取時,協(xié)議方法
protocol.data_received()
將再次被調(diào)用。在 3.7 版更改: 這個方法冪等的, 它可以在傳輸已經(jīng)準(zhǔn)備好讀取數(shù)據(jù)時調(diào)用。
只寫傳輸?
- WriteTransport.abort()?
立即關(guān)閉傳輸,不會等待已提交的操作處理完畢。已緩存的數(shù)據(jù)將會丟失。不會接收更多數(shù)據(jù)。 最終
None
將作為協(xié)議的protocol.connection_lost()
方法的參數(shù)被調(diào)用。
- WriteTransport.can_write_eof()?
如果傳輸支持
write_eof()
返回True
否則返回False
。
- WriteTransport.get_write_buffer_size()?
返回傳輸使用輸出緩沖區(qū)的當(dāng)前大小。
- WriteTransport.get_write_buffer_limits()?
獲取寫入流控制 high 和 low 高低標(biāo)記位。返回元組
(low, high)
, low 和 high 為正字節(jié)數(shù)。使用
set_write_buffer_limits()
設(shè)置限制。3.4.2 新版功能.
- WriteTransport.set_write_buffer_limits(high=None, low=None)?
設(shè)置寫入流控制 high 和 low 高低標(biāo)記位。
這兩個值(以字節(jié)數(shù)表示)控制何時調(diào)用協(xié)議的
protocol.pause_writing()
和protocol.resume_writing()
方法。 如果指明,則低水位必須小于或等于高水位。 high 和 low 都不能為負(fù)值。pause_writing()
會在緩沖區(qū)尺寸大于或等于 high 值時被調(diào)用。 如果寫入已經(jīng)被暫停,resume_writing()
會在緩沖區(qū)尺寸小于或等于 low 值時被調(diào)用。默認(rèn)值是實(shí)現(xiàn)專屬的。 如果只給出了高水位值,則低水位值默認(rèn)為一個小于或等于高水位值的實(shí)現(xiàn)傳屬值。 將 high 設(shè)為零會強(qiáng)制將 low 也設(shè)為零,并使得
pause_writing()
在緩沖區(qū)變?yōu)榉强盏娜魏螘r刻被調(diào)用。 將 low 設(shè)為零會使得resume_writing()
在緩沖區(qū)為空時只被調(diào)用一次。 對于上下限都使用零值通常是不夠優(yōu)化的,因為它減少了并發(fā)執(zhí)行 I/O 和計算的機(jī)會。可使用
get_write_buffer_limits()
來獲取上下限值。
- WriteTransport.write(data)?
將一些 data 字節(jié)串寫入傳輸。
此方法不會阻塞;它會緩沖數(shù)據(jù)并安排其被異步地發(fā)出。
- WriteTransport.writelines(list_of_data)?
將數(shù)據(jù)字節(jié)串的列表(或任意可迭代對象)寫入傳輸。 這在功能上等價于在可迭代對象產(chǎn)生的每個元素上調(diào)用
write()
,但其實(shí)現(xiàn)可能更為高效。
- WriteTransport.write_eof()?
在刷新所有已緩沖數(shù)據(jù)之后關(guān)閉傳輸?shù)膶懭攵恕?數(shù)據(jù)仍可以被接收。
如果傳輸(例如 SSL)不支持半關(guān)閉的連接,此方法會引發(fā)
NotImplementedError
。
數(shù)據(jù)報傳輸?
- DatagramTransport.sendto(data, addr=None)?
將 data 字節(jié)串發(fā)送到 addr (基于傳輸?shù)哪繕?biāo)地址) 所給定的遠(yuǎn)端對等方。 如果 addr 為
None
,則將數(shù)據(jù)發(fā)送到傳輸創(chuàng)建時給定的目標(biāo)地址。此方法不會阻塞;它會緩沖數(shù)據(jù)并安排其被異步地發(fā)出。
- DatagramTransport.abort()?
立即關(guān)閉傳輸,不會等待已提交的操作執(zhí)行完畢。 已緩存的數(shù)據(jù)將會丟失。 不會接收更多的數(shù)據(jù)。 協(xié)議的
protocol.connection_lost()
方法最終將附帶None
作為參數(shù)被調(diào)用。
子進(jìn)程傳輸?
- SubprocessTransport.get_pid()?
將子進(jìn)程的進(jìn)程 ID 以整數(shù)形式返回。
- SubprocessTransport.get_pipe_transport(fd)?
返回對應(yīng)于整數(shù)文件描述符 fd 的通信管道的傳輸:
- SubprocessTransport.get_returncode()?
返回整數(shù)形式的進(jìn)程返回碼,或者如果還未返回則為
None
,這類似于subprocess.Popen.returncode
屬性。
- SubprocessTransport.kill()?
殺死子進(jìn)程。
在 POSIX 系統(tǒng)中,函數(shù)會發(fā)送 SIGKILL 到子進(jìn)程。 在 Windows 中,此方法是
terminate()
的別名。另請參見
subprocess.Popen.kill()
。
- SubprocessTransport.send_signal(signal)?
發(fā)送 signal 編號到子進(jìn)程,與
subprocess.Popen.send_signal()
一樣。
- SubprocessTransport.terminate()?
停止子進(jìn)程。
在 POSIX 系統(tǒng)中,此方法會發(fā)送 SIGTERM 到子進(jìn)程。 在 Windows 中,則會調(diào)用 Windows API 函數(shù) TerminateProcess() 來停止子進(jìn)程。
協(xié)議?
asyncio 提供了一組抽象基類,它們應(yīng)當(dāng)被用于實(shí)現(xiàn)網(wǎng)絡(luò)協(xié)議。 這些類被設(shè)計為與 傳輸 配合使用。
抽象基礎(chǔ)協(xié)議類的子類可以實(shí)現(xiàn)其中的部分或全部方法。 所有這些方法都是回調(diào):它們由傳輸或特定事件調(diào)用,例如當(dāng)數(shù)據(jù)被接收的時候。 基礎(chǔ)協(xié)議方法應(yīng)當(dāng)由相應(yīng)的傳輸來調(diào)用。
基礎(chǔ)協(xié)議?
- class asyncio.BaseProtocol?
帶有所有協(xié)議的共享方法的基礎(chǔ)協(xié)議。
- class asyncio.Protocol(BaseProtocol)?
用于實(shí)現(xiàn)流式協(xié)議(TCP, Unix 套接字等等)的基類。
- class asyncio.BufferedProtocol(BaseProtocol)?
用于實(shí)現(xiàn)可對接收緩沖區(qū)進(jìn)行手動控制的流式協(xié)議的基類。
- class asyncio.DatagramProtocol(BaseProtocol)?
用于實(shí)現(xiàn)數(shù)據(jù)報(UDP)協(xié)議的基類。
- class asyncio.SubprocessProtocol(BaseProtocol)?
用于實(shí)現(xiàn)與子進(jìn)程通信(單向管道)的協(xié)議的基類。
基礎(chǔ)協(xié)議?
所有 asyncio 協(xié)議均可實(shí)現(xiàn)基礎(chǔ)協(xié)議回調(diào)。
連接回調(diào)
連接回調(diào)會在所有協(xié)議上被調(diào)用,每個成功的連接將恰好調(diào)用一次。 所有其他協(xié)議回調(diào)只能在以下兩個方法之間被調(diào)用。
- BaseProtocol.connection_made(transport)?
連接建立時被調(diào)用。
transport 參數(shù)是代表連接的傳輸。 此協(xié)議負(fù)責(zé)將引用保存至對應(yīng)的傳輸。
- BaseProtocol.connection_lost(exc)?
連接丟失或關(guān)閉時將被調(diào)用。
方法的參數(shù)是一個異常對象或為
None
。 后者意味著收到了常規(guī)的 EOF,或者連接被連接的一端取消或關(guān)閉。
流程控制回調(diào)
流程控制回調(diào)可由傳輸來調(diào)用以暫?;蚧謴?fù)協(xié)議所執(zhí)行的寫入操作。
請查看 set_write_buffer_limits()
方法的文檔了解詳情。
- BaseProtocol.pause_writing()?
當(dāng)傳輸?shù)木彌_區(qū)升至高水位以上時將被調(diào)用。
- BaseProtocol.resume_writing()?
當(dāng)傳輸?shù)木彌_區(qū)降到低水位以下時將被調(diào)用。
如果緩沖區(qū)大小等于高水位值,則 pause_writing()
不會被調(diào)用:緩沖區(qū)大小必須要高于該值。
相反地,resume_writing()
會在緩沖區(qū)大小等于或小于低水位值時被調(diào)用。 這些結(jié)束條件對于當(dāng)兩個水位取零值時也能確保符合預(yù)期的行為是很重要的。
流式協(xié)議?
事件方法,例如 loop.create_server()
, loop.create_unix_server()
, loop.create_connection()
, loop.create_unix_connection()
, loop.connect_accepted_socket()
, loop.connect_read_pipe()
和 loop.connect_write_pipe()
都接受返回流式協(xié)議的工廠。
- Protocol.data_received(data)?
當(dāng)收到數(shù)據(jù)時被調(diào)用。 data 為包含入站數(shù)據(jù)的非空字節(jié)串對象。
數(shù)據(jù)是否會被緩沖、分塊或重組取決于具體傳輸。 通常,你不應(yīng)依賴于特定的語義而應(yīng)使你的解析具有通用性和靈活性。 但是,數(shù)據(jù)總是要以正確的順序被接收。
此方法在連接打開期間可以被調(diào)用任意次數(shù)。
但是,
protocol.eof_received()
最多只會被調(diào)用一次。 一旦 eof_received() 被調(diào)用,data_received()
就不會再被調(diào)用。
- Protocol.eof_received()?
當(dāng)發(fā)出信號的另一端不再繼續(xù)發(fā)送數(shù)據(jù)時(例如通過調(diào)用
transport.write_eof()
,如果另一端也使用 asyncio 的話)被調(diào)用。此方法可能返回假值 (包括
None
),在此情況下傳輸將會自行關(guān)閉。 相反地,如果此方法返回真值,將以所用的協(xié)議來確定是否要關(guān)閉傳輸。 由于默認(rèn)實(shí)現(xiàn)是返回None
,因此它會隱式地關(guān)閉連接。某些傳輸,包括 SSL 在內(nèi),并不支持半關(guān)閉的連接,在此情況下從該方法返回真值將導(dǎo)致連接被關(guān)閉。
狀態(tài)機(jī):
start -> connection_made
[-> data_received]*
[-> eof_received]?
-> connection_lost -> end
緩沖流協(xié)議?
3.7 新版功能.
帶緩沖的協(xié)議可與任何支持 流式協(xié)議 的事件循環(huán)方法配合使用。
BufferedProtocol
實(shí)現(xiàn)允許顯式手動分配和控制接收緩沖區(qū)。 隨后事件循環(huán)可以使用協(xié)議提供的緩沖區(qū)來避免不必要的數(shù)據(jù)復(fù)制。 這對于接收大量數(shù)據(jù)的協(xié)議來說會有明顯的性能提升。 復(fù)雜的協(xié)議實(shí)現(xiàn)能顯著地減少緩沖區(qū)分配的數(shù)量。
以下回調(diào)是在 BufferedProtocol
實(shí)例上被調(diào)用的:
- BufferedProtocol.get_buffer(sizehint)?
調(diào)用后會分配新的接收緩沖區(qū)。
sizehint 是推薦的返回緩沖區(qū)最小尺寸。 返回小于或大于 sizehint 推薦尺寸的緩沖區(qū)也是可接受的。 當(dāng)設(shè)為 -1 時,緩沖區(qū)尺寸可以是任意的。 返回尺寸為零的緩沖區(qū)則是錯誤的。
get_buffer()
必須返回一個實(shí)現(xiàn)了 緩沖區(qū)協(xié)議 的對象。
- BufferedProtocol.buffer_updated(nbytes)?
用接收的數(shù)據(jù)更新緩沖區(qū)時被調(diào)用。
nbytes 是被寫入到緩沖區(qū)的字節(jié)總數(shù)。
- BufferedProtocol.eof_received()?
請查看
protocol.eof_received()
方法的文檔。
在連接期間 get_buffer()
可以被調(diào)用任意次數(shù)。 但是,protocol.eof_received()
最多只能被調(diào)用一次,如果被調(diào)用,則在此之后 get_buffer()
和 buffer_updated()
不能再被調(diào)用。
狀態(tài)機(jī):
start -> connection_made
[-> get_buffer
[-> buffer_updated]?
]*
[-> eof_received]?
-> connection_lost -> end
數(shù)據(jù)報協(xié)議?
數(shù)據(jù)報協(xié)議實(shí)例應(yīng)當(dāng)由傳遞給 loop.create_datagram_endpoint()
方法的協(xié)議工廠來構(gòu)造。
- DatagramProtocol.datagram_received(data, addr)?
當(dāng)接收到數(shù)據(jù)報時被調(diào)用。 data 是包含傳入數(shù)據(jù)的字節(jié)串對象。 addr 是發(fā)送數(shù)據(jù)的對等端地址;實(shí)際的格式取決于具體傳輸。
- DatagramProtocol.error_received(exc)?
當(dāng)前一個發(fā)送或接收操作引發(fā)
OSError
時被調(diào)用。 exc 是OSError
的實(shí)例。此方法會在當(dāng)傳輸(例如UDP)檢測到無法將數(shù)據(jù)報傳給接收方等極少數(shù)情況下被調(diào)用。 而在大多數(shù)情況下,無法送達(dá)的數(shù)據(jù)報將被靜默地丟棄。
備注
在 BSD 系統(tǒng)(macOS, FreeBSD 等等)上,數(shù)據(jù)報協(xié)議不支持流控制,因為沒有可靠的方式來檢測因?qū)懭攵噙^包所導(dǎo)致的發(fā)送失敗。
套接字總是顯示為 'ready' 且多余的包會被丟棄。 有一定的可能性會引發(fā) OSError
并設(shè)置 errno
為 errno.ENOBUFS
;如果此異常被引發(fā),它將被報告給 DatagramProtocol.error_received()
,在其他情況下則會被忽略。
子進(jìn)程協(xié)議?
子進(jìn)程協(xié)議實(shí)例應(yīng)當(dāng)由傳遞給 loop.subprocess_exec()
和 loop.subprocess_shell()
方法的協(xié)議工廠函數(shù)來構(gòu)造。
- SubprocessProtocol.pipe_data_received(fd, data)?
當(dāng)子進(jìn)程向其 stdout 或 stderr 管道寫入數(shù)據(jù)時被調(diào)用。
fd 是以整數(shù)表示的管道文件描述符。
data 是包含已接收數(shù)據(jù)的非空字節(jié)串對象。
- SubprocessProtocol.pipe_connection_lost(fd, exc)?
與子進(jìn)程通信的其中一個管道關(guān)閉時被調(diào)用。
fd 以整數(shù)表示的已關(guān)閉文件描述符。
- SubprocessProtocol.process_exited()?
子進(jìn)程退出時被調(diào)用。
例子?
TCP 回顯服務(wù)器?
使用 loop.create_server()
方法創(chuàng)建 TCP 回顯服務(wù)器,發(fā)回已接收的數(shù)據(jù),并關(guān)閉連接:
import asyncio
class EchoServerProtocol(asyncio.Protocol):
def connection_made(self, transport):
peername = transport.get_extra_info('peername')
print('Connection from {}'.format(peername))
self.transport = transport
def data_received(self, data):
message = data.decode()
print('Data received: {!r}'.format(message))
print('Send: {!r}'.format(message))
self.transport.write(data)
print('Close the client socket')
self.transport.close()
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
server = await loop.create_server(
lambda: EchoServerProtocol(),
'127.0.0.1', 8888)
async with server:
await server.serve_forever()
asyncio.run(main())
參見
使用流的 TCP 回顯服務(wù)器 示例,使用了高層級的 asyncio.start_server()
函數(shù)。
TCP 回顯客戶端?
使用 loop.create_connection()
方法的 TCP 回顯客戶端,發(fā)送數(shù)據(jù)并等待,直到連接被關(guān)閉:
import asyncio
class EchoClientProtocol(asyncio.Protocol):
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
def connection_made(self, transport):
transport.write(self.message.encode())
print('Data sent: {!r}'.format(self.message))
def data_received(self, data):
print('Data received: {!r}'.format(data.decode()))
def connection_lost(self, exc):
print('The server closed the connection')
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = 'Hello World!'
transport, protocol = await loop.create_connection(
lambda: EchoClientProtocol(message, on_con_lost),
'127.0.0.1', 8888)
# Wait until the protocol signals that the connection
# is lost and close the transport.
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
參見
使用流的 TCP 回顯客戶端 示例,使用了高層級的 asyncio.open_connection()
函數(shù)。
UDP 回顯服務(wù)器?
使用 loop.create_datagram_endpoint()
方法的 UDB 回顯服務(wù)器,發(fā)回已接收的數(shù)據(jù):
import asyncio
class EchoServerProtocol:
def connection_made(self, transport):
self.transport = transport
def datagram_received(self, data, addr):
message = data.decode()
print('Received %r from %s' % (message, addr))
print('Send %r to %s' % (message, addr))
self.transport.sendto(data, addr)
async def main():
print("Starting UDP server")
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
# One protocol instance will be created to serve all
# client requests.
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoServerProtocol(),
local_addr=('127.0.0.1', 9999))
try:
await asyncio.sleep(3600) # Serve for 1 hour.
finally:
transport.close()
asyncio.run(main())
UDP 回顯客戶端?
使用 loop.create_datagram_endpoint()
方法的 UDP 回顯客戶端,發(fā)送數(shù)據(jù)并在收到回應(yīng)時關(guān)閉傳輸:
import asyncio
class EchoClientProtocol:
def __init__(self, message, on_con_lost):
self.message = message
self.on_con_lost = on_con_lost
self.transport = None
def connection_made(self, transport):
self.transport = transport
print('Send:', self.message)
self.transport.sendto(self.message.encode())
def datagram_received(self, data, addr):
print("Received:", data.decode())
print("Close the socket")
self.transport.close()
def error_received(self, exc):
print('Error received:', exc)
def connection_lost(self, exc):
print("Connection closed")
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
message = "Hello World!"
transport, protocol = await loop.create_datagram_endpoint(
lambda: EchoClientProtocol(message, on_con_lost),
remote_addr=('127.0.0.1', 9999))
try:
await on_con_lost
finally:
transport.close()
asyncio.run(main())
鏈接已存在的套接字?
附帶一個協(xié)議使用 loop.create_connection()
方法,等待直到套接字接收數(shù)據(jù):
import asyncio
import socket
class MyProtocol(asyncio.Protocol):
def __init__(self, on_con_lost):
self.transport = None
self.on_con_lost = on_con_lost
def connection_made(self, transport):
self.transport = transport
def data_received(self, data):
print("Received:", data.decode())
# We are done: close the transport;
# connection_lost() will be called automatically.
self.transport.close()
def connection_lost(self, exc):
# The socket has been closed
self.on_con_lost.set_result(True)
async def main():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
on_con_lost = loop.create_future()
# Create a pair of connected sockets
rsock, wsock = socket.socketpair()
# Register the socket to wait for data.
transport, protocol = await loop.create_connection(
lambda: MyProtocol(on_con_lost), sock=rsock)
# Simulate the reception of data from the network.
loop.call_soon(wsock.send, 'abc'.encode())
try:
await protocol.on_con_lost
finally:
transport.close()
wsock.close()
asyncio.run(main())
參見
使用低層級的 loop.add_reader()
方法來注冊一個 FD 的 監(jiān)視文件描述符以讀取事件 示例。
使用在協(xié)程中通過 open_connection()
函數(shù)創(chuàng)建的高層級流的 注冊一個打開的套接字以等待使用流的數(shù)據(jù) 示例。
loop.subprocess_exec() 與 SubprocessProtocol?
一個使用子進(jìn)程協(xié)議來獲取子進(jìn)程的輸出并等待子進(jìn)程退出的示例。
這個子進(jìn)程是由 loop.subprocess_exec()
方法創(chuàng)建的:
import asyncio
import sys
class DateProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
def pipe_data_received(self, fd, data):
self.output.extend(data)
def process_exited(self):
self.exit_future.set_result(True)
async def get_date():
# Get a reference to the event loop as we plan to use
# low-level APIs.
loop = asyncio.get_running_loop()
code = 'import datetime; print(datetime.datetime.now())'
exit_future = asyncio.Future(loop=loop)
# Create the subprocess controlled by DateProtocol;
# redirect the standard output into a pipe.
transport, protocol = await loop.subprocess_exec(
lambda: DateProtocol(exit_future),
sys.executable, '-c', code,
stdin=None, stderr=None)
# Wait for the subprocess exit using the process_exited()
# method of the protocol.
await exit_future
# Close the stdout pipe.
transport.close()
# Read the output which was collected by the
# pipe_data_received() method of the protocol.
data = bytes(protocol.output)
return data.decode('ascii').rstrip()
date = asyncio.run(get_date())
print(f"Current date: {date}")
另請參閱使用高層級 API 編寫的 相同示例。