傳輸和協(xié)議?

前言

傳輸和協(xié)議會被像 loop.create_connection() 這類 底層 事件循環(huán)接口使用。它們使用基于回調(diào)的編程風(fēng)格支持網(wǎng)絡(luò)或IPC協(xié)議(如HTTP)的高性能實(shí)現(xiàn)。

基本上,傳輸和協(xié)議應(yīng)只在庫和框架上使用,而不應(yīng)該在高層的異步應(yīng)用中使用它們。

本文檔包含 TransportsProtocols 。

概述

在最頂層,傳輸只關(guān)心 怎樣 傳送字節(jié)內(nèi)容,而協(xié)議決定傳送 哪些 字節(jié)內(nèi)容(還要在一定程度上考慮何時)。

也可以這樣說:從傳輸?shù)慕嵌葋砜?,傳輸是套接?或類似的I/O終端)的抽象,而協(xié)議是應(yīng)用程序的抽象。

換另一種說法,傳輸和協(xié)議一起定義網(wǎng)絡(luò)I/0和進(jìn)程間I/O的抽象接口。

傳輸對象和協(xié)議對象總是一對一關(guān)系:協(xié)議調(diào)用傳輸方法來發(fā)送數(shù)據(jù),而傳輸在接收到數(shù)據(jù)時調(diào)用協(xié)議方法傳遞數(shù)據(jù)。

大部分面向連接的事件循環(huán)方法(如 loop.create_connection() ) 通常接受 protocol_factory 參數(shù)為接收到的鏈接創(chuàng)建 協(xié)議 對象,并用 傳輸 對象來表示。這些方法一般會返回 (transport, protocol) 元組。

目錄

本文檔包含下列小節(jié):

傳輸?

源碼: Lib/asyncio/transports.py


傳輸屬于 asyncio 模塊中的類,用來抽象各種通信通道。

傳輸對象總是由 異步IO事件循環(huán) 實(shí)例化。

異步IO實(shí)現(xiàn)TCP、UDP、SSL和子進(jìn)程管道的傳輸。傳輸上可用的方法由傳輸?shù)念愋蜎Q定。

傳輸類屬于 線程不安全

傳輸層級?

class asyncio.BaseTransport?

所有傳輸?shù)幕?。包含所有異步IO傳輸共用的方法。

class asyncio.WriteTransport(BaseTransport)?

只寫鏈接的基礎(chǔ)傳輸。

WriteTransport 類的實(shí)例由 loop.connect_write_pipe() 事件循環(huán)方法返回,也被子進(jìn)程相關(guān)的方法如 loop.subprocess_exec() 使用。

class asyncio.ReadTransport(BaseTransport)?

只讀鏈接的基礎(chǔ)傳輸。

ReadTransport 類的實(shí)例由 loop.connect_read_pipe() 事件循環(huán)方法返回,也被子進(jìn)程相關(guān)的方法如 loop.subprocess_exec() 使用。

class asyncio.Transport(WriteTransport, ReadTransport)?

接口代表一個雙向傳輸,如TCP鏈接。

用戶不用直接實(shí)例化傳輸;調(diào)用一個功能函數(shù),給它傳遞協(xié)議工廠和其它需要的信息就可以創(chuàng)建傳輸和協(xié)議。

傳輸 類實(shí)例由如 loop.create_connection() 、 loop.create_unix_connection() 、 loop.create_server() 、 loop.sendfile() 等這類事件循環(huán)方法使用或返回。

class asyncio.DatagramTransport(BaseTransport)?

數(shù)據(jù)報(UDP)傳輸鏈接。

DatagramTransport 類實(shí)例由事件循環(huán)方法 loop.create_datagram_endpoint() 返回。

class asyncio.SubprocessTransport(BaseTransport)?

表示父進(jìn)程和子進(jìn)程之間連接的抽象。

SubprocessTransport 類的實(shí)例由事件循環(huán)方法 loop.subprocess_shell()loop.subprocess_exec() 返回。

基礎(chǔ)傳輸?

BaseTransport.close()?

