Ich denke, es ist am sinnvollsten, eine separate multiprocessing.pipe
für jeden Abfrageprozess zu haben. Der Worker-Prozess wartet auf ein verfügbares Element in einer beliebigen Pipe, und die Warteschlange löscht und verarbeitet sie und verfolgt, aus welcher Pipe sie stammt. Wenn es Zeit ist, Daten zurückzusenden, führt es die Ergebnisse in die richtige Pipe ein.
Hier ist ein einfaches Beispiel:
#!/usr/bin/env python3
import multiprocessing as mp
def worker(pipes):
quit = [False] * len(pipes)
results = [''] * len(pipes)
# Wait for all workers to send None before quitting
while not all(quit):
ready = mp.connection.wait(pipes)
for pipe in ready:
# Get index of query proc's pipe
i = pipes.index(pipe)
# Receive and "process"
obj = pipe.recv()
if obj is None:
quit[i] = True
continue
result = str(obj)
results[i] += result
# Send back to query proc
pipes[i].send(result)
print(results)
def query(pipe):
for i in 'do some work':
pipe.send(i)
assert pipe.recv() == i
pipe.send(None) # Send sentinel
if __name__ == '__main__':
nquery_procs = 8
work_pipes, query_pipes = zip(*(mp.Pipe() for _ in range(nquery_procs)))
query_procs = [mp.Process(target=query, args=(pipe,)) for pipe in query_pipes]
for p in query_procs:
p.start()
worker(work_pipes)
for p in query_procs:
p.join()
Alternativ können Sie jeder Abfrage Prozess eine ID-Nummer geben (die gerade seine Pfeife Index sein könnte), und jede Anfrage muss ein Tupel sein, die (id_num, data)
ist. Das geht nur um den Worker-Prozess herum, der pipes.index(pipe)
auf jeder Schleife macht, also bin ich mir nicht sicher, wie viel es dich kauft.
Danke! Ich habe tatsächlich einen Threadpool für jeden Prozess, also denke ich, dass ich für jeden Thread eine Pipe machen muss. Gibt es einen erheblichen Mehraufwand, um für jede Anforderung (mehrere hundert/Sek.) Eine neue Pipe() zu erstellen und diese nur in der Anforderungsleitung zu senden? Oder müsste es ein Manager sein(). Queue(), um pickbar zu sein? In diesem Fall, was bedeutet es sogar zu sagen, Multiprozessing.Quee ist prozesssicher ... – Akababa
@Akababa Sie müssten Profil um sicher zu sein, aber ich würde wetten, dass das Erstellen einer neuen Leitung für jede Anfrage teuer ist . Vor allem, wenn jeder Thread mehrere Anfragen stellt, würde ich sie alle am Anfang erstellen. – bnaecker
Schnelle Folge: Ich habe es funktioniert mit einem Pool von gemeinsamen Warteschlangen, knallte einen und legte es in das Arbeitselement. Da das "Beizen" nur einen Zeiger auf den gemeinsamen Speicher beinhaltet, sollte dies effizient sein, oder? – Akababa