2017-07-08 1 views
0

den folgenden Python3 Code gegeben, mit Gewinden:Probleme mit Python Threading und Code-im Allgemeinen

class main: 
    def __init__(self): 
     self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+") 
     self.hashlist = queue.Queue() 
     self.filelist = queue.Queue() 
     self.top = '/home/' 
     for y in range(12): 
      self.u = threading.Thread(target=self.md5hash) 
      self.u.daemon = True 
      self.u.start() 
     for x in range(4): 
      self.t = threading.Thread(target=self.threader) 
      self.t.daemon = True 
      self.t.start() 
     main.body(self) 

    def body(self): 
     start = time.time() 
     self.text.write("Time now is " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n") 
     for root, dirs, files in os.walk(self.top): 
      for f in files: 
       path = os.path.join(root,f) 
       self.filelist.put(path) 
     self.t.join() 
     self.u.join() 
     self.text.write("Total time taken  : " + str(time.time() - start) + "\n") 
     print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt") 

    def md5hash(self): 
     while True: 
      entry = self.filelist.get() 
      //hashing// 
      lists = finalhash + ',' + entry 
      self.hashlist.put(lists) 
      self.filelist.task_done() 

    def compare(self, hashed, path): 
     f = open(os.getcwd() + "/database.csv", 'r') 
     for row in f: 
      if row.split(':')[1] == hashed: 
       print("Suspicious File!") 
       print("Suspecfs: " + row.split(':')[2] + "File name : " + path) 

    def threader(self): 
     while True: 
      item = self.hashlist.get() 
      hashes = item.split(',')[0] 
      path = item.split(',')[1] 
      self.compare(hashes, path) 
      self.hashlist.task_done() 

main() 

Problem 1: In def body(self), gibt es die Linie self.text.write("Time now is ..."). Diese Zeile wird nicht in der erstellten Protokolldatei angezeigt.

Problem 2: In def compare(self, hashed, path) existiert eine Zeile, die "Verdächtige Datei" ausgibt. und die file path jedes Mal, wenn es eine Hash-Kollision gibt. Diese Zeile wird immer in der Reihenfolge ausgegeben, in der die 4 Threads t darüber streiten, wer zuerst drucken wird. Dazu denke ich, ich muss wissen, wie Python-Threads print Befehle nacheinander ausführen lassen, anstatt wie sie wollen - wie?

Problem 3: In def body(self) existieren Linien self.u.join() und self.t.join(). Der Befehl join() ist nach meinem besten Wissen ein Befehl, der darauf wartet, dass der Thread beendet wird, bevor er fortfährt. Die Threads enden beide nicht.

Weitere Informationen 1: Ich schreibe Multi-Threading, da ich den Code später in Multi-Processing konvertieren muss.

Zusätzliche Informationen 2: Bitte lassen Sie mich wissen, wenn ich irgendwelche Befehle/Syntax in meinem Code missverstanden habe, während Sie durchschimmern.

+2

Ihr Code scheint falsch formatiert zu sein - 'body',' md5hash', 'compare' und' threader' sind außerhalb von 'main' definiert, dh. als Funktion und nicht als Methode. Ist es ein Kopierfehler oder ist es so, wie Ihr Code wirklich aussieht? –

+0

Entschuldigung, es ist ein Formatierungsfehler –

+1

Zu 1: Sie schließen Ihr Protokoll nicht und spülen es nicht. Dies ist wahrscheinlich der Grund, warum Zeilen, die darauf geschrieben werden, nie in der Datei erscheinen. Versuchen Sie, 'self.text.flush()' nach dem Schreiben hinzuzufügen. –

Antwort

1

Problem 1: Sie schreiben in Ihrem Dateipuffer - nur auf die eigentliche Datei gespült wird, wenn der Puffer voll ist, wird die Datei-Handle geschlossen oder Sie rufen explizit flush() darauf (dh self.text.flush())

Problem 2: Entweder möchten Sie Ihren Code parallel ausführen (und es ist nicht, aber wir werden dazu kommen), aber Sie verlieren die Reihenfolge der Ausführung, oder Sie führen die Reihenfolge seriell aus. Wenn Sie mehrere Threads ausführen möchten, ist es wenig sinnvoll, sie nacheinander ausführen zu lassen, da Sie dann Ihren Code nicht parallel ausführen und alles im Hauptthread ausführen können.

Wenn Sie nur die Ausgabe zu STDOUT steuern möchten, vorausgesetzt, es stört nicht die Ausführung des Threads, können Sie erfassen, was Sie drucken möchten und am Ende unter einem Mutex (also nur einen Thread) ausgedruckt werden schreibt zu der Zeit) oder leitet es sogar zurück zum Hauptthread und lässt es den Zugriff auf STDOUT verwalten. Ein einfaches Mutex Beispiel wäre:

