2017-12-19 1 views
0

Dies ist mit meinem earlier problem, die ich noch an der Lösung arbeiten. Im Wesentlichen brauche ich das inverse Design von ProcessPoolExecutor, wo ich viele Abfrageprozesse und einen Worker habe, der Ergebnisse in Stapeln berechnet und zurücksendet.Inverse von ProcessPoolExecutor in Python

Das Senden der Arbeitsaufgaben ist mit einer gemeinsam genutzten Warteschlange einfach, aber ich habe immer noch keine gute Lösung, um alle Ergebnisse an die richtigen Threads der richtigen Prozesse zu senden.

Antwort

1

Ich denke, es ist am sinnvollsten, eine separate multiprocessing.pipe für jeden Abfrageprozess zu haben. Der Worker-Prozess wartet auf ein verfügbares Element in einer beliebigen Pipe, und die Warteschlange löscht und verarbeitet sie und verfolgt, aus welcher Pipe sie stammt. Wenn es Zeit ist, Daten zurückzusenden, führt es die Ergebnisse in die richtige Pipe ein.

Hier ist ein einfaches Beispiel:

#!/usr/bin/env python3 

import multiprocessing as mp 

def worker(pipes): 
    quit = [False] * len(pipes) 
    results = [''] * len(pipes) 

    # Wait for all workers to send None before quitting 
    while not all(quit): 
     ready = mp.connection.wait(pipes) 
     for pipe in ready: 

      # Get index of query proc's pipe 
      i = pipes.index(pipe) 

      # Receive and "process" 
      obj = pipe.recv() 
      if obj is None: 
       quit[i] = True 
       continue 
      result = str(obj) 
      results[i] += result 

      # Send back to query proc 
      pipes[i].send(result) 
    print(results) 


def query(pipe): 
    for i in 'do some work': 
     pipe.send(i) 
     assert pipe.recv() == i 
    pipe.send(None) # Send sentinel 

if __name__ == '__main__': 
    nquery_procs = 8 
    work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs))) 

    query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes] 
    for p in query_procs: 
     p.start() 
    worker(work_pipes) 
    for p in query_procs: 
     p.join() 

Alternativ können Sie jeder Abfrage Prozess eine ID-Nummer geben (die gerade seine Pfeife Index sein könnte), und jede Anfrage muss ein Tupel sein, die (id_num, data) ist. Das geht nur um den Worker-Prozess herum, der pipes.index(pipe) auf jeder Schleife macht, also bin ich mir nicht sicher, wie viel es dich kauft.

+0

Danke! Ich habe tatsächlich einen Threadpool für jeden Prozess, also denke ich, dass ich für jeden Thread eine Pipe machen muss. Gibt es einen erheblichen Mehraufwand, um für jede Anforderung (mehrere hundert/Sek.) Eine neue Pipe() zu erstellen und diese nur in der Anforderungsleitung zu senden? Oder müsste es ein Manager sein(). Queue(), um pickbar zu sein? In diesem Fall, was bedeutet es sogar zu sagen, Multiprozessing.Quee ist prozesssicher ... – Akababa

+1

@Akababa Sie müssten Profil um sicher zu sein, aber ich würde wetten, dass das Erstellen einer neuen Leitung für jede Anfrage teuer ist . Vor allem, wenn jeder Thread mehrere Anfragen stellt, würde ich sie alle am Anfang erstellen. – bnaecker

+0

Schnelle Folge: Ich habe es funktioniert mit einem Pool von gemeinsamen Warteschlangen, knallte einen und legte es in das Arbeitselement. Da das "Beizen" nur einen Zeiger auf den gemeinsamen Speicher beinhaltet, sollte dies effizient sein, oder? – Akababa