2013-06-11 18 views
14

Ich probiere Multiprozessor-Programmierung mit Python aus. Nehmen Sie einen Divide and Conquer-Algorithmus wie zum Beispiel Fibonacci. Der Programmablauf der Ausführung verzweigt sich wie ein Baum und wird parallel ausgeführt. Mit anderen Worten, wir haben ein Beispiel für nested parallelism.Verschachtelte Parallelität in Python

Von Java habe ich ein Threadpoolmuster verwendet, um Ressourcen zu verwalten, da das Programm sehr schnell verzweigen und zu viele kurzlebige Threads erstellen kann. Ein einzelner statischer (gemeinsamer) Threadpool kann über ExecutorService instanziiert werden.

Ich würde das gleiche für Pool erwarten, aber es scheint, dass Pool object is not to be globally shared. Wenn Sie beispielsweise den Pool mit multiprocessing.Manager.Namespace() freigeben, wird der Fehler angezeigt.

Pool-Objekte können nicht zwischen Prozessen oder gebeizt

Ich habe einen 2-teiligen Frage übergeben werden:

  1. Was ich hier fehlt; Warum sollte kein Pool zwischen Prozessen geteilt werden?
  2. Was ist ein Muster für die Implementierung verschachtelter Parallelität in Python? Wenn möglich, rekursive Struktur beibehalten und nicht für Iteration handeln.

from concurrent.futures import ThreadPoolExecutor 

def fibonacci(n): 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

def main(): 
    global pool 

    N = int(10) 
    with ThreadPoolExecutor(2**N) as pool: 
     print(fibonacci(N)) 

main() 

Java

public class FibTask implements Callable<Integer> { 

    public static ExecutorService pool = Executors.newCachedThreadPool(); 
    int arg; 

    public FibTask(int n) { 
     this.arg= n; 
    } 

    @Override 
    public Integer call() throws Exception { 
     if (this.arg > 2) { 
      Future<Integer> left = pool.submit(new FibTask(arg - 1)); 
      Future<Integer> right = pool.submit(new FibTask(arg - 2)); 
      return left.get() + right.get(); 
     } else { 
      return 1; 
     } 

    } 

    public static void main(String[] args) throws Exception { 
     Integer n = 14; 
     Callable<Integer> task = new FibTask(n); 
     Future<Integer> result =FibTask.pool.submit(task); 
     System.out.println(Integer.toString(result.get())); 
     FibTask.pool.shutdown();    
    }  

} 

Ich bin mir nicht sicher, ob es hier ankommt, aber ich bin der Unterschied zwischen "Prozess" und "Faden" zu ignorieren; Für mich bedeuten beide "virtualisierter Prozessor". Mein Verständnis ist, der Zweck eines Pools ist die gemeinsame Nutzung eines "Pools" oder Ressourcen. Laufende Aufgaben können eine Anfrage an den Pool stellen. Wenn parallele Tasks in anderen Threads ausgeführt werden, können diese Threads zurückgewonnen und neuen Tasks zugewiesen werden. Es macht für mich keinen Sinn, das Teilen des Pools zu verbieten, so dass jeder Thread seinen eigenen neuen Pool instanziieren muss, da dies den Zweck eines Thread-Pools zu besiegen scheint.

+0

Warum brauchen Sie es global zu teilen?Kannst du das nicht alles in einem Namespace/Klasse enthalten? –

+2

@InbarRose Das Problem besteht darin, dass in einer rekursiven Funktion, die den rekursiven Aufruf in einem anderen Prozess ausführt, der Pool verzweigt ist und auch vom Subprozess aufgerufen wird. Dies verursacht Probleme mit den Warteschlangen, daher funktioniert es nicht. Jedenfalls möchte ich betonen, dass Sie in Java * threads * verwenden. Bei Threads gibt es keine Probleme, da das Pool-Objekt nicht verzweigt wird. Ich glaube, dass die Verwendung eines Prozesspools in Java zu mehr oder weniger demselben Verhalten führen würde. – Bakuriu

+0

@InbarRose Ich habe auch versucht, 'Pool' als Klasseninstanz und statische Variable zu enthalten, aber immer noch dasselbe Problem zu erreichen. Zum Beispiel mit 'Pool' und den rekursiven Aufrufen innerhalb einer einzelnen Klasse, aber dies führt immer noch zum selben Problem:> Pool-Objekte können nicht zwischen Prozessen übergeben werden ... –

Antwort

3

1) Was fehlt mir hier; Warum sollte kein Pool zwischen Prozessen geteilt werden?

Nicht alle Objekt/Instanzen sind aufsammelbare/serializable, in diesem Fall Pool verwendet threading.lock die nicht aufsammelbare ist:

>>> import threading, pickle 
>>> pickle.dumps(threading.Lock()) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
[...] 
    File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
    raise TypeError, "can't pickle %s objects" % base.__name__ 
TypeError: can't pickle lock objects 

oder besser:

>>> import threading, pickle 
>>> from concurrent.futures import ThreadPoolExecutor 
>>> pickle.dumps(ThreadPoolExecutor(1)) 
Traceback (most recent call last): 
    File "<stdin>", line 1, in <module> 
    File "/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 1374, in dumps 
    Pickler(file, protocol).dump(obj) 
    File 
[...] 
"/usr/local/Cellar/python/2.7.3/Frameworks/Python.framework/Versions/2.7/lib/python2.7/pickle.py", line 306, in save 
     rv = reduce(self.proto) 
     File "/Users/rafael/dev/venvs/general/bin/../lib/python2.7/copy_reg.py", line 70, in _reduce_ex 
     raise TypeError, "can't pickle %s objects" % base.__name__ 
    TypeError: can't pickle lock objects 

