2015-02-06 4 views
8

Ich habe eine Funktion, die einige Simulationen durchführt und gibt ein Array im String-Format.Python Multiprocessing - verfolgen den Prozess der pool.map Operation

Ich möchte die Simulation (die Funktion) für variierende Eingangsparameterwerte, über 10000 mögliche Eingabewerte, ausführen und die Ergebnisse in eine einzige Datei schreiben.

Ich verwende Multiprocessing, speziell pool.map Funktion , um die Simulationen parallel zu laufen.

Da der gesamte Prozess der Ausführung der Simulationsfunktion über 10000 mal eine sehr lange Zeit dauert, möchte ich wirklich den Prozess der gesamten Operation verfolgen.

Ich denke, das Problem in meinem aktuellen Code unten ist, dass pool.map die Funktion 10000 mal ohne Prozessverfolgung während dieser Vorgänge ausführt. Sobald die parallele Verarbeitung 10000 Simulationen (das können Stunden bis Tage sein) abgeschlossen ist, werde ich verfolgt, wenn 10000 Simulationsergebnisse in einer Datei gespeichert werden. Dies verfolgt nicht wirklich die Verarbeitung von pool.map.

Gibt es eine einfache Lösung für meinen Code, die Prozessverfolgung ermöglicht?

def simFunction(input): 
    # Does some simulation and outputs simResult 
    return str(simResult) 

# Parallel processing 

inputs = np.arange(0,10000,1) 

if __name__ == "__main__": 
    numCores = multiprocessing.cpu_count() 
    pool = multiprocessing.Pool(processes = numCores) 
    t = pool.map(simFunction, inputs) 
    with open('results.txt','w') as out: 
     print("Starting to simulate " + str(len(inputs)) + " input values...") 
     counter = 0 
     for i in t: 
      out.write(i + '\n') 
      counter = counter + 1 
      if counter%100==0: 
       print(str(counter) + " of " + str(len(inputs)) + " input values simulated") 
    print('Finished!!!!') 

Antwort

7

Wenn Sie eine iterierte map-Funktion verwenden, ist es ziemlich einfach, den Fortschritt zu verfolgen.

>>> from pathos.multiprocessing import ProcessingPool as Pool 
>>> def simFunction(x,y): 
... import time 
... time.sleep(2) 
... return x**2 + y 
... 
>>> x,y = range(100),range(-100,100,2) 
>>> res = Pool().imap(simFunction, x,y) 
>>> with open('results.txt', 'w') as out: 
... for i in x: 
...  out.write("%s\n" % res.next()) 
...  if i%10 is 0: 
...  print "%s of %s simulated" % (i, len(x)) 
... 
0 of 100 simulated 
10 of 100 simulated 
20 of 100 simulated 
30 of 100 simulated 
40 of 100 simulated 
50 of 100 simulated 
60 of 100 simulated 
70 of 100 simulated 
80 of 100 simulated 
90 of 100 simulated 

Oder Sie können eine asynchrone map verwenden. Hier werde ich die Dinge etwas anders machen, nur um es zu vermischen.

>>> import time 
>>> res = Pool().amap(simFunction, x,y) 
>>> while not res.ready(): 
... print "waiting..." 
... time.sleep(5) 
... 
waiting... 
waiting... 
waiting... 
waiting... 
>>> res.get() 
[-100, -97, -92, -85, -76, -65, -52, -37, -20, -1, 20, 43, 68, 95, 124, 155, 188, 223, 260, 299, 340, 383, 428, 475, 524, 575, 628, 683, 740, 799, 860, 923, 988, 1055, 1124, 1195, 1268, 1343, 1420, 1499, 1580, 1663, 1748, 1835, 1924, 2015, 2108, 2203, 2300, 2399, 2500, 2603, 2708, 2815, 2924, 3035, 3148, 3263, 3380, 3499, 3620, 3743, 3868, 3995, 4124, 4255, 4388, 4523, 4660, 4799, 4940, 5083, 5228, 5375, 5524, 5675, 5828, 5983, 6140, 6299, 6460, 6623, 6788, 6955, 7124, 7295, 7468, 7643, 7820, 7999, 8180, 8363, 8548, 8735, 8924, 9115, 9308, 9503, 9700, 9899] 

Bitte beachte, dass ich pathos.multiprocessing statt multiprocessing verwenden. Es ist nur eine Verzweigung von multiprocessing, die Sie map Funktionen mit mehreren Eingängen tun können, hat viel bessere Serialisierung und ermöglicht es Ihnen, map Anrufe überall auszuführen (nicht nur in __main__). Sie könnten multiprocessing verwenden, um das oben genannte auch zu tun, jedoch würde der Code sehr ein bisschen anders sein.

Entweder eine iterierte oder asynchrone map ermöglicht es Ihnen, den Code zu schreiben, den Sie für eine bessere Prozessverfolgung verwenden möchten.Übergeben Sie zum Beispiel eine eindeutige "ID" an jeden Job und sehen Sie sich an, welche zurückkommen oder ob jeder Job seine Prozess-ID zurückgibt. Es gibt viele Möglichkeiten, um Fortschritte und Prozesse zu verfolgen ... aber das obige sollte Ihnen einen Anfang geben.

können Sie pathos hier: https://github.com/uqfoundation

+0

vielen dank! – user32147

3

Es gibt keine "einfache Lösung". map geht es darum, Implementierungsdetails von Ihnen zu verbergen. Und in diesem Fall wünschen Sie Details. Das heißt, die Dinge werden definitionsgemäß etwas komplexer. Sie müssen das Kommunikationsparadigma ändern. Es gibt viele Möglichkeiten, dies zu tun.

