2013-06-21 2 views
12

ich dieses Problem in Python habe:eine Warteschlange Abfüll- und in Multiprozessor-Python-Verwaltung

  • ich eine Warteschlange von URLs, die ich von Zeit zu Zeit wird
  • wenn die Warteschlange gefüllt überprüfen muß bis , ich brauche jedes Element in der Warteschlange durch einen einzigen Prozess (Multiprocessing)

ich bisher verarbeitet werden muß

  • Jedes Element in der Warteschlange zu verarbeiten verwalten diese „von Hand“ wie folgt zu erreichen:

    while 1: 
         self.updateQueue() 
    
         while not self.mainUrlQueue.empty(): 
          domain = self.mainUrlQueue.get() 
    
          # if we didn't launched any process yet, we need to do so 
          if len(self.jobs) < maxprocess: 
           self.startJob(domain) 
           #time.sleep(1) 
          else: 
           # If we already have process started we need to clear the old process in our pool and start new ones 
           jobdone = 0 
    
           # We circle through each of the process, until we find one free ; only then leave the loop 
           while jobdone == 0: 
            for p in self.jobs : 
             #print "entering loop" 
             # if the process finished 
             if not p.is_alive() and jobdone == 0: 
              #print str(p.pid) + " job dead, starting new one" 
              self.jobs.remove(p) 
              self.startJob(domain) 
              jobdone = 1 
    

    Aber das führt zu Tonnen von Problemen und Fehlern. Ich fragte mich, ob ich mit einem Prozesspool nicht besser geeignet wäre. Was wäre der richtige Weg?

    Allerdings ist meine Warteschlange oft leer, und es kann in einer Sekunde mit 300 Elementen gefüllt werden, also bin ich mir nicht sicher, wie ich die Dinge hier machen soll.

  • Antwort

    20

    Sie können die Blockierungsfunktionen von queue verwenden, um mehrere Prozesse beim Start zu spawnen (mit multiprocessing.Pool) und sie schlafen zu lassen, bis einige Daten in der Warteschlange zur Verarbeitung verfügbar sind. Wenn Ihr nicht vertraut mit dem, könnten Sie versuchen, zu „spielen“ mit diesem einfachen Programm:

    import multiprocessing 
    import os 
    import time 
    
    the_queue = multiprocessing.Queue() 
    
    
    def worker_main(queue): 
        print os.getpid(),"working" 
        while True: 
         item = queue.get(True) 
         print os.getpid(), "got", item 
         time.sleep(1) # simulate a "long" operation 
    
    the_pool = multiprocessing.Pool(3, worker_main,(the_queue,)) 
    #       don't forget the coma here^
    
    for i in range(5): 
        the_queue.put("hello") 
        the_queue.put("world") 
    
    
    time.sleep(10) 
    

    Getestet mit Python 2.7.3 auf Linux

    Diese drei Prozesse spawnen (zusätzlich von der Elternprozess). Jedes Kind führt die worker_main-Funktion aus. Es ist eine einfache Schleife, die bei jeder Iteration ein neues Element aus der Warteschlange erhält. Arbeiter werden blockiert, wenn nichts bereit ist zu verarbeiten.

    Beim Start alle 3 Prozess wird schlafen, bis die Warteschlange mit einigen Daten gefüttert wird. Wenn Daten verfügbar sind, erhält einer der wartenden Mitarbeiter diesen Artikel und beginnt ihn zu verarbeiten. Danach versucht es, ein anderes Element aus der Warteschlange zu bekommen, wartet wieder auf, wenn nichts verfügbar ist ...

    +0

    dies funktioniert nicht auf Windows in Python 2.7.4, müssen Sie die if __name__ = '__main__' Teil und Sie sollten die the_queue als dritter Parameter in die multiprocessing.Pool-Funktion übergeben, sonst erhält die worker_main nicht die Daten – jhexp

    +0

    Ich bin auch daran interessiert, wie dieses Stück Code arbeiten. Wenn ich es so laufe, wie es ist, wird es ausgeführt, aber es druckt nichts, wahrscheinlich weil die worker_main die Daten nicht empfängt. Aber wenn ich the_queue als dritten Parameter übergeben habe, habe ich TypeError: worker_main() Argument nach * muss eine Sequenz sein, nicht Queue – ziky90

    +0

    @ ziky90 Sie haben wahrscheinlich das Koma in '(queue,)' vergessen. Ich habe den Code bearbeitet, um einen Kommentar hinzuzufügen, der auf diese mögliche Fehlerquelle hinweist. –

    Verwandte Themen