2009-08-02 4 views
1

Hallo zusammen, ich habe ein paar Probleme mit der Fehlersuche in meinem Code. Bitte schauen Sie unter:Multiprocessing Debugging-Fehler

import globalFunc 
from globalFunc import systemPrint 
from globalFunc import out 
from globalFunc import debug 
import math 
import time 
import multiprocessing 

""" 
Somehow this is not working well 
""" 
class urlServerM(multiprocessing.Process): 

    """ 
    This calculates how much links get put into the priority queue 
    so to reach the level that we intend, for every query resultset, 
    we will put the a certain number of links into visitNext first, 
    and even if every resultSet is full, we will be able to achieve the link 
    level that we intended. The rest is pushed into another list where 
    if the first set of lists don't have max for every time, the remaining will 
    be spared on these links 
    """ 
    def getPriorityCounter(self, level, constraint): 
       return int(math.exp((math.log(constraint)/(level - 1)))) 


    def __init__(self, level, constraint, urlQ): 
     """limit is obtained via ngCrawler.getPriorityNum""" 
     multiprocessing.Process.__init__(self) 
     self.constraint = int(constraint) 
     self.limit = self.getPriorityCounter(level, self.constraint) 
     self.visitNext = [] 
     self.visitLater = [] 
     self._count = 0 
     self.urlQ = urlQ 


    """ 
    puts the next into the Queue 
    """ 
    def putNextIntoQ(self): 
     debug('putNextIntoQ', str(self.visitNext) + str(self.visitLater)) 
     if self.visitNext != []: 
      _tmp = self.visitNext[0] 
      self.visitNext.remove(_tmp) 
      self.urlQ.put(_tmp) 

     elif self.visitLater != []: 
      _tmp = self.visitLater[0] 
      self.visitLater.remove(_tmp) 
      self.urlQ.put(_tmp) 


    def run(self): 
     while True: 
      if self.hasNext(): 
       time.sleep(0.5) 
       self.putNextIntoQ() 
       debug('process', 'put something in Q already') 
      else: 
       out('process', 'Nothing in visitNext or visitLater, sleeping') 
       time.sleep(2) 
     return 


    def hasNext(self): 
     debug('hasnext', str(self.visitNext) + str(self.visitLater)) 
     if self.visitNext != []: 
      return True 
     elif self.visitLater != []: 
      return True 
     return False 


    """ 
    This function resets the counter 
    which is used to keep track of how much is already inside the 
    visitNext vs visitLater 
    """ 
    def reset(self): 
     self._count = 0 


    def store(self, linkS): 
     """Stores a link into one of these list""" 
     if self._count < self.limit: 
      self.visitNext.append(linkS) 
      debug('put', 'something is put inside visitNext') 
     else: 
      self.visitLater.append(linkS) 
      debug('put', 'something is put inside visitLater') 
     self._count += 1 



if __name__ == "__main__": 
    # def __init__(self, level, constraint, urlQ): 

    from multiprocessing import Queue 
    q = Queue(3) 
    us = urlServerM(3, 6000, q) 

    us.start() 
    time.sleep(2) 

    # only one thread will do this 
    us.store('http://www.google.com') 
    debug('put', 'put completed') 

    time.sleep(3) 

    print q.get_nowait() 
    time.sleep(3) 

Und dies ist der Ausgang

OUTPUT 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
DEBUG put: something is put inside visitNext 
DEBUG put: put completed 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
DEBUG hasnext: [][] 
[process] Nothing in visitNext or visitLater, sleeping 
Traceback (most recent call last): 
    File "urlServerM.py", line 112, in <module> 
    print q.get_nowait() 
    File "/usr/lib/python2.6/multiprocessing/queues.py", line 122, in get_nowait 
    return self.get(False) 
    File "/usr/lib/python2.6/multiprocessing/queues.py", line 104, in get 
    raise Empty 
Queue.Empty 
DEBUG hasnext: [][] 

Anscheinend finde ich wirklich seltsam. Nun, im Grunde, was dieser Code ist, dass wenn er in main() getestet wird, startet er den Prozess, und dann speichert er http://www.google.com in der Klasse visitNext, und dann möchte ich nur sehen, dass in die Warteschlange geschoben wird.

Aber nach der Ausgabe finde ich es extrem seltsam, dass, obwohl meine Klasse eine URL in der Klasse speichern abgeschlossen hat, hasNext nichts zeigt. Jeder Körper weiß warum? Ist dies der beste Weg, run() in eine kontinuierliche While-Schleife zu schreiben? Ist das wirklich notwendig? Ich versuche im Grunde, mit dem Modul Multiprocessing zu experimentieren, und ich habe einen Pool von Arbeitern (von multiprocessing.Pool), die diese URLs von dieser Klasse erhalten müssen (Single Point of Entry). Verwenden Sie eine Warteschlange am besten? Muss ich dies zu einem "Live" -Prozess machen, da jeder Arbeiter von der Queue aus fragt, und es sei denn, ich habe eine Möglichkeit, meinem urlServer zu signalisieren, etwas in die Queue zu legen, kann ich mir keinen weniger mühsamen Weg vorstellen.

+0

Schau, niemand wird wahrscheinlich so viel Code und Erklärung lesen, um sogar eine Chance zu haben, zu antworten. Kannst du das Problem nicht auf 10 Zeilen Code und 5 Zeilen Erklärung reduzieren ?! – ThomasH

Antwort

0

Sie verwenden Multiprocessing, so dass der Speicher nicht zwischen der Hauptausführung und Ihrem URL-Server geteilt wird.

I.e. Ich denke, das ist effektiv ein Noop: {us.store('http://www.google.com')}, denn wenn es im Haupt-Thread ausgeführt wird, ändert es nur die Haupt-Thread-Darstellung von {us}. Sie können bestätigen, dass sich die URL im Hauptthreadspeicher befindet, indem Sie vor {q.get_nowait()} aufrufen.

Um es funktionieren zu lassen, müssen Sie alle Listen, die Sie freigeben möchten, in Queue-s oder Pipe-s umwandeln. Alternativ können Sie Ihr Modell einfach in {threading} ändern und es sollte ohne Änderungen funktionieren (mehr oder weniger - Sie müssen sich um die Besuchslisten kümmern und Sie bekommen wieder GIL-Probleme).

(Und yeah - Bitte bearbeiten Sie Ihre Fragen nächstes Mal besser. Ich wusste, was Ihr Problem sein könnte, sobald ich "Multiprocessing" sah, aber sonst würde ich überhaupt nicht den Code ansehen ...)