Eins ist: Erstellen Sie eine Warteschlange zum Sammeln Ihrer Ergebnisse und lassen Sie Ihre Mitarbeiter Ergebnisse in diese Warteschlange einreihen. Sie können dann innerhalb eines Überwachungsthreads oder -prozesses die Warteschlange anzeigen und die Ergebnisse konsumieren, während sie eingehen. Während des Verbrauchs können Sie sie analysieren und eine Protokollausgabe generieren. Dies könnte der allgemeinste Weg sein, den Fortschritt zu verfolgen: Sie können auf eingehende Ergebnisse in jeder Art und Weise reagieren, in Echtzeit.

Ein einfacherer Weg könnte darin bestehen, Ihre Worker-Funktion geringfügig zu modifizieren und dort Log-Ausgaben zu generieren. Durch sorgfältige Analyse der Log-Ausgabe mit externen Tools (wie grep und wc), können Sie mit sehr einfachen Mitteln den Überblick behalten.

+1

danke. Könnten Sie bitte ein einfaches Beispiel geben? – user32147

3

Ich denke, was Sie brauchen, ist ein Protokolldatei.

Ich würde Ihnen empfehlen, das Logging Modul zu verwenden, das Teil der Python-Standardbibliothek ist. Aber leider Protokollierung ist nicht Multiprocessing-sicher. Sie können es also nicht direkt in Ihrer App verwenden.

Sie müssen also einen Multiprocessing-sicheren Log-Handler verwenden oder Ihren mit einer Queue implementieren oder zusammen mit dem Logging-Modul sperren.

Es gibt eine Menge Diskussion darüber in Stackoverflow. Diese zum Beispiel: How should I log while using multiprocessing in Python?

Wenn die meisten der CPU-Last in der Simulationsfunktion ist, und Sie werden nicht Logfolge verwenden, können Sie wahrscheinlich einen einfachen Verriegelungsmechanismus wie folgt verwenden:

import multiprocessing 
import logging 

from random import random 
import time 


logging.basicConfig(
    level=logging.DEBUG, 
    format='%(asctime)s %(process)s %(levelname)s %(message)s', 
    filename='results.log', 
    filemode='a' 
) 


def simulation(a): 
    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Simulating with %s" % a) 

    # simulation 
    time.sleep(random()) 
    result = a*2 

    # logging 
    with multiprocessing.Lock(): 
     logging.debug("Finished simulation with %s. Result is %s" % (a, result)) 

    return result 

if __name__ == '__main__': 

    logging.debug("Starting the simulation") 
    inputs = [x for x in xrange(100)] 
    num_cores = multiprocessing.cpu_count() 
    print "num_cores: %d" % num_cores 
    pool = multiprocessing.Pool(processes=num_cores) 
    t = pool.map(simulation, inputs) 
    logging.debug("The simulation has ended") 

Sie können "tail -f" Ihre Log-Datei beim Ausführen. Dies ist, was Sie sehen sollten:

2015-02-08 18:10:00,616 3957 DEBUG Starting the simulation 
2015-02-08 18:10:00,819 3961 DEBUG Simulating with 12 
2015-02-08 18:10:00,861 3965 DEBUG Simulating with 28 
2015-02-08 18:10:00,843 3960 DEBUG Simulating with 20 
2015-02-08 18:10:00,832 3959 DEBUG Simulating with 16 
2015-02-08 18:10:00,812 3958 DEBUG Simulating with 8 
2015-02-08 18:10:00,798 3963 DEBUG Simulating with 4 
2015-02-08 18:10:00,855 3964 DEBUG Simulating with 24 
2015-02-08 18:10:00,781 3962 DEBUG Simulating with 0 
2015-02-08 18:10:00,981 3961 DEBUG Finished simulation with 12. Result is 24 
2015-02-08 18:10:00,981 3961 DEBUG Simulating with 13 
2015-02-08 18:10:00,991 3958 DEBUG Finished simulation with 8. Result is 16 
2015-02-08 18:10:00,991 3958 DEBUG Simulating with 9 
2015-02-08 18:10:01,130 3964 DEBUG Finished simulation with 24. Result is 48 
2015-02-08 18:10:01,131 3964 DEBUG Simulating with 25 
2015-02-08 18:10:01,134 3964 DEBUG Finished simulation with 25. Result is 50 
2015-02-08 18:10:01,134 3964 DEBUG Simulating with 26 
2015-02-08 18:10:01,315 3961 DEBUG Finished simulation with 13. Result is 26 
2015-02-08 18:10:01,316 3961 DEBUG Simulating with 14 
2015-02-08 18:10:01,391 3961 DEBUG Finished simulation with 14. Result is 28 
2015-02-08 18:10:01,391 3961 DEBUG Simulating with 15 
2015-02-08 18:10:01,392 3963 DEBUG Finished simulation with 4. Result is 8 
2015-02-08 18:10:01,393 3963 DEBUG Simulating with 5 

Versucht auf Windows und Linux.

Hoffe, das hilft

+0

'multiprocessing.get_logger()' gibt einen funktionsbeschränkten Logger zurück, der durch Sperren geschützt ist, siehe https://docs.python.org/2/library/multiprocessing.html#logging –

+0

Ja, aber dies ist der Modul-Logger ... so altought können Sie es verwenden, wird Ihr Protokoll mit Modulebene Nachrichten gemischt: Versuchen Sie es und Sie werden Nachrichten wie folgt angezeigt: 2015-02-08 23: 47: 10,954 9288 DEBUG erstellt Semlock mit Handle 448 –

+0

Oh, du bist Richtig, ich habe es nie wirklich benutzt und die Dokumente zu schnell durchforstet. –

Verwandte Themen