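I am trying to use the multiprocessing module to run the same method in parallel on a list of object instances.
The closest question I found is "Applying a method to a list of objects in parallel using multiprocessing". However, the solution given there does not seem to apply to my problem.
Here is an example of what I am trying to achieve: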
class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    for instance in instances:
        instance.put_bar()
    # correctly prints 1.0
    print(instances[0].bar)
However, when I try to parallelize this with the multiprocessing module, the attribute bar is left unchanged:
import os
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


def worker(instance):
    return instance.put_bar()


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    with Pool(os.cpu_count()) as pool:
        pool.map(worker, (instance for instance in instances))
    # prints None
    print(instances[0].bar)
Any help in finding out where I am going wrong is much appreciated.
Reply from a uj5u.com user:
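For context on why the parallel attempt prints None: pool.map pickles each Foo instance and sends the resulting copy to a worker process, so put_bar runs on that copy and the objects in the main process are never touched. Below is a minimal sketch of this copy behavior. It uses pickle directly, in a single process, as a stand-in for what the pool does when it ships arguments to a worker; the names original and shipped are only for illustration.

```python
import pickle


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self):
        self.bar = 1.0


# The pool serializes each argument before sending it to a worker.
# Round-tripping through pickle by hand imitates that step.
original = Foo()
shipped = pickle.loads(pickle.dumps(original))

# The "worker" only ever sees the copy.
shipped.put_bar()

print(shipped.bar)   # the copy was updated
print(original.bar)  # the object in the main process was not
```

This is why the worker either has to send its result back to the main process or has to operate on an object that lives in one shared place.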
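You can create managed Foo objects from your class, just as a multiprocessing.managers.SyncManager instance, created by calling multiprocessing.Manager(), can create certain managed objects such as a list or dict. What is returned is a special proxy object that can be shared among processes. When a method is called on such a proxy, the method name and its arguments are sent over a pipe or socket to a process created by the manager, and the specified method is invoked on the actual object, which resides in the manager's address space. In effect, you are performing something similar to a remote method call. This is clearly much slower than operating on the object directly, but if you have to, you have to. Your example is a bit too artificial and does not leave many alternatives.
So I will modify your example slightly so that Foo.put_bar takes an argument, and your helper function worker determines, based on some computation, what value should be passed to put_bar. That way, the value to be used as the argument to Foo.put_bar is returned to the main process, which performs the actual update of all the instances: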
Example without using a managed object with a special proxy
import os
from multiprocessing import Pool


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return 1.0


if __name__ == "__main__":
    instances = [Foo() for _ in range(100)]
    with Pool(os.cpu_count()) as pool:
        # (instance for instance in instances) instead of instances below
        # doesn't accomplish anything:
        for idx, result in enumerate(pool.map(worker, instances)):
            instances[idx].put_bar(result)
    # prints 1.0
    print(instances[0].bar)
Example using a managed object
import os
from multiprocessing import Pool
from multiprocessing.managers import NamespaceProxy, BaseManager


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


# If we did not need to expose attributes such as bar, then we could
# let Python automatically generate a proxy that would expose just the
# methods. But here we do need to access directly the `bar` attribute.
# The alternative would be for Foo to define method get_bar that returns
# self.bar.
class FooProxy(NamespaceProxy):
    _exposed_ = ('__getattribute__', '__setattr__', '__delattr__', 'put_bar', 'bar')

    def put_bar(self, value):
        return self._callmethod('put_bar', args=(value,))


class FooManager(BaseManager):
    pass


if __name__ == "__main__":
    FooManager.register('Foo', Foo, FooProxy)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]
        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].bar)
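The forwarding that a proxy performs, as described at the start of this answer, can be pictured with a toy sketch. This is not how multiprocessing.managers is implemented. ToyServer, ToyProxy, call and handle_one are hypothetical names made up for illustration, and everything runs in a single process so that the request and reply steps are easy to follow. The only real API used is multiprocessing.Pipe.

```python
from multiprocessing import Pipe


class Foo:
    def __init__(self):
        self.bar = None

    def put_bar(self, value):
        self.bar = value

    def get_bar(self):
        return self.bar


class ToyServer:
    """Hypothetical stand-in for the manager process: it owns the real object."""

    def __init__(self, target, conn):
        self.target = target
        self.conn = conn

    def handle_one(self):
        # Receive (method name, args), call the method on the real object,
        # and send the return value back.
        name, args = self.conn.recv()
        result = getattr(self.target, name)(*args)
        self.conn.send(result)


class ToyProxy:
    """Hypothetical stand-in for a proxy: it holds a connection, not the data."""

    def __init__(self, conn, server):
        self.conn = conn
        # Kept only so this single-process demo can step the server by hand.
        self.server = server

    def call(self, name, *args):
        self.conn.send((name, args))
        # A real manager would service the request in its own process.
        self.server.handle_one()
        return self.conn.recv()


client_end, server_end = Pipe()
real_foo = Foo()
proxy = ToyProxy(client_end, ToyServer(real_foo, server_end))

proxy.call("put_bar", 1.0)
print(proxy.call("get_bar"))  # value read back through the connection
print(real_foo.bar)           # the single real object was updated
```

Every call on the proxy costs a serialized round trip over the connection, which is the reason the answer warns that managed objects are much slower than direct access.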
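Example using a managed object without a special proxy
Here we do not need to access the managed object's attributes directly, because we have defined the method get_bar: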
import os
from multiprocessing import Pool
from multiprocessing.managers import BaseManager


class Foo:
    def __init__(self):
        self._bar = None

    def put_bar(self, value):
        self._bar = value

    def get_bar(self):
        return self._bar


def worker(instance):
    # Code to compute a result omitted.
    # We will for demo purposes always use 1.0:
    return instance.put_bar(1.0)


class FooManager(BaseManager):
    pass


if __name__ == "__main__":
    FooManager.register('Foo', Foo)
    with FooManager() as manager:
        instances = [manager.Foo() for _ in range(100)]
        with Pool(os.cpu_count()) as pool:
            # (instance for instance in instances) instead of instances below
            # doesn't accomplish anything:
            pool.map(worker, instances)
        # We must do all access to the proxy while the manager process
        # is still running, i.e. before this block is exited:
        # prints 1.0
        print(instances[0].get_bar())
