2013-07-18 7 views
5

Ich habe viel gesucht und nicht gefunden, wie ich die Ausgabe von einem laufenden Python-Subprozess in Tornado erhalten. Was ich will ist etwas wie die Travis CI. In einer Admin-Seite starte ich den Job, der Server erhält die Anfrage und startet einen Subprozess. Dieser Subprozess führt ein Data Mining durch und füttert einen String-Puffer mit einigen Protokollen. Ich werde diese Protokolle mit einem Ajax mit settimeout oder einem Websocket bekommen und diese Protokolle auf der Seite ausgeben. Selbst wenn der Benutzer die Seite schließt und später zu ihr zurückkehrt, wird es die Protokolle geben, die normalerweise aktualisiert werden. Nun, Travis ist wirklich sehr ähnlich.Ausgabe von einem Python-Subprozess-Job in Tornado

Antwort

3

Dieser Blog-Eintrag zeigt einen Weg, dies zu tun: Fehler- und Standardausgaben http://stefaanlippens.net/python-asynchronous-subprocess-pipe-reading

Wesentlichen die Post zeigt, wie Deadlock zu verhindern, wenn die Ausgabe eines Prozesses Lesen von asynchronen Lesen. Sie können den Befehl producer von __main__ ersetzen, um den Befehl auszuführen, der Ihnen gefällt, und die print-Anweisungen mit Code, um die Ausgabe in Tornado zu verarbeiten.

Update: Ich habe folgendes enthalten, falls das Blog nach unten wird genommen:

... was ist, wenn Sie durch die Linie der Standardausgabe und Fehlerzeile lesen möchten, zum Beispiel weil Sie überwachen möchten ein längerer Prozess? Auf das Web können Sie viele Lösungen finden, mit unterschiedlichen Graden von Komplexität, Abstraktion und Abhängigkeiten . Eine Lösung (mit begrenztem Code und ohne Abhängigkeiten außerhalb der Standardbibliothek) besteht darin, die Pipes in separaten Threads zu lesen, sodass eine Pipe keine anderen blockieren kann.

Der folgende Code zeigt eine Beispielimplementierung. Das Skript wird so eingerichtet, dass es sowohl für den übergeordneten als auch für den untergeordneten Prozess verwendet wird.

Für den Child-Prozess: Wenn er mit 'produzieren' Argument aufgerufen wird, führt er die produce() -Funktion, die nur einige Zeilen zufällig auf Standardausgabe und Standardfehler rendert. Zwischen den Zeilen gibt es eine Berührung der Verzögerung simulieren einen länger laufenden Prozess. Der übergeordnete Prozess (Skript ohne Argumente aufgerufen), implementiert in der Funktion consume(), ruft das gleiche Skript im "untergeordneten Modus" als Unterprozess auf und überwacht seine Ausgabe Zeile für Zeile, ohne in voraus zu wissen, aus welcher Zeile jede Zeile wird Kommen Sie.

Die AsynchronousFileReader-Klasse ist für die Threads, die die Standardausgabe- und Fehlerrohren asynchron lesen und jede Zeile in eine -Warteschlange stellen. Der Haupt-Thread kann dann den Unterprozess überwachen, indem er die Zeilen überwacht, wie sie in den Warteschlangen eingehen.

import sys 
import subprocess 
import random 
import time 
import threading 
import Queue 

class AsynchronousFileReader(threading.Thread): 
    ''' 
    Helper class to implement asynchronous reading of a file 
    in a separate thread. Pushes read lines on a queue to 
    be consumed in another thread. 
    ''' 

    def __init__(self, fd, queue): 
     assert isinstance(queue, Queue.Queue) 
     assert callable(fd.readline) 
     threading.Thread.__init__(self) 
     self._fd = fd 
     self._queue = queue 

    def run(self): 
     '''The body of the tread: read lines and put them on the queue.''' 
     for line in iter(self._fd.readline, ''): 
      self._queue.put(line) 

    def eof(self): 
     '''Check whether there is no more content to expect.''' 
     return not self.is_alive() and self._queue.empty() 

def consume(command): 
    ''' 
    Example of how to consume standard output and standard error of 
    a subprocess asynchronously without risk on deadlocking. 
    ''' 

    # Launch the command as subprocess. 
    process = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE) 

    # Launch the asynchronous readers of the process' stdout and stderr. 
    stdout_queue = Queue.Queue() 
    stdout_reader = AsynchronousFileReader(process.stdout, stdout_queue) 
    stdout_reader.start() 
    stderr_queue = Queue.Queue() 
    stderr_reader = AsynchronousFileReader(process.stderr, stderr_queue) 
    stderr_reader.start() 

    # Check the queues if we received some output (until there is nothing more to get). 
    while not stdout_reader.eof() or not stderr_reader.eof(): 
     # Show what we received from standard output. 
     while not stdout_queue.empty(): 
      line = stdout_queue.get() 
      print 'Received line on standard output: ' + repr(line) 

     # Show what we received from standard error. 
     while not stderr_queue.empty(): 
      line = stderr_queue.get() 
      print 'Received line on standard error: ' + repr(line) 

     # Sleep a bit before asking the readers again. 
     time.sleep(.1) 

    # Let's be tidy and join the threads we've started. 
    stdout_reader.join() 
    stderr_reader.join() 

    # Close subprocess' file descriptors. 
    process.stdout.close() 
    process.stderr.close() 

def produce(items=10): 
    ''' 
    Dummy function to randomly render a couple of lines 
    on standard output and standard error. 
    ''' 
    for i in range(items): 
     output = random.choice([sys.stdout, sys.stderr]) 
     output.write('Line %d on %s\n' % (i, output)) 
     output.flush() 
     time.sleep(random.uniform(.1, 1)) 

if __name__ == '__main__': 
    # The main flow: 
    # if there is an command line argument 'produce', act as a producer 
    # otherwise be a consumer (which launches a producer as subprocess). 
    if len(sys.argv) == 2 and sys.argv[1] == 'produce': 
     produce(10) 
    else: 
     consume(['python', sys.argv[0], 'produce']) 
+1

Die Seite 404 :( –

+1

zurückkehrt Ich habe den Blog bei zitiert dies wieder passiert. – funkotron