2016-11-02 4 views
0

Ich habe ein paar Beispiele für While True-Worker geschrieben, die Daten aus einer Warteschlange abrufen. Im Prinzip sollten sich die Produzenten wie folgt verhalten, aber in Wirklichkeit sind es Daten von mehreren SSL-Socket-Feeds, daher sind die Produzenten hier für Simulationszwecke.Multiprozessing: Reaktionsverhalten vs CPU-Auslastung bei Verwendung mehrerer während der Warteschlange queue.get() Worker

Brüche von Millisekunden sind in meinem Fall wichtig, und ich kämpfe, wie man bestimmt, welche der folgenden Alternativen (oder eine andere Alternative) die beste ist. Ich habe verstanden, dass das Time Slicing die Reaktionsfähigkeit beeinflussen kann. Wenn ich versuche, den Code unten auszuführen, funktionieren alle Beispiele gut, wenn es um die CPU-Last geht, außer Alternative 5, die 100% meiner CPU verbraucht.

Dies ist meine Schlussfolgerung ist, dass das Blockieren .get() isst weniger CPU% als eine non-blocking .get() gegeben, dass die non-blocking .get() -Schleife ist eng mit sehr kurzen Schlaf festgelegt.

Mein Ziel ist es, Code zu implementieren, der wenig CPU-Zeit verbraucht und auf neue Updates reagiert. h., ich möchte, dass einer der Arbeiter innerhalb von 0,1 Millisekunden ab dem Zeitpunkt arbeitet, an dem ein neuer Gegenstand in der Schlange ankommt.

Beispielcode geschrieben in Python 2.7 für Windows 7 (beachten Sie, dass ich eine 24-Core-Maschine bin mit, so dass die Zahl der Arbeitnehmer, nach unten angepasst haben könnte sein):

import multiprocessing 
import os 
import time 
import Queue 


class MyClass: 
    def __init__(self): 
     self.the_queue = multiprocessing.Queue() 
     self.printerq = multiprocessing.Queue() 

    def producer(self): 
     i=0 
     while True: 
      self.the_queue.put(["hello",i],False) 
      i=i+1 
      time.sleep(1) 

    'alternative 1' 
    ''' 
    def worker_main(self): 
     while True: 
      try: 
       item = self.the_queue.get(timeout=0.1) 
      except Queue.Empty: 
       time.sleep(0.0001) 
      else: 
       self.printerq.put([item,os.getpid()],False) 
    ''' 

    'alternative 2' 
    ''' 
    def worker_main(self): 
     while True: 
      if not self.the_queue.empty(): 
       item = self.the_queue.get() 
       #print os.getpid(), "got", item 
       self.printerq.put([item,os.getpid()],False) 
      time.sleep(0.0001) 
    ''' 

    'alternative 3' 

    def worker_main(self): 
     while True: 
      item = self.the_queue.get() 
      self.printerq.put([item,os.getpid()],False) 

    'alternative 4' 
    ''' 
    def worker_main(self): 
     while True: 
      item = self.the_queue.get() 
      self.printerq.put([item,os.getpid()],False) 
      time.sleep(0.0001) 
    ''' 

    'alternative 5 eats CPU 100%' 
    ''' 
    def worker_main(self): 
     while True: 
      try: 
       item = self.the_queue.get(False) 
      except Queue.Empty: 
       time.sleep(0.0001) 
      else: 
       self.printerq.put([item,os.getpid()],False) 
       time.sleep(0.0001) 
    ''' 

    def printer(self): 
     while True: 
      stuff=self.printerq.get() 
      print stuff 


if __name__=='__main__': 
    mc=MyClass() 

    process_printer = multiprocessing.Process(target=mc.printer, args=()) 
    process_printer.start() 

    for i in range(100): 
     process_window = multiprocessing.Process(target=mc.worker_main, args=()) 
     process_window.start() 
     time.sleep(0.1) 

    for i in range(100): 
     process_producer = multiprocessing.Process(target=mc.producer, args=()) 
     process_producer.start() 
     time.sleep(0.1) 
+0

"Bruchteile von Millisekunden sind in meinem Fall wichtig" - sind Sie sicher, dass Python das richtige Werkzeug für den Job ist? –

+0

Ich hoffe, dass es für den Zweck funktioniert ... – user7104212

+0

Ich hoffe, dass es für den Zweck arbeiten wird ... Ich habe meine Berechnungszeit auf ca. 0,07ms festgelegt. Als ich mit der Programmierung begann, dauerte es ungefähr 6-7 Sekunden für jedes Update. Wenn ich sicherstellen kann, dass die Anwendung ausreichend reaktionsfähig ist und https-Anfragen von den Mitarbeitern innerhalb von 0,1 ms senden kann, würde ich wissen, dass die Reaktionsfähigkeit meiner Systeme bei etwa 0,2 ms liegen würde. – user7104212

Antwort

0

Sie sind höchstwahrscheinlich gehen Holen Sie die beste Leistung, indem Sie alle time.sleep() Anrufe entfernen. Ich würde empfehlen, queue.get() zu verwenden, die ein beschäftigtes warten und Kill-Flags verwenden, um Ihre Prozesse zu beenden. Ihre Arbeiter sollte wie folgt aussehen:

def worker_main(self): 
    while True: 
     item = self.the_queue.get() 
     # Checks for kill flag 
     if item == None: 
      break 
     self.printerq.put([item,os.getpid()],False) 

Diese Methode erfordert, dass Ihr Produzent None in self.the_queue setzen. Da Sie jedoch mehrere Produzenten haben, möchten Sie vielleicht die Arbeiter so modifizieren, dass sie von jedem Produzenten eine Kill-Flagge erhalten. Etwas wie:

def worker_main(self): 
    kill_flags = 0   
    # Make sure to pass in num_producers 
    while kill_flags < num_producers: 
     item = self.the_queue.get() 
     # Checks for kill flag 
     if item == None: 
      kill_flag += 1 
      continue 
     self.printerq.put([item,os.getpid()],False) 

Soweit Ihre Produzenten gehen, ein ähnlicher Ansatz funktionieren würde, wenn sie fertig produzieren:

def producer(self): 
    i=0 
    while (some_condition_to_stop_producing): 
     self.the_queue.put(["hello",i],False) 
     i=i+1 
     time.sleep(1) 
    # Pass in the number of workers 
    for i in range(num_workers): 
     self.the_queue.put(None) 

Vergessen Sie nicht, dass Sie auch den Drucker ändern müssen beenden wenn du fertig bist, Dinge in printerq zu setzen.

+0

Danke Navidad20. Sowohl für die Antwort als auch für die Bearbeitung. Das war die Lösung, auf die ich mich auch einließ. Die eine Sache, auf die ich gespannt bin, ist, wie diese Art von Lösung die Reaktionsfähigkeit in anderen Prozessen/Threads beeinflusst, wie die CPU versteht, dass sie zwischen ihnen wechseln sollte, und die Wartezeit des Systems, bevor ein Wechsel bei Windows 7 stattfindet. Multiprozessing ist ein bisschen Schwierig, wenn es darum geht, die Laufzeit eines Algo ... – user7104212

+0

Haben Sie in Betracht gezogen, jede Ihrer Funktionen zeitlich zu steuern, um zu sehen, welche am längsten dauern? Die Funktion "mulitprocessing.cpu_count" kann Ihnen die Anzahl der ausgeführten logischen Kerne anzeigen und Ihnen so die entsprechende Anzahl von Prozessen zuweisen, dass keine Worker/Producer darauf warten, einen Prozess zu erhalten. – Navidad20

Verwandte Themen