PRINT_MUTEX = threading.Lock() 

def compare(self, hashed, path): # never mind the inefficiency, we'll get to that later 
    out = [] # hold our output buffer 
    with open(os.getcwd() + "/database.csv", 'r') as f: 
     for row in f: 
      row = row.split(':') 
      if row[1] == hashed: 
       out.append("Suspicious File!") 
       out.append("Suspecfs: " + row[2] + "File name : " + path) 
    if out: 
     with self.PRINT_MUTEX: # use a lock to print out the results 
      print("\n".join(out)) 

Dies deshalb, um nicht den Faden Ausführung hält (noch sollten Sie versuchen, mindestens Sie den Zweck ‚parallel‘ Ausführung besiegen wollen), aber zumindest die Fäden ausgeben ihre compare Ergebnisse nacheinander, anstatt ihre Ergebnisse zu verstreuen. Wenn Sie den Hauptthread/das Prozesssteuerelement STDOUT haben möchten, insbesondere, weil Sie dies in einen Multi-Processing-Code umwandeln möchten, überprüfen Sie this answer.

Problem 3: Ihre Threads verlassen nie, weil sie in einer while True Schleife stecken - bis Sie davon losbrechen, laufen die Threads weiter. Ich weiß nicht, was der Grund dafür ist, wie Sie den Code strukturiert haben, aber wenn Sie versuchen, Dateiauflistung (Hauptthread), Lesen, Hashing (md5hash threads) und Vergleiche (threader threads) parallel zu machen, möchten Sie das vermutlich tun Beenden Sie den Hashvorgang, wenn keine weiteren Dateien vorhanden sind, und beenden Sie den Vergleich, wenn keine weiteren Hashes vorhanden sind.Um dies zu tun, können Sie nicht wirklich Queue.task_done() verwenden, da es da ist, um anderen "Listenern" zu signalisieren (wenn sie durch einen Queue.join() Anruf blockiert sind, den Sie nicht haben), dass Sie mit Änderungen der Warteschlange fertig sind.

Sie sollen ein threading.Event Signal dafür verwenden, aber wenn Sie es queue.Queue nur halten möchten, können Sie eine spezielle Eigenschaft erstellen das Ende der Warteschlange zu bezeichnen und sie dann in einer Warteschlange platzieren, wenn es nichts mehr zu verarbeiten, dann Lass deine Threads ihre Schleifen verlassen, wenn sie auf diese spezielle Eigenschaft treffen. Lassen Sie uns zuerst einen großen Überblick in Ihrem Code beheben - Sie speichern überhaupt keinen Verweis auf Ihre Threads, Sie überschreiben ihn mit dem letzten Thread, sodass Sie den Ausführungsfluss nicht wirklich steuern können - statt den letzten Thread-Verweis in einem zu speichern Variable, speichern Sie alle Referenzen in einer Liste. Auch, wenn Sie für alles warten wirst nicht schließen Daemon-Threads verwenden:

def __init__(self): 
    self.text = open(os.getcwd()+"/FileScanLogs.txt", "a+") # consider os.path.join() 
    self.hashlist = queue.Queue() 
    self.filelist = queue.Queue() 
    self.hashers = [] # hold the md5hash thread references 
    self.comparators = [] # hold the threader thread references 
    self.top = '/home/' 
    for _ in range(12): # you might want to consider a ThreadPool instead 
     t = threading.Thread(target=self.md5hash) 
     t.start() 
     self.hashers.append(t) 
    for _ in range(4): 
     t = threading.Thread(target=self.threader) 
     t.start() 
     self.comparators.append(t) 
    main.body(self) 

