2017-10-14 1 views
1

Ich möchte einige Prozesse einrichten, die eine Eingabe nehmen und verarbeiten und das Ergebnis dieses Ergebnisses ist eine andere Aufgabe, die ich behandelt werden möchte. Im Wesentlichen führt jede Aufgabe zu null oder mehreren neuen Aufgaben (des gleichen Typs), schließlich werden alle Aufgaben keine neuen Aufgaben ergeben.Verarbeitung beenden, wenn alle Prozesse versuchen, aus der Warteschlange zu kommen und die Warteschlange leer ist?

Ich dachte, eine Warteschlange wäre dafür gut, also habe ich eine Eingabewarteschlange und eine Ergebniswarteschlange, um die Aufgaben hinzuzufügen, die nichts Neues ergeben. Zu einem bestimmten Zeitpunkt ist die Warteschlange möglicherweise leer, es kann jedoch mehr hinzugefügt werden, wenn ein anderer Prozess an einer Aufgabe arbeitet.

Daher möchte ich nur, dass es beendet wird, wenn alle Prozesse gleichzeitig versuchen, aus der Eingabewarteschlange zu kommen.

Ich bin völlig neu sowohl für Python Multiprocessing und Multiprozessing im Allgemeinen.

Edited einen grundlegenden Überblick über hinzufügen, was ich meine:

class Consumer(Process): 
    def __init__(self, name): 
     super().__init__(name=name) 

    def run(): 
     # This is where I would have the task try to get a new task off of the 
     # queue and then calculate the results and put them into the queue 
     # After which it would then try to get a new task and repeat 

     # If this an all other processes are trying to get and the queue is 
     # empty That is the only time I know that everything is complete and can 
     # continue 
     pass 

def start_processing(): 
    in_queue = Queue() 
    results_queue = Queue() 
    consumers = [Consumer(str(i)) for i in range(cpu_count())] 

    for i in consumers: 
     i.start() 

    # Wait for the above mentioned conditions to be true before continuing 
+0

An diesem Punkt habe ich nur die absolute Skelett Code weil dies wie etwas scheint, ich brauche aus zu arbeiten, bevor ich vorwärts zu bewegen. Grundsätzlich habe ich die Prozesse und die Warteschlangen erstellt. –

+0

Ich habe einen Übersichtscode hinzugefügt –

Antwort

1

Die JoinableQueue entworfen wurde, um diesen Zweck zu passen. Beitreten einer JoinableQueue wird blockiert, bis Aufgaben in Arbeit sind.

Sie können es wie folgt verwenden: Der Hauptprozess wird eine bestimmte Anzahl von Worker-Prozessen hervorbringen, die ihnen die JoinableQueue zuweisen. Die Worker-Prozesse verwenden die Warteschlange, um neue Aufgaben zu erstellen und zu konsumieren. Der Hauptprozess wartet, indem er die Warteschlange einrichtet, bis keine weiteren Aufgaben ausgeführt werden. Danach werden die Worker-Prozesse beendet und beendet.

Ein sehr vereinfachtes Beispiel (Pseudo-Code):

def consumer(queue): 
    for task in queue.get(): 
     results = process_task(task) 

     if 'more_tasks' in results: 
      for new_task in results['more_tasks']: 
       queue.put(new_task) 

     # signal the queue that a task has been completed 
     queue.task_done() 

def main(): 
    queue = JoinableQueue() 

    processes = start_processes(consumer, queue) 

    for task in initial_tasks: 
     queue.put(task) 

    queue.join() # block until all work is done 

    terminate_processes(processes) 
Verwandte Themen