0

Ich mache eine Simulation, in der ich die Snapshots des Zustandsvektors speichern möchte, und berechne es für verschiedene Parameter. Ich habe zwei Kontrollparameter, die ich scannen soll (p und a im Beispiel unten). Ich speichere daher die Simulationsergebnisse für eine netCDF4-Datei, in der zwei der Dimensionen für die zwei Steuerparameter sind. Wenn ich die Simulation für einen Parametersatz ausführe, wird die Datei korrekt gespeichert, aber wenn ich versuche, apply_async von multiprocessing auszuführen, ist das netCDF4 am Ende des Prozesses nicht zugänglich.Python Multiprocessing mit parallelen Speichern in netCDF4 Datei

Mein vollständiger Code ist an diesen github repository, aber im Grunde, was ich versuche, dies zu tun ist:

import multiprocessing as mp 
import time as timer 
import netCDF4 
import numpy as np 
def run_sim_for_p_a(p,a,pstep,astep,step,max_time,u0,fname): 
    time_ar=np.arange(0,max_time,step) 
    u = np.ones((len(time_ar),1024)) 
    u[0]=u0 
    print "Calculating for p,a:",p,a 
    for i,t in enumerate(time_ar[1:]): 
     u[i+1] = u[i]*np.cos(t)*np.sin(a)*np.sin(p) 
    for tstep,t in enumerate(time_ar): 
     save_p_a_snapshot(fname,pstep,astep,tstep,p,a,t,u[tstep]) # save the results into the netCDF4 file 

def apply_async_and_save_grid(pmin,pmax,fname, 
           Np=10,Na=10, 
           step=None,max_time=500.0,numproc=10): 
    start = timer.time() 
    setup_p_a_scan(fname) # Setup a netCDF4 file for the simulations 
    if step is None: 
     step=max_time 
    p_range = np.linspace(pmin,pmax,Np) 
    init = np.random.random((1024)) 
    a_range = np.linspace(0,1,Na) 
    availble_cpus = int(available_cpu_count() - 2) 
    numproc=min(numproc,availble_cpus) 
    print "Using",numproc," processors" 
    pool = mp.Pool(processes=numproc) 
    for i,p in enumerate(p_range): 
     for j,a in enumerate(a_range): 
      pool.apply_async(run_sim_for_p_a, 
          args = (p,a,i,j,step,max_time,init,fname)) 
    pool.close() 
    pool.join() 
    print "Took ",timer.time()-start 
if __name__=="__main__": 
    apply_async_and_save_grid(1.0,2.0,"test",Np=2,Na=4,step=1.0,max_time=10) 
+1

Sie haben mehrere Prozesse asynchron, nicht * Kommunikation *, Zugriff auf die gleiche Datei. Könnte diesen Beitrag sehen, [Keynote on Concurrency] (https://www.youtube.com/watch?v=9zinZmE3Ogk) – wwii

+0

Ja, die Prozesse kommunizieren nicht zwischen ihnen, aber speichern Sie die Ergebnisse in der gleichen Datei. – Ohm

Antwort

1

Es gibt mindestens zwei mögliche Ansätze:

Sie könnten jeden Arbeitsprozess Schreib haben Die Ergebnisse werden in einer eigenen netCDF4-Datei gespeichert und das Hauptprogramm wird zusammengeführt, nachdem alle Worker fertig sind.

Ich bin nicht vertraut mit dem netCDF-Format, aber unter der Annahme, dass es möglich ist, an solche Dateien anzuhängen, ist eine andere Möglichkeit, eine multiprocessing.Lock vor dem Start apply_async zu erstellen. Diese Sperre sollte zu den Parametern für den Arbeitsprozess hinzugefügt werden. Der Arbeitsprozess sollte acquire das Schloss, öffnen Sie die Netcdf-Datei, schreiben Sie es und schließen Sie es. Dann sollte es release das Schloss sein. Dadurch wird sichergestellt, dass jeweils nur ein Prozess in die netCDF-Datei geschrieben wird.

bearbeiten: Siehe die Antwort auf this question, wie man ein Lock mit einem Pool zu behandeln.

+0

Hervorragend, ich musste entweder einen Manager oder eine globale Sperre hinzufügen, da ich einen Pool von Arbeitern verwende, also habe ich auf diesen Thread verwiesen - https://stackoverflow.com/questions/25557686/python-sharing-a- Lock-Between-Prozesse – Ohm

+0

@Ohm Excellent finden! Es gibt einige Feinheiten w.r.t. ein 'Pool' und' Lock', die ich nicht realisiert hatte. Ich habe meine Antwort aktualisiert. –