Ich schreibe einen Watchdog für Prozesse in einer Testsuite. Ich muss feststellen, ob ein Test hängt.Ermittlung der Zeit seit der letzten Ausgabe des Prozesses - mit subprocess.Popen
Ich könnte einfach den Prozess mit subprocess.Popen(...)
starten, und Popen.wait(timeout=to)
oder Popen.poll()
verwenden und meinen eigenen Timer behalten. Die Tests unterscheiden sich jedoch stark in der Ausführungszeit, was es unmöglich macht, einen guten "Timeout" -Wert zu haben, der für alle Tests sinnvoll ist.
Ich habe festgestellt, dass ein guter Weg, um zu bestimmen, ob ein Test hängen geblieben ist, eine "Zeitüberschreitung" für das letzte Mal, dass der Prozess nichts ausgegeben hat. Zu diesem Zweck hielt ich
process = subprocess.Popen(args='<program>', stdout=subprocess.PIPE, stderr=subprocess.STDOUT, ...)
und Popen.communicate()
verwendet wird, zu bestimmen, wann stdout
und/oder stderr
nicht None
sind. Das Problem ist, dass Popen.communicate()
, ohne ein 'Timeout' wird nur warten, bis der Prozess beendet wird, und mit einem 'Timeout' wird eine TimeoutExpired
Ausnahme auslösen, aus dem ich nicht feststellen kann, ob etwas gelesen wurde. TimeoutExpired.output
ist leer, BTW.
Ich konnte nichts in der Dokumentation finden, die es ermöglicht, die "Lesevorgänge" manuell durchzuführen. Außerdem gibt es normalerweise eine Menge von Ausgaben aus dem Prozess, so dass es von Vorteil wäre, mit stdout=<open_file_descriptor>
zu beginnen, da ich keine Bedenken für überlaufende Pipe-Buffer haben würde.
Update/Lösung:
Popen.stdout
und Popen.stderr
gibt ein "lesbares Stream-Objekt", die man manuell verwenden können abfragen/auswählen und lesen. Ich landete mit select 'Polling Objects', die den poll()
Systemaufruf verwenden, wie unten:
import os
import select
import subprocess
p = subprocess.Popen(args="<program>", shell=True, universal_newlines=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
poll_obj = select.poll()
poll_obj.register(p.stdout, select.POLLIN)
poll_obj.register(p.stderr, select.POLLIN)
while p.poll() is None:
events = True
while events:
events = poll_obj.poll(10)
for fd, event in events:
if event & select.POLLIN:
print("STDOUT: " if fd == p.stdout.fileno() else "STDERR: ")
print(os.read(fd, 1024).decode())
# else some other error (see 'Polling Objects')
Windows oder Linux? – rrauenza
@rrauenza Linux – Ramon