Jetzt können wir die main.body() Methode so ändern, dass es die oben genannten speziellen Werte an das Ende unserer Warteschlangen fügt hinzu, so dass Arbeitsthreads wissen, wann man aufhören:

QUEUE_CLOSE = object() # a 'special' object to denote end-of-data in our queues 

def body(self): 
    start = time.time() 
    self.text.write("Time: " + time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + "\n") 
    for root, dirs, files in os.walk(self.top): 
     for f in files: 
      path = os.path.join(root, f) 
      self.filelist.put(path) 
    self.filelist.put(self.QUEUE_CLOSE) # no more files, signal the end of the filelist 
    for t in self.hashers: # let's first wait for our hashing threads to exit 
     t.join() 
    # since we're not going to be receiving any new hashes, we can... 
    self.hashlist.put(self.QUEUE_CLOSE) # ... signal the end of the hashlist as well 
    for t in self.comparators: # let's wait for our comparator threads to exit 
     t.join() 
    self.text.write("Total: " + str(time.time() - start) + "\n") 
    self.text.close() # close the log file (this will also flush the previous content) 
    print("Log file is created as " + os.getcwd() + "/FileScanLogs.txt") 

Und deshalb brauchen wir die worker-Threads zu ändern, um zu beenden, wenn sie das Ende der Warteschlange auftreten:

def md5hash(self): 
    while self.filelist: 
     entry = self.filelist.get() 
     if entry is self.QUEUE_CLOSE: # end of queue encountered 
      self.filelist.put(self.QUEUE_CLOSE) # put it back for the other threads 
      break # break away the processing 
     finalhash = whatever_is_your_hash_code(entry) 
     lists = finalhash + ',' + entry 
     self.hashlist.put(lists) 

def threader(self): 
    while True: 
     item = self.hashlist.get() 
     if item is self.QUEUE_CLOSE: # end of queue encountered 
      self.hashlist.put(self.QUEUE_CLOSE) # put it back for the other threads 
      break # break away the queue 
     hashes = item.split(',')[0] 
     path = item.split(',')[1] 
     self.compare(hashes, path) 

Jetzt, wenn Sie es ausführen, vorausgesetzt, Ihr nicht aufgelisteter Hashing-Teil funktioniert ordnungsgemäß, sollte schließlich alles beendet werden.

Neben dem komplizierten Aufbau, Sie eine Sache, sollte auf jeden Fall tun, ist die main.compare() Methode optimieren weg - da die CSV-Datei während der Ausführung ändert sich nicht (und wenn ja, sollten Sie es sich im Speicher befindlichen Griff) Laden der gesamten CSV und Durchschleifen für jeden Hash der Dateien, die Sie vergleichen möchten, ist lächerlich. Laden Sie die gesamte CSV-Datei als hash<=>whateverdict, und führen Sie dann stattdessen Vergleiche an Ort und Stelle durch (d. H. if hashed in your_map).

Und zuletzt, wie ich oben erwähnt habe, Zeit, hmm, Regen auf Ihrer Parade - all das war umsonst! Aufgrund der gefürchteten GIL wird keiner Ihrer Threads hier parallel ausgeführt (eigentlich funktioniert nur das Laden von Dateien in gewissem Umfang, aber jeder Vorteil wird wahrscheinlich durch die Zeit vereitelt, die zum Hashing der Daten benötigt wird). Sie laufen zwar als separate, ehrliche System-Threads, aber die GIL stellt sicher, dass nur einer dieser Threads gleichzeitig ausgeführt wird. Daher ist dieser Code sehr wahrscheinlich langsamer, als wenn Sie alles in einem Thread ausführen würden. Dies wird Ihnen im Prozess des Multiprocessing nicht viel helfen, entweder weil Sie keinen lokalen Instanzstatus teilen können (naja, Sie können, überprüfen Sie this answer, aber es ist nur ein wichtiger PITA und die meiste Zeit nicht wert, den Ärger zu durchlaufen).