關(guān)閉傳輸。

如果傳輸具有發(fā)送數(shù)據(jù)緩沖區(qū),將會異步發(fā)送已緩存的數(shù)據(jù)。在所有已緩存的數(shù)據(jù)都已處理后,就會將 None 作為協(xié)議 protocol.connection_lost() 方法的參數(shù)并進(jìn)行調(diào)用。在這之后,傳輸不再接收任何數(shù)據(jù)。

BaseTransport.is_closing()?

返回 True ,如果傳輸正在關(guān)閉或已經(jīng)關(guān)閉。

BaseTransport.get_extra_info(name, default=None)?

返回 傳輸或它使用的相關(guān)資源信息。

name 是表示要獲取傳輸特定信息的字符串。

default 是在信息不可用或傳輸不支持第三方事件循環(huán)實(shí)現(xiàn)或當(dāng)前平臺查詢時返回的值。

例如下面代碼嘗試獲取傳輸相關(guān)套接字對象:

sock = transport.get_extra_info('socket')
if sock is not None:
    print(sock.getsockopt(...))

傳輸可查詢信息類別:

BaseTransport.set_protocol(protocol)?

設(shè)置一個新協(xié)議。

只有兩種協(xié)議都寫明支持切換才能完成切換協(xié)議。

BaseTransport.get_protocol()?

返回當(dāng)前協(xié)議。

只讀傳輸?

ReadTransport.is_reading()?

如果傳輸接收到新數(shù)據(jù)時返回 True 。

3.7 新版功能.

ReadTransport.pause_reading()?

暫停傳輸?shù)慕邮斩恕?a title="asyncio.Protocol.data_received" class="reference internal" href="#asyncio.Protocol.data_received">protocol.data_received() 方法將不會收到數(shù)據(jù),除非 resume_reading()  被調(diào)用。

在 3.7 版更改: 這個方法冪等的, 它可以在傳輸已經(jīng)暫?;蜿P(guān)閉時調(diào)用。

ReadTransport.resume_reading()?

恢復(fù)接收端。如果有數(shù)據(jù)可讀取時,協(xié)議方法 protocol.data_received() 將再次被調(diào)用。

在 3.7 版更改: 這個方法冪等的, 它可以在傳輸已經(jīng)準(zhǔn)備好讀取數(shù)據(jù)時調(diào)用。

只寫傳輸?

WriteTransport.abort()?

立即關(guān)閉傳輸,不會等待已提交的操作處理完畢。已緩存的數(shù)據(jù)將會丟失。不會接收更多數(shù)據(jù)。 最終 None 將作為協(xié)議的 protocol.connection_lost() 方法的參數(shù)被調(diào)用。

WriteTransport.can_write_eof()?

如果傳輸支持 write_eof() 返回 True 否則返回 False 。

WriteTransport.get_write_buffer_size()?

返回傳輸使用輸出緩沖區(qū)的當(dāng)前大小。

WriteTransport.get_write_buffer_limits()?

獲取寫入流控制 highlow 高低標(biāo)記位。返回元組 (low, high)lowhigh 為正字節(jié)數(shù)。

使用 set_write_buffer_limits() 設(shè)置限制。

3.4.2 新版功能.

WriteTransport.set_write_buffer_limits(high=None, low=None)?

設(shè)置寫入流控制 highlow 高低標(biāo)記位。

這兩個值(以字節(jié)數(shù)表示)控制何時調(diào)用協(xié)議的 protocol.pause_writing()protocol.resume_writing() 方法。 如果指明,則低水位必須小于或等于高水位。 highlow 都不能為負(fù)值。

pause_writing() 會在緩沖區(qū)尺寸大于或等于 high 值時被調(diào)用。 如果寫入已經(jīng)被暫停,resume_writing() 會在緩沖區(qū)尺寸小于或等于 low 值時被調(diào)用。

