2014-12-05 3 views
27

Ich versuche, eine Anwendung zu schreiben, die eine Funktion gleichzeitig mit einem multiprocessing.Pool anwendet. Ich möchte, dass diese Funktion eine Instanzmethode ist (also kann ich sie in verschiedenen Unterklassen anders definieren). Dies scheint nicht möglich zu sein; wie ich an anderer Stelle gelernt habe, anscheinend bound methods can't be pickled. Warum funktioniert das Starten einer multiprocessing.Process mit einer gebundenen Methode als Ziel? Der folgende Code:Warum kann ich eine Instanzmethode an Multiprocessing übergeben.Prozess, aber kein Multiprocessing.Pool?

import multiprocessing 

def test1(): 
    print "Hello, world 1" 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print answer 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print answer 

    def test2(self): 
     print "Hello, world 2" 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

erzeugt diese Ausgabe:

Hello, world 1 
Hello, world 2 
1 
2 
3 
4 
5 
6 
7 
8 
9 
10 

Exception in thread Thread-2: 
Traceback (most recent call last): 
    File "C:\Python27\Lib\threading.py", line 551, in __bootstrap_inner 
    self.run() 
    File "C:\Python27\Lib\threading.py", line 504, in run 
    self.__target(*self.__args, **self.__kwargs) 
    File "C:\Python27\Lib\multiprocessing\pool.py", line 319, in _handle_tasks 
    put(task) 
PicklingError: Can't pickle <type 'instancemethod'>: attribute lookup __builtin__.instancemethod failed 

Warum können Prozesse handhaben gebunden Methoden, aber nicht Pools?

+0

Es ist, weil sie nicht von 'Pickle' serialisiert werden können. Wenn Sie in python2.7 sein müssen, und Sie Ihren Code so arbeiten lassen müssen, wie er ist ... sollten Sie einen Fork von 'multiprocessing' verwenden, der Instanzmethoden ersticken kann und einen' Pool' einlegen kann. Sehen Sie sich 'pathos.multiprocessing' an, das Sie in dem Stack-Overflow-Link finden, den Sie im obigen Beitrag zitiert haben. –

+0

Genauer gesagt, dieser Link zeigt, wie Instanzmethoden in 2.x trivialerweise in einem 'Pool' serialisiert werden können: http://stackoverflow.com/a/21345273/2379433 –

+0

Muss es instance method sein? Können Sie die Klassenmethode verwenden? Ich habe es versucht und hat gut für mich gearbeitet. –

Antwort

20

Das pickle Modul kann in der Regel nicht beizen Instanzmethoden:

>>> import pickle 
>>> class A(object): 
... def z(self): print "hi" 
... 
>>> a = A() 
>>> pickle.dumps(a.z) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 224, in dump 
    self.save(obj) 
    File "/usr/local/lib/python2.7/pickle.py", line 306, in save 
    rv = reduce(self.proto) 
    File "/usr/local/lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle instancemethod objects 

jedoch das multiprocessing Modul has a custom Pickler that adds some code to enable this feature:

# 
# Try making some callable types picklable 
# 

from pickle import Pickler 
class ForkingPickler(Pickler): 
    dispatch = Pickler.dispatch.copy() 

    @classmethod 
    def register(cls, type, reduce): 
     def dispatcher(self, obj): 
      rv = reduce(obj) 
      self.save_reduce(obj=obj, *rv) 
     cls.dispatch[type] = dispatcher 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
ForkingPickler.register(type(ForkingPickler.save), _reduce_method) 

du replizieren kann das copy_reg Modul es für sich selbst zu sehen, arbeiten :

>>> import copy_reg 
>>> def _reduce_method(m): 
...  if m.im_self is None: 
...   return getattr, (m.im_class, m.im_func.func_name) 
...  else: 
...   return getattr, (m.im_self, m.im_func.func_name) 
... 
>>> copy_reg.pickle(type(a.z), _reduce_method) 
>>> pickle.dumps(a.z) 
"c__builtin__\ngetattr\np0\n(ccopy_reg\n_reconstructor\np1\n(c__main__\nA\np2\nc__builtin__\nobject\np3\nNtp4\nRp5\nS'z'\np6\ntp7\nRp8\n." 

Wenn Sie Process.start verwenden, um einen neuen Prozess unter Windows zum Laichen, it pickles all the parameters you passed to the child process using this custom ForkingPickler:

# 
# Windows 
# 

else: 
    # snip... 
    from pickle import load, HIGHEST_PROTOCOL 

    def dump(obj, file, protocol=None): 
     ForkingPickler(file, protocol).dump(obj) 

    # 
    # We define a Popen class similar to the one from subprocess, but 
    # whose constructor takes a process object as its argument. 
    # 

    class Popen(object): 
     ''' 
     Start a subprocess to run the code of a process object 
     ''' 
     _tls = thread._local() 

     def __init__(self, process_obj): 
      # create pipe for communication with child 
      rfd, wfd = os.pipe() 

      # get handle for read end of the pipe and make it inheritable 
      ... 
      # start process 
      ... 

      # set attributes of self 
      ... 

      # send information to child 
      prep_data = get_preparation_data(process_obj._name) 
      to_child = os.fdopen(wfd, 'wb') 
      Popen._tls.process_handle = int(hp) 
      try: 
       dump(prep_data, to_child, HIGHEST_PROTOCOL) 
       dump(process_obj, to_child, HIGHEST_PROTOCOL) 
      finally: 
       del Popen._tls.process_handle 
       to_child.close() 

Beachten Sie den Abschnitt „Informationen für das Kind schicken“. Es verwendet die dump-Funktion, die ForkingPickler verwendet, um die Daten zu picken, was bedeutet, dass Ihre Instanzmethode gebeizt werden kann.

Jetzt, wenn Sie Methoden auf multiprocessing.Pool verwenden, um eine Methode an einen untergeordneten Prozess zu senden, verwendet es multiprocessing.Pipe, um die Daten zu pickle. In Python 2.7 ist multiprocessing.Pipe in C implementiert, and calls pickle_dumps directly, so dass es die ForkingPickler nicht ausnutzt. Das bedeutet, dass das Beizen der Instanzmethode nicht funktioniert.

Wenn Sie jedoch copy_reg verwenden, um die instancemethod Typ zu registrieren, anstatt eine benutzerdefinierte Pickler, alle Versuche Beizen betroffen sein werden. So können Sie verwenden, ermöglichen Instanzmethoden Beizen, auch über Pool:

import multiprocessing 
import copy_reg 
import types 

def _reduce_method(m): 
    if m.im_self is None: 
     return getattr, (m.im_class, m.im_func.func_name) 
    else: 
     return getattr, (m.im_self, m.im_func.func_name) 
copy_reg.pickle(types.MethodType, _reduce_method) 

def test1(): 
    print("Hello, world 1") 

def increment(x): 
    return x + 1 

class testClass(): 
    def process(self): 
     process1 = multiprocessing.Process(target=test1) 
     process1.start() 
     process1.join() 
     process2 = multiprocessing.Process(target=self.test2) 
     process2.start() 
     process2.join() 

    def pool(self): 
     pool = multiprocessing.Pool(1) 
     for answer in pool.imap(increment, range(10)): 
      print(answer) 
     print 
     for answer in pool.imap(self.square, range(10)): 
      print(answer) 

    def test2(self): 
     print("Hello, world 2") 

    def square(self, x): 
     return x * x 

def main(): 
    c = testClass() 
    c.process() 
    c.pool() 

if __name__ == "__main__": 
    main() 

Ausgang:

Hello, world 1 
Hello, world 2 
GOT (0, 0, (True, 1)) 
GOT (0, 1, (True, 2)) 
GOT (0, 2, (True, 3)) 
GOT (0, 3, (True, 4)) 
GOT (0, 4, (True, 5)) 
1GOT (0, 5, (True, 6)) 

GOT (0, 6, (True, 7)) 
2 
GOT (0, 7, (True, 8)) 
3 
GOT (0, 8, (True, 9)) 
GOT (0, 9, (True, 10)) 
4 
5 
6 
7 
8 
9 
10 

GOT (1, 0, (True, 0)) 
0 
GOT (1, 1, (True, 1)) 
1 
GOT (1, 2, (True, 4)) 
4 
GOT (1, 3, (True, 9)) 
9 
GOT (1, 4, (True, 16)) 
16 
GOT (1, 5, (True, 25)) 
25 
GOT (1, 6, (True, 36)) 
36 
GOT (1, 7, (True, 49)) 
49 
GOT (1, 8, (True, 64)) 
64 
GOT (1, 9, (True, 81)) 
81 
GOT None 

Beachten Sie auch, dass in Python 3.x kann pickle native Instanzmethode Typen Beize, so dass keiner von diesem Zeug zählt mehr. :)

+0

Danke für den Vorschlag; Ich bin überrascht, dass das Multiprocessing-Modul dies nicht bereits implementiert. Ihre genaue Lösung funktioniert nicht für mich, weil es das Beizen der Instanz beinhaltet, an die die Methode gebunden ist, was andere Probleme verursacht, aber sie hat mich in die richtige Richtung gelenkt. Ich definiere stattdessen die Methoden, die während des Multiprocessings auf der obersten Ebene eines Moduls ausgeführt werden, um beide Probleme zu umgehen und das gewünschte Verhalten zu erhalten. – dpitch40

7

Hier ist eine Alternative, die ich manchmal verwende, und es funktioniert in Python2.x:

Sie ein Top-Level „alias“ der Art zu Instanzmethoden erstellen, die ein Objekt, deren Instanzmethoden annehmen mögen Sie in einem Pool laufen, und haben sie die Instanzmethoden für Sie rufen:

import functools 
import multiprocessing 

def _instance_method_alias(obj, arg): 
    """ 
    Alias for instance method that allows the method to be called in a 
    multiprocessing pool 
    """ 
    obj.instance_method(arg) 
    return 

class MyClass(object): 
    """ 
    Our custom class whose instance methods we want to be able to use in a 
    multiprocessing pool 
    """ 

    def __init__(self): 
     self.my_string = "From MyClass: {}" 

    def instance_method(self, arg): 
     """ 
     Some arbitrary instance method 
     """ 

     print(self.my_string.format(arg)) 
     return 

# create an object of MyClass 
obj = MyClass() 

# use functools.partial to create a new method that always has the 
# MyClass object passed as its first argument 
_bound_instance_method_alias = functools.partial(_instance_method_alias, obj) 

# create our list of things we will use the pool to map 
l = [1,2,3] 

# create the pool of workers 
pool = multiprocessing.Pool() 

# call pool.map, passing it the newly created function 
pool.map(_bound_instance_method_alias, l) 

# cleanup 
pool.close() 
pool.join() 

Dieser Code erzeugt diese Ausgabe:

Von MyClass: 1
Von MyClass: 2
Von MyClass: 3

Eine Einschränkung besteht darin, dass Sie dies nicht für Methoden verwenden können, die das Objekt ändern. Jeder Prozess erhält eine Kopie des Objekts, für das er die Methoden aufruft, sodass Änderungen nicht an den Hauptprozess weitergegeben werden. Wenn Sie das Objekt nicht von den Methoden, die Sie aufrufen, ändern müssen, kann dies eine einfache Lösung sein.

+1

Danke für die Post, macht Sinn für mich nach dem Tauchen in Beize etc und das funktioniert für mich. Python 3 wird (letztendlich) diese Lücke schließen. Prost! – leomelzer

4

Hier ist ein einfacher Weg, in Python 2 zu arbeiten, wickeln Sie einfach die ursprüngliche Instanzmethode. Funktioniert gut unter MacOSX und Linux, funktioniert nicht unter Windows, getestet Python 2.7

Verwandte Themen