2017-03-05 10 views
1

Ich versuche, den folgenden Code zu teilen, um Multiprozessing in Python zu ermöglichen, und es wird wirklich eine frustrierende Aufgabe für mich - ich bin neu in Multiprocessing und habe die Dokumentation und so viele Beispiele wie möglich gelesen finde aber immer noch keine Lösung, die es auf allen CPU-Kernen gleichzeitig funktionieren lässt.Multiprocessing iterable in Python

Ich möchte die Iterables in Viertel teilen und es den Test parallel berechnen lassen.

Mein einziger Thread Beispiel:

import itertools as it 
import numpy as np 

wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

check = .915 
result = [] 

for count, (A,B) in enumerate(zip(plines1,plines2)): 
    pass 

    test = (sum(B)+10)/(sum(A)+12) 
    if test > check: 
     result = np.append(result,[A,B]) 
print('results: ',result) 

Ich weiß, dies ist ein sehr kleines Beispiel aus einem Paar von 3x3 Matrizen, aber ich mochte es zu einem Paar von Matrizen anzuwenden, die größer sind, und nehmen zu eine Stunde zu berechnen. Ich freue mich über jeden gegebenen Rat.

+0

Nun, zum einen würde ich 'result = np.append (result, [A, B])' aus dem Inneren der Schleife nehmen. Warum verwenden Sie hier überhaupt ein "numpy" Array statt einer "Liste"? Das Anhängen auf diese Weise ist bei einem Array gegenüber einer Liste sehr ineffizient. Das Seltsame ist, dass Sie 'result = []' auch verwenden ... –

+0

Für Skalierbarkeit und Effizienz habe ich mich entschieden, numpy zu verwenden. Wie gesagt, die 3x3-Matrizen sind nur für das Beispiel. Und die for-Schleife ist eine Iteration, sie speichert keine Daten, wenn ich sie nicht irgendwie abrufe. –

+0

Ja, aber 'numpy' macht Ihren Code nicht magisch skalierbarer. Wenn Sie so "numpy" verwenden, haben Sie den gegenteiligen Effekt. –

Antwort

0

Ich würde vorschlagen, Warteschlangen zu verwenden, um Ihre iterables auszugeben. Etwas wie das:

import multiprocessing as mp 
import numpy as np 
import itertools as it 


def worker(in_queue, out_queue): 
    check = 0.915 
    for a in iter(in_queue.get, 'STOP'): 
     A = a[0] 
     B = a[1] 
     test = (sum(B)+10)/(sum(A)+12) 
     if test > check: 
      out_queue.put([A,B]) 
     else: 
      out_queue.put('') 

if __name__ == "__main__": 
    wmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 
    pmod = np.array([[0,1,2],[3,4,5],[6,7,3]]) 

    plines1 = it.product(wmod[0],wmod[1],wmod[2]) 
    plines2 = it.product(pmod[0],pmod[1],pmod[2]) 

    # determine length of your iterator 
    counts = 26 

    # setup iterator 
    it = zip(plines1,plines2) 

    in_queue = mp.Queue() 
    out_queue = mp.Queue() 

    # setup workers 
    numProc = 2 
    process = [mp.Process(target=worker, 
          args=(in_queue, out_queue), daemon=True) for x in range(numProc)] 

    # run processes 
    for p in process: 
     p.start() 

    results = [] 
    control = True 

    # fill queue and get data 
    # code fills the queue until a new element is available in the output 
    # fill blocks if no slot is available in the in_queue 
    for idx in range(counts): 
     while out_queue.empty() and control: 
      # fill the queue 
      try: 
       in_queue.put(next(it), block=True) 
      except StopIteration: 
       # signals for processes stop 
       for p in process: 
        print('stopping') 
        in_queue.put('STOP') 
       control = False 
       break 
     results.append(out_queue.get(timeout=10)) 

    # wait for processes to finish 
    for p in process: 
     p.join() 

    print(results) 

    print('finished') 

Allerdings müssten Sie zuerst feststellen, wie lange Ihre Aufgabenliste sein wird.

+0

Ich versuche, den gesamten Code zu verstehen, bevor ich ihn in mein Projekt implementiere, aber wenn ich versuche, das Beispiel auszuführen, erhalte ich die Fehlermeldung, dass ein Objekt int nicht iterierbar ist. –

+0

Können Sie darauf hinweisen, über welche Zeile er sich beschwert? Vielleicht ist es die "Worker" -Funktion, die das Problem verursacht. Versuchen Sie, einen einfachen Druckbefehl in Ihren Test einzugeben – RaJa

+0

Ich habe meine Antwort mit einem Arbeitsbeispiel oder Ihrer Testfunktion korrigiert. Es läuft ohne Fehler auf meinem Python 3.5. – RaJa