默認(rèn)值是實(shí)現(xiàn)專屬的。 如果只給出了高水位值,則低水位值默認(rèn)為一個小于或等于高水位值的實(shí)現(xiàn)傳屬值。 將 high 設(shè)為零會強(qiáng)制將 low 也設(shè)為零,并使得 pause_writing() 在緩沖區(qū)變?yōu)榉强盏娜魏螘r刻被調(diào)用。 將 low 設(shè)為零會使得 resume_writing() 在緩沖區(qū)為空時只被調(diào)用一次。 對于上下限都使用零值通常是不夠優(yōu)化的,因為它減少了并發(fā)執(zhí)行 I/O 和計算的機(jī)會。

可使用 get_write_buffer_limits() 來獲取上下限值。

WriteTransport.write(data)?

將一些 data 字節(jié)串寫入傳輸。

此方法不會阻塞;它會緩沖數(shù)據(jù)并安排其被異步地發(fā)出。

WriteTransport.writelines(list_of_data)?

將數(shù)據(jù)字節(jié)串的列表(或任意可迭代對象)寫入傳輸。 這在功能上等價于在可迭代對象產(chǎn)生的每個元素上調(diào)用 write(),但其實(shí)現(xiàn)可能更為高效。

WriteTransport.write_eof()?

在刷新所有已緩沖數(shù)據(jù)之后關(guān)閉傳輸?shù)膶懭攵恕?數(shù)據(jù)仍可以被接收。

如果傳輸(例如 SSL)不支持半關(guān)閉的連接,此方法會引發(fā) NotImplementedError。

數(shù)據(jù)報傳輸?

DatagramTransport.sendto(data, addr=None)?

data 字節(jié)串發(fā)送到 addr (基于傳輸?shù)哪繕?biāo)地址) 所給定的遠(yuǎn)端對等方。 如果 addrNone,則將數(shù)據(jù)發(fā)送到傳輸創(chuàng)建時給定的目標(biāo)地址。

此方法不會阻塞;它會緩沖數(shù)據(jù)并安排其被異步地發(fā)出。

DatagramTransport.abort()?

立即關(guān)閉傳輸,不會等待已提交的操作執(zhí)行完畢。 已緩存的數(shù)據(jù)將會丟失。 不會接收更多的數(shù)據(jù)。 協(xié)議的 protocol.connection_lost() 方法最終將附帶 None 作為參數(shù)被調(diào)用。

子進(jìn)程傳輸?

SubprocessTransport.get_pid()?

將子進(jìn)程的進(jìn)程 ID 以整數(shù)形式返回。

SubprocessTransport.get_pipe_transport(fd)?

返回對應(yīng)于整數(shù)文件描述符 fd 的通信管道的傳輸:

  • 0: 標(biāo)準(zhǔn)輸入 (stdin) 的可讀流式傳輸,如果子進(jìn)程創(chuàng)建時未設(shè)置 stdin=PIPE 則為 None

  • 1: 標(biāo)準(zhǔn)輸出 (stdout) 的可寫流式傳輸,如果子進(jìn)程創(chuàng)建時未設(shè)置 stdout=PIPE 則為 None

  • 2: 標(biāo)準(zhǔn)錯誤 (stderr) 的可寫流式傳輸,如果子進(jìn)程創(chuàng)建時未設(shè)置 stderr=PIPE 則為 None

  • 其他 fd: None

SubprocessTransport.get_returncode()?

返回整數(shù)形式的進(jìn)程返回碼,或者如果還未返回則為 None,這類似于 subprocess.Popen.returncode 屬性。

SubprocessTransport.kill()?

殺死子進(jìn)程。

在 POSIX 系統(tǒng)中,函數(shù)會發(fā)送 SIGKILL 到子進(jìn)程。 在 Windows 中,此方法是 terminate() 的別名。

另請參見 subprocess.Popen.kill()。

SubprocessTransport.send_signal(signal)?

發(fā)送 signal 編號到子進(jìn)程,與 subprocess.Popen.send_signal() 一樣。

SubprocessTransport.terminate()?

停止子進(jìn)程。

在 POSIX 系統(tǒng)中,此方法會發(fā)送 SIGTERM 到子進(jìn)程。 在 Windows 中,則會調(diào)用 Windows API 函數(shù) TerminateProcess() 來停止子進(jìn)程。

另請參見 subprocess.Popen.terminate()。

SubprocessTransport.close()?

通過調(diào)用 kill() 方法來殺死子進(jìn)程。

如果子進(jìn)程尚未返回,并關(guān)閉 stdin, stdoutstderr 管道的傳輸。

協(xié)議?

源碼: Lib/asyncio/protocols.py


asyncio 提供了一組抽象基類,它們應(yīng)當(dāng)被用于實(shí)現(xiàn)網(wǎng)絡(luò)協(xié)議。 這些類被設(shè)計為與 傳輸 配合使用。

抽象基礎(chǔ)協(xié)議類的子類可以實(shí)現(xiàn)其中的部分或全部方法。 所有這些方法都是回調(diào):它們由傳輸或特定事件調(diào)用,例如當(dāng)數(shù)據(jù)被接收的時候。 基礎(chǔ)協(xié)議方法應(yīng)當(dāng)由相應(yīng)的傳輸來調(diào)用。

基礎(chǔ)協(xié)議?

class asyncio.BaseProtocol?

帶有所有協(xié)議的共享方法的基礎(chǔ)協(xié)議。

class asyncio.Protocol(BaseProtocol)?

用于實(shí)現(xiàn)流式協(xié)議(TCP, Unix 套接字等等)的基類。

class asyncio.BufferedProtocol(BaseProtocol)?

用于實(shí)現(xiàn)可對接收緩沖區(qū)進(jìn)行手動控制的流式協(xié)議的基類。

class asyncio.DatagramProtocol(BaseProtocol)?

用于實(shí)現(xiàn)數(shù)據(jù)報(UDP)協(xié)議的基類。

class asyncio.SubprocessProtocol(BaseProtocol)?

用于實(shí)現(xiàn)與子進(jìn)程通信(單向管道)的協(xié)議的基類。

基礎(chǔ)協(xié)議?

所有 asyncio 協(xié)議均可實(shí)現(xiàn)基礎(chǔ)協(xié)議回調(diào)。

連接回調(diào)

連接回調(diào)會在所有協(xié)議上被調(diào)用,每個成功的連接將恰好調(diào)用一次。 所有其他協(xié)議回調(diào)只能在以下兩個方法之間被調(diào)用。

BaseProtocol.connection_made(transport)?

連接建立時被調(diào)用。

transport 參數(shù)是代表連接的傳輸。 此協(xié)議負(fù)責(zé)將引用保存至對應(yīng)的傳輸。

BaseProtocol.connection_lost(exc)?

連接丟失或關(guān)閉時將被調(diào)用。

方法的參數(shù)是一個異常對象或為 None。 后者意味著收到了常規(guī)的 EOF,或者連接被連接的一端取消或關(guān)閉。

流程控制回調(diào)

流程控制回調(diào)可由傳輸來調(diào)用以暫?;蚧謴?fù)協(xié)議所執(zhí)行的寫入操作。

請查看 set_write_buffer_limits() 方法的文檔了解詳情。

BaseProtocol.pause_writing()?

當(dāng)傳輸?shù)木彌_區(qū)升至高水位以上時將被調(diào)用。

BaseProtocol.resume_writing()?

當(dāng)傳輸?shù)木彌_區(qū)降到低水位以下時將被調(diào)用。

如果緩沖區(qū)大小等于高水位值,則 pause_writing() 不會被調(diào)用:緩沖區(qū)大小必須要高于該值。

相反地,resume_writing() 會在緩沖區(qū)大小等于或小于低水位值時被調(diào)用。 這些結(jié)束條件對于當(dāng)兩個水位取零值時也能確保符合預(yù)期的行為是很重要的。

流式協(xié)議?

事件方法,例如 loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe()loop.connect_write_pipe() 都接受返回流式協(xié)議的工廠。

Protocol.data_received(data)?

當(dāng)收到數(shù)據(jù)時被調(diào)用。 data 為包含入站數(shù)據(jù)的非空字節(jié)串對象。

數(shù)據(jù)是否會被緩沖、分塊或重組取決于具體傳輸。 通常,你不應(yīng)依賴于特定的語義而應(yīng)使你的解析具有通用性和靈活性。 但是,數(shù)據(jù)總是要以正確的順序被接收。

此方法在連接打開期間可以被調(diào)用任意次數(shù)。

但是,protocol.eof_received() 最多只會被調(diào)用一次。 一旦 eof_received() 被調(diào)用,data_received() 就不會再被調(diào)用。

Protocol.eof_received()?

當(dāng)發(fā)出信號的另一端不再繼續(xù)發(fā)送數(shù)據(jù)時(例如通過調(diào)用 transport.write_eof(),如果另一端也使用 asyncio 的話)被調(diào)用。

此方法可能返回假值 (包括 None),在此情況下傳輸將會自行關(guān)閉。 相反地,如果此方法返回真值,將以所用的協(xié)議來確定是否要關(guān)閉傳輸。 由于默認(rèn)實(shí)現(xiàn)是返回 None,因此它會隱式地關(guān)閉連接。

某些傳輸,包括 SSL 在內(nèi),并不支持半關(guān)閉的連接,在此情況下從該方法返回真值將導(dǎo)致連接被關(guān)閉。

狀態(tài)機(jī):

start -> connection_made
    [-> data_received]*
    [-> eof_received]?
-> connection_lost -> end

緩沖流協(xié)議?

3.7 新版功能.

帶緩沖的協(xié)議可與任何支持 流式協(xié)議 的事件循環(huán)方法配合使用。

BufferedProtocol 實(shí)現(xiàn)允許顯式手動分配和控制接收緩沖區(qū)。 隨后事件循環(huán)可以使用協(xié)議提供的緩沖區(qū)來避免不必要的數(shù)據(jù)復(fù)制。 這對于接收大量數(shù)據(jù)的協(xié)議來說會有明顯的性能提升。 復(fù)雜的協(xié)議實(shí)現(xiàn)能顯著地減少緩沖區(qū)分配的數(shù)量。

以下回調(diào)是在 BufferedProtocol 實(shí)例上被調(diào)用的:

BufferedProtocol.get_buffer(sizehint)?

調(diào)用后會分配新的接收緩沖區(qū)。

sizehint 是推薦的返回緩沖區(qū)最小尺寸。 返回小于或大于 sizehint 推薦尺寸的緩沖區(qū)也是可接受的。 當(dāng)設(shè)為 -1 時,緩沖區(qū)尺寸可以是任意的。 返回尺寸為零的緩沖區(qū)則是錯誤的。

get_buffer() 必須返回一個實(shí)現(xiàn)了 緩沖區(qū)協(xié)議 的對象。

BufferedProtocol.buffer_updated(nbytes)?

用接收的數(shù)據(jù)更新緩沖區(qū)時被調(diào)用。

nbytes 是被寫入到緩沖區(qū)的字節(jié)總數(shù)。

BufferedProtocol.eof_received()?

請查看 protocol.eof_received() 方法的文檔。

在連接期間 get_buffer() 可以被調(diào)用任意次數(shù)。 但是,protocol.eof_received() 最多只能被調(diào)用一次,如果被調(diào)用,則在此之后 get_buffer()buffer_updated() 不能再被調(diào)用。

狀態(tài)機(jī):

start -> connection_made
    [-> get_buffer
        [-> buffer_updated]?
    ]*
    [-> eof_received]?
-> connection_lost -> end

數(shù)據(jù)報協(xié)議?

數(shù)據(jù)報協(xié)議實(shí)例應(yīng)當(dāng)由傳遞給 loop.create_datagram_endpoint() 方法的協(xié)議工廠來構(gòu)造。

DatagramProtocol.datagram_received(data, addr)?

當(dāng)接收到數(shù)據(jù)報時被調(diào)用。 data 是包含傳入數(shù)據(jù)的字節(jié)串對象。 addr 是發(fā)送數(shù)據(jù)的對等端地址;實(shí)際的格式取決于具體傳輸。

DatagramProtocol.error_received(exc)?

當(dāng)前一個發(fā)送或接收操作引發(fā) OSError 時被調(diào)用。 excOSError 的實(shí)例。

此方法會在當(dāng)傳輸(例如UDP)檢測到無法將數(shù)據(jù)報傳給接收方等極少數(shù)情況下被調(diào)用。 而在大多數(shù)情況下,無法送達(dá)的數(shù)據(jù)報將被靜默地丟棄。

備注

在 BSD 系統(tǒng)(macOS, FreeBSD 等等)上,數(shù)據(jù)報協(xié)議不支持流控制,因為沒有可靠的方式來檢測因?qū)懭攵噙^包所導(dǎo)致的發(fā)送失敗。

套接字總是顯示為 'ready' 且多余的包會被丟棄。 有一定的可能性會引發(fā) OSError 并設(shè)置 errnoerrno.ENOBUFS;如果此異常被引發(fā),它將被報告給 DatagramProtocol.error_received(),在其他情況下則會被忽略。

子進(jìn)程協(xié)議?

子進(jìn)程協(xié)議實(shí)例應(yīng)當(dāng)由傳遞給 loop.subprocess_exec()loop.subprocess_shell() 方法的協(xié)議工廠函數(shù)來構(gòu)造。

SubprocessProtocol.pipe_data_received(fd, data)?

當(dāng)子進(jìn)程向其 stdout 或 stderr 管道寫入數(shù)據(jù)時被調(diào)用。

fd 是以整數(shù)表示的管道文件描述符。

data 是包含已接收數(shù)據(jù)的非空字節(jié)串對象。

SubprocessProtocol.pipe_connection_lost(fd, exc)?

與子進(jìn)程通信的其中一個管道關(guān)閉時被調(diào)用。

fd 以整數(shù)表示的已關(guān)閉文件描述符。

SubprocessProtocol.process_exited()?

子進(jìn)程退出時被調(diào)用。

例子?

TCP 回顯服務(wù)器?

使用 loop.create_server() 方法創(chuàng)建 TCP 回顯服務(wù)器,發(fā)回已接收的數(shù)據(jù),并關(guān)閉連接:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        lambda: EchoServerProtocol(),
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

參見

使用流的 TCP 回顯服務(wù)器 示例,使用了高層級的 asyncio.start_server() 函數(shù)。

TCP 回顯客戶端?

使用 loop.create_connection() 方法的 TCP 回顯客戶端,發(fā)送數(shù)據(jù)并等待,直到連接被關(guān)閉:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

參見

使用流的 TCP 回顯客戶端 示例,使用了高層級的 asyncio.open_connection() 函數(shù)。

UDP 回顯服務(wù)器?

使用 loop.create_datagram_endpoint() 方法的 UDB 回顯服務(wù)器,發(fā)回已接收的數(shù)據(jù):

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoServerProtocol(),
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP 回顯客戶端?

使用 loop.create_datagram_endpoint() 方法的 UDP 回顯客戶端,發(fā)送數(shù)據(jù)并在收到回應(yīng)時關(guān)閉傳輸:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

鏈接已存在的套接字?

附帶一個協(xié)議使用 loop.create_connection() 方法,等待直到套接字接收數(shù)據(jù):

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

參見

使用低層級的 loop.add_reader() 方法來注冊一個 FD 的 監(jiān)視文件描述符以讀取事件 示例。

使用在協(xié)程中通過 open_connection() 函數(shù)創(chuàng)建的高層級流的 注冊一個打開的套接字以等待使用流的數(shù)據(jù) 示例。

loop.subprocess_exec() 與 SubprocessProtocol?

一個使用子進(jìn)程協(xié)議來獲取子進(jìn)程的輸出并等待子進(jìn)程退出的示例。

這個子進(jìn)程是由 loop.subprocess_exec() 方法創(chuàng)建的:

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

另請參閱使用高層級 API 編寫的 相同示例。