2017-03-05 5 views
0

Ich versuche, eine JSON-Datei mit multiprocessing zu ändern. Ich wäre in der Lage, den JSON in Chunks aufzuteilen, so dass jeder Prozess nur Zugriff auf einen bestimmten Teil des JSON hat und diesen verändert (es ist also garantiert, dass keine zwei Prozesse dasselbe Attribut modifizieren wollen). Meine Frage ist, wie kann ich das JSON-Objekt zwischen Prozessen freigeben, so dass die Änderungen auf dem ursprünglichen Objekt widergespiegelt werden? Ich weiß, dass multiprocessing das Objekt als Kopie übergibt, also müsste ich eine Manager() verwenden, aber wie genau kann ich das tun? Zur Zeit habe ichPython Multiprocessing - JSON über mehrere Prozesse ändern

def parallelUpdateJSON(datachunk): 
    for feature in datachunk: 
     #modify chunk 

def writeGeoJSON(): 
    with open('geo.geojson') as f: 
     data = json.load(f) 
    pool = Pool() 
    for i in range(0, mp.cpu_count())): 
     #chunk data into a list, so I get listofchunks = [chunk1, chunk2, etc.,] 
     #where chunk1 = data[0:chunksize], chunk2 = data[chunksize:2*chunksize] etc. 
    pool.map(parallelUpdateJSON, listofchunks) 
    pool.close() 
    pool.join() 
    with open('test_parallel.geojson', 'w') as outfile: 
     json.dump(data, outfile) 

Aber natürlich diese übergibt die Brocken als Kopien, so dass die ursprüngliche data Objekt nicht geändert hat bekommen. Wie kann ich es so machen, dass data tatsächlich von den Prozessen geändert wird? Vielen Dank!

+0

Sie benötigen eine Warteschlange zu verwenden, in dem Sie die JSON-Objekte setzen zu ändern, und eine andere Warteschlange, in der die Prozesse wird das Ergebnis – BlackBear

Antwort

-1

Es ist wahrscheinlich eine bessere Idee, synchronen Zugriff auf Ihre Ausgabedatei zu vermeiden. Es wird viel einfacher sein, nur N partielle Ausgaben zu produzieren und sie zu einer Eigenschaft Ihres json-Objekts zusammenzufügen. Dann können Sie dieses Objekt in eine Datei ablegen.

def process(work): 
    return str(work[::-1]) 

if __name__ == "__main__": 
    p = Pool() 
    structure = json.loads(""" 
    { "list": 
     [ 
      "the quick brown fox jumped over the lazy dog", 
      "the quick brown dog jumped over the lazy fox" 
     ] 
    } 
    """) 
    structure["results"] = p.map(process, structure["list"]) 
    #print(json.dumps(structure)) 
    with open("result.json", "w") as f: 
     json.dump(structure, f) 
+0

Dank gesagt, ich auch 'Rückkehr datachunk' in' parallelUpdateJSON' haben würde Recht? –

+0

Auch bekomme ich 'TypeError: 'ApplyResult' Objekt ist nicht iterable dafür. –

+0

Wahrscheinlich, weil die [Dokumentation] (https://docs.python.org/2/library/multiprocessing.html#multiprocessing.pool.multiprocessing.Pool.apply) angibt, dass "func" nur in einem der Worker des. Ausgeführt wird pool ", so dass die Funktion nicht allen Chunks zugeordnet wird, sondern nur ein einzelner Prozess gestartet wird. –

Verwandte Themen