multiprocessing.shared_memory --- 可從進程直接訪問的共享內存?

源代碼: Lib/multiprocessing/shared_memory.py

3.8 新版功能.


該模塊提供了一個 SharedMemory 類,用于分配和管理多核或對稱多處理器(SMP)機器上進程間的共享內存。為了協(xié)助管理不同進程間的共享內存生命周期,multiprocessing.managers 模塊也提供了一個 BaseManager 的子類: SharedMemoryManager

本模塊中,共享內存是指 "System V 類型" 的共享內存塊(雖然可能和它實現(xiàn)方式不完全一致)而不是 “分布式共享內存”。這種類型的的共享內存允許不同進程讀寫一片公共(或者共享)的易失性存儲區(qū)域。一般來說,進程被限制只能訪問屬于自己進程空間的內存,但是共享內存允許跨進程共享數(shù)據(jù),從而避免通過進程間發(fā)送消息的形式傳遞數(shù)據(jù)。相比通過磁盤、套接字或者其他要求序列化、反序列化和復制數(shù)據(jù)的共享形式,直接通過內存共享數(shù)據(jù)擁有更出色性能。

class multiprocessing.shared_memory.SharedMemory(name=None, create=False, size=0)?

創(chuàng)建一個新的共享內存塊或者連接到一片已經(jīng)存在的共享內存塊。每個共享內存塊都被指定了一個全局唯一的名稱。通過這種方式,進程可以使用一個特定的名字創(chuàng)建共享內存區(qū)塊,然后其他進程使用同樣的名字連接到這個共享內存塊。

作為一種跨進程共享數(shù)據(jù)的方式,共享內存塊的壽命可能超過創(chuàng)建它的原始進程。一個共享內存塊可能同時被多個進程使用,當一個進程不再需要訪問這個共享內存塊的時候,應該調用 close() 方法。當一個共享內存塊不被任何進程使用的時候,應該調用 unlink() 方法以執(zhí)行必要的清理。

name 是共享內存的唯一名稱,字符串類型。如果創(chuàng)建一個新共享內存塊的時候,名稱指定為 None (默認值),將會隨機產(chǎn)生一個新名稱。

create 指定創(chuàng)建一個新的共享內存塊 (True) 還是連接到已存在的共享內存塊 (False) 。

如果是新創(chuàng)建共享內存塊則 size 用于指定塊的大小為多少字節(jié)。由于某些平臺是以內存頁大小為最小單位來分配內存的,最終得到的內存塊大小可能大于或等于要求的大小。如果是連接到已經(jīng)存在的共享內存塊, size 參數(shù)會被忽略。

close()?

關閉實例對于共享內存的訪問連接。所有實例確認自己不再需要使用共享內存的時候都應該調用 close() ,以保證必要的資源清理。調用 close() 并不會銷毀共享內存區(qū)域。

請求銷毀底層的共享內存塊。為了執(zhí)行必要的資源清理, 在所有使用這個共享內存塊的進程中, unlink() 應該調用一次(且只能調用一次) 。發(fā)出此銷毀請求后,共享內存塊可能會、也可能不會立即銷毀,且此行為在不同操作系統(tǒng)之間可能不同。調用 unlink() 后再嘗試方位其中的數(shù)據(jù)可能導致內存錯誤。注意: 最后一個關閉共享內存訪問權限的進程可以以任意順序調用 unlink()close() 。

buf?

共享內存塊內容的 memoryview 。

name?

共享內存塊的唯一標識,只讀屬性。

size?

共享內存塊的字節(jié)大小,只讀屬性。

以下示例展示了 SharedMemory 底層的用法:

>>>
>>> from multiprocessing import shared_memory
>>> shm_a = shared_memory.SharedMemory(create=True, size=10)
>>> type(shm_a.buf)
<class 'memoryview'>
>>> buffer = shm_a.buf
>>> len(buffer)
10
>>> buffer[:4] = bytearray([22, 33, 44, 55])  # Modify multiple at once
>>> buffer[4] = 100                           # Modify single byte at a time
>>> # Attach to an existing shared memory block
>>> shm_b = shared_memory.SharedMemory(shm_a.name)
>>> import array
>>> array.array('b', shm_b.buf[:5])  # Copy the data into a new array.array
array('b', [22, 33, 44, 55, 100])
>>> shm_b.buf[:5] = b'howdy'  # Modify via shm_b using bytes
>>> bytes(shm_a.buf[:5])      # Access via shm_a
b'howdy'
>>> shm_b.close()   # Close each SharedMemory instance
>>> shm_a.close()
>>> shm_a.unlink()  # Call unlink only once to release the shared memory

以下示例展示了一個現(xiàn)實中的例子,使用 SharedMemory 類和 NumPy arrays 結合, 從兩個 Python shell 中訪問同一個 numpy.ndarray :

>>>
>>> # In the first Python interactive shell
>>> import numpy as np
>>> a = np.array([1, 1, 2, 3, 5, 8])  # Start with an existing NumPy array
>>> from multiprocessing import shared_memory
>>> shm = shared_memory.SharedMemory(create=True, size=a.nbytes)
>>> # Now create a NumPy array backed by shared memory
>>> b = np.ndarray(a.shape, dtype=a.dtype, buffer=shm.buf)
>>> b[:] = a[:]  # Copy the original data into shared memory
>>> b
array([1, 1, 2, 3, 5, 8])
>>> type(b)
<class 'numpy.ndarray'>
>>> type(a)
<class 'numpy.ndarray'>
>>> shm.name  # We did not specify a name so one was chosen for us
'psm_21467_46075'

>>> # In either the same shell or a new Python shell on the same machine
>>> import numpy as np
>>> from multiprocessing import shared_memory
>>> # Attach to the existing shared memory block
>>> existing_shm = shared_memory.SharedMemory(name='psm_21467_46075')
>>> # Note that a.shape is (6,) and a.dtype is np.int64 in this example
>>> c = np.ndarray((6,), dtype=np.int64, buffer=existing_shm.buf)
>>> c
array([1, 1, 2, 3, 5, 8])
>>> c[-1] = 888
>>> c
array([  1,   1,   2,   3,   5, 888])

>>> # Back in the first Python interactive shell, b reflects this change
>>> b
array([  1,   1,   2,   3,   5, 888])

>>> # Clean up from within the second Python shell
>>> del c  # Unnecessary; merely emphasizing the array is no longer used
>>> existing_shm.close()

>>> # Clean up from within the first Python shell
>>> del b  # Unnecessary; merely emphasizing the array is no longer used
>>> shm.close()
>>> shm.unlink()  # Free and release the shared memory block at the very end
class multiprocessing.managers.SharedMemoryManager([address[, authkey]])?

BaseManager 的子類,可用于管理跨進程的共享內存塊。

調用 SharedMemoryManager 實例上的 start() 方法會啟動一個新進程。這個新進程的唯一目的就是管理所有由它創(chuàng)建的共享內存塊的生命周期。想要釋放此進程管理的所有共享內存塊,可以調用實例的 shutdown() 方法。這會觸發(fā)執(zhí)行它管理的所有 SharedMemory 對象的 SharedMemory.unlink() 方法,然后停止這個進程。通過 SharedMemoryManager 創(chuàng)建 SharedMemory 實例,我們可以避免手動跟蹤和釋放共享內存資源。

這個類提供了創(chuàng)建和返回 SharedMemory 實例的方法,以及以共享內存為基礎創(chuàng)建一個列表類對象 (ShareableList) 的方法。

有關繼承的可選輸入?yún)?shù) addressauthkey 以及他們如何用于從進程連接已經(jīng)存在的 SharedMemoryManager 服務,參見 multiprocessing.managers.BaseManager 。

SharedMemory(size)?

使用 size 參數(shù),創(chuàng)建一個新的指定字節(jié)大小的 SharedMemory 對象并返回。

ShareableList(sequence)?

創(chuàng)建并返回一個新的 ShareableList 對象,通過輸入?yún)?shù) sequence 初始化。

下面的案例展示了 SharedMemoryManager 的基本機制:

>>>
>>> from multiprocessing.managers import SharedMemoryManager
>>> smm = SharedMemoryManager()
>>> smm.start()  # Start the process that manages the shared memory blocks
>>> sl = smm.ShareableList(range(4))
>>> sl
ShareableList([0, 1, 2, 3], name='psm_6572_7512')
>>> raw_shm = smm.SharedMemory(size=128)
>>> another_sl = smm.ShareableList('alpha')
>>> another_sl
ShareableList(['a', 'l', 'p', 'h', 'a'], name='psm_6572_12221')
>>> smm.shutdown()  # Calls unlink() on sl, raw_shm, and another_sl

以下案例展示了 SharedMemoryManager 對象的一種可能更方便的使用方式,通過 with 語句來保證所有共享內存塊在使用完后被釋放。

>>>
>>> with SharedMemoryManager() as smm:
...     sl = smm.ShareableList(range(2000))
...     # Divide the work among two processes, storing partial results in sl
...     p1 = Process(target=do_work, args=(sl, 0, 1000))
...     p2 = Process(target=do_work, args=(sl, 1000, 2000))
...     p1.start()
...     p2.start()  # A multiprocessing.Pool might be more efficient
...     p1.join()
...     p2.join()   # Wait for all work to complete in both processes
...     total_result = sum(sl)  # Consolidate the partial results now in sl

with 語句中使用 SharedMemoryManager  對象的時候,使用這個管理器創(chuàng)建的共享內存塊會在 with 語句代碼塊結束后被釋放。

class multiprocessing.shared_memory.ShareableList(sequence=None, *, name=None)?

提供一個可修改的類 list 對象,其中所有值都存放在共享內存塊中。這限制了可被存儲在其中的值只能是 int, float, bool, str (每條數(shù)據(jù)小于10M), bytes (每條數(shù)據(jù)小于10M)以及 None 這些內置類型。它另一個顯著區(qū)別于內置 list 類型的地方在于它的長度無法修改(比如,沒有 append, insert 等操作)且不支持通過切片操作動態(tài)創(chuàng)建新的 ShareableList  實例。

sequence 會被用來為一個新的 ShareableList 填充值。 設為 None 則會基于唯一的共享內存名稱關聯(lián)到已經(jīng)存在的 ShareableList。

name 是所請求的共享內存的唯一名稱,與 SharedMemory 的定義中所描述的一致。 當關聯(lián)到現(xiàn)有的 ShareableList 時,則指明其共享內存塊的唯一名稱并將 sequence 設為 None

count(value)?

返回 value 出現(xiàn)的次數(shù)。

index(value)?

返回 value 首次出現(xiàn)的位置,如果 value 不存在, 則拋出 ValueError 異常。

format?

包含由所有當前存儲值所使用的 struct 打包格式的只讀屬性。

shm?

存儲了值的 SharedMemory 實例。

下面的例子演示了 ShareableList 實例的基本用法:

>>>
>>> from multiprocessing import shared_memory
>>> a = shared_memory.ShareableList(['howdy', b'HoWdY', -273.154, 100, None, True, 42])
>>> [ type(entry) for entry in a ]
[<class 'str'>, <class 'bytes'>, <class 'float'>, <class 'int'>, <class 'NoneType'>, <class 'bool'>, <class 'int'>]
>>> a[2]
-273.154
>>> a[2] = -78.5
>>> a[2]
-78.5
>>> a[2] = 'dry ice'  # Changing data types is supported as well
>>> a[2]
'dry ice'
>>> a[2] = 'larger than previously allocated storage space'
Traceback (most recent call last):
  ...
ValueError: exceeds available storage for existing str
>>> a[2]
'dry ice'
>>> len(a)
7
>>> a.index(42)
6
>>> a.count(b'howdy')
0
>>> a.count(b'HoWdY')
1
>>> a.shm.close()
>>> a.shm.unlink()
>>> del a  # Use of a ShareableList after call to unlink() is unsupported

下面的例子演示了一個、兩個或多個進程如何通過提供下層的共享內存塊名稱來訪問同一個 ShareableList:

>>>
>>> b = shared_memory.ShareableList(range(5))         # In a first process
>>> c = shared_memory.ShareableList(name=b.shm.name)  # In a second process
>>> c
ShareableList([0, 1, 2, 3, 4], name='...')
>>> c[-1] = -999
>>> b[-1]
-999
>>> b.shm.close()
>>> c.shm.close()
>>> c.shm.unlink()

The following examples demonstrates that ShareableList (and underlying SharedMemory) objects can be pickled and unpickled if needed. Note, that it will still be the same shared object. This happens, because the deserialized object has the same unique name and is just attached to an existing object with the same name (if the object is still alive):

>>>
>>> import pickle
>>> from multiprocessing import shared_memory
>>> sl = shared_memory.ShareableList(range(10))
>>> list(sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> deserialized_sl = pickle.loads(pickle.dumps(sl))
>>> list(deserialized_sl)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> sl[0] = -1
>>> deserialized_sl[1] = -2
>>> list(sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>> list(deserialized_sl)
[-1, -2, 2, 3, 4, 5, 6, 7, 8, 9]
>>>
>>> sl.shm.close()
>>> sl.shm.unlink()