Wenn Sie denken, Darüber hinaus macht es Sinn, dass eine Sperre ein Semaphor-Primitiv ist, das vom Betriebssystem verwaltet wird (da Python native Threads verwendet). Die Möglichkeit, diesen Objektstatus innerhalb der Python-Laufzeit zu picken und zu speichern, würde wirklich nichts Sinnvolles bringen, da sein wahrer Zustand vom Betriebssystem beibehalten wird.

2) Was ist ein Muster für die Implementierung verschachtelter Parallelität in Python?Wenn möglich, eine rekursive Struktur beibehalten wird, und der Handel nicht für Iteration

nun für das Prestige, alles, was ich oben nicht erwähnte nicht wirklich zu Ihrem Beispiel anwenden, da Sie verwenden Gewinde (ThreadPoolExecutor) und nicht die Prozesse (ProcessPoolExecutor), so dass keine Datenfreigabe über den Prozess hinweg stattfinden muss.

Ihr Java-Beispiel scheint nur effizienter zu sein, da der verwendete Thread-Pool (CachedThreadPool) neue Threads nach Bedarf erstellt, während die Python-Executor-Implementierungen beschränkt sind und eine explizite maximale Anzahl von Threads (max_workers) erfordern. Es gibt ein bisschen Syntaxunterschiede zwischen den Sprachen, die Sie auch abzuschütteln scheinen (statische Instanzen in Python sind im Grunde alles, was nicht explizit im Bereich liegt), aber im Grunde würden beide Beispiele genau die gleiche Anzahl von Threads erzeugen, um ausgeführt zu werden. Zum Beispiel, hier ist ein Beispiel eine ziemlich naive CachedThreadPoolExecutor Implementierung in Python mit:

from concurrent.futures import ThreadPoolExecutor 

class CachedThreadPoolExecutor(ThreadPoolExecutor): 
    def __init__(self): 
     super(CachedThreadPoolExecutor, self).__init__(max_workers=1) 

    def submit(self, fn, *args, **extra): 
     if self._work_queue.qsize() > 0: 
      print('increasing pool size from %d to %d' % (self._max_workers, self._max_workers+1)) 
      self._max_workers +=1 

     return super(CachedThreadPoolExecutor, self).submit(fn, *args, **extra) 

pool = CachedThreadPoolExecutor() 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = pool.submit(fibonacci, n - 1) 
    b = pool.submit(fibonacci, n - 2) 
    return a.result() + b.result() 

print(fibonacci(10)) 

Tuning Leistung:

Ich suche in gevent deuten stark darauf hin, da es Ihnen hohe Parallelität ohne den Thread Overhead geben wird. Dies ist nicht immer der Fall, aber Ihr Code ist eigentlich das Aushängeschild für die Nutzung von Gevent. Hier ein Beispiel:

import gevent 

def fibonacci(n): 
    print n 
    if n < 2: 
     return n 
    a = gevent.spawn(fibonacci, n - 1) 
    b = gevent.spawn(fibonacci, n - 2) 
    return a.get() + b.get() 

print(fibonacci(10)) 

völlig unwissenschaftlich, aber auf meinem Computer über den Code läuft 9x schneller als seine Gewinde gleichwertig.

Ich hoffe, das hilft.

+1

gevent gibt Ihnen keine Parallelität. –

+0

Richtig, keine rechnerische Parallelität, aber die ursprüngliche Frage war nicht, den gewählten Algorithmus zu ändern, sondern stattdessen ein gemeinsames Muster, um es zu verbessern. –

+0

Keine Änderung des Algorithmus erforderlich: Im Beispiel wird Arbeit bereits in unabhängige Teilaufgaben aufgeteilt. Alles, was benötigt wird, ist ein Substrat, das die Aufgaben tatsächlich parallel ausführt (d. H. Keine Nebenläufigkeitslösung wie gevent). –

0

1. Was fehlt mir hier? Warum sollte kein Pool zwischen Prozessen geteilt werden?

Im Allgemeinen können OS-Threads unabhängig von der Sprache überhaupt nicht zwischen Prozessen ausgetauscht werden.

Sie können den Zugriff auf den Pool-Manager mit Arbeitsprozessen teilen, aber das ist wahrscheinlich keine gute Lösung für irgendein Problem; siehe unten.

2. Was ist ein Muster für die Implementierung verschachtelter Parallelität in Python? Wenn möglich, rekursive Struktur beibehalten und nicht für Iteration handeln.

Dies hängt sehr von Ihren Daten ab.

In CPython lautet die allgemeine Antwort eine Datenstruktur, die effiziente parallele Operationen implementiert. Ein gutes Beispiel hierfür sind die optimierten Array-Typen NumPy: here ist ein Beispiel für die Verwendung, um einen großen Array-Vorgang auf mehrere Prozessorkerne aufzuteilen.

Die Fibonacci-Funktion, die durch blockierende Rekursion implementiert wird, ist jedoch für jeden Worker-Pool-basierten Ansatz besonders geeignet: fib (N) wird viel Zeit damit verbringen, N Worker zu binden, die nur auf andere Worker warten. Es gibt viele andere Möglichkeiten, sich der Fibonacci-Funktion zu nähern (z. B. CPS, um die Blockierung zu beseitigen und eine konstante Anzahl von Arbeitern zu füllen), aber es ist wahrscheinlich besser, Ihre Strategie auf der Grundlage der tatsächlichen Probleme als Beispiele zu entscheiden so was.