Ich habe ein Skript erstellt, das standardmäßig einen Multiprozess-Prozess erstellt; dann funktioniert es gut. Wenn mehrere Prozesse gestartet werden, beginnt es zu hängen und nicht immer am selben Ort. Das Programm hat ungefähr 700 Zeilen Code, also werde ich versuchen, zusammenzufassen, was vor sich geht. Ich möchte das Beste aus meinen Multi-Cores machen, indem ich die langsamste Aufgabe, die DNA-Sequenzen ausrichtet, parallelisiere. Dazu benutze ich das Subprozess-Modul, um ein Kommandozeilen-Programm aufzurufen: 'hmmsearch', das ich Sequenzen über/dev/stdin einspeisen kann, und dann lese ich die ausgerichteten Sequenzen durch/dev/stdout aus. Ich stelle mir vor, dass der Hang wegen dieser mehreren Subprozess-Instanzen auftritt, die von stdout/stdin lesen/schreiben, und ich kenne wirklich nicht den besten Weg, um darüber zu gehen ... Ich habe in os.fdopen gesucht (...) & os.tmpfile(), um temporäre Dateihandles oder Pipes zu erstellen, wo ich die Daten durchspülen kann. Allerdings habe ich noch nie zuvor & verwendet Ich kann mir nicht vorstellen, wie man das mit dem Unterprozessmodul macht. Idealerweise möchte ich die Festplatte komplett umgehen, da Pipes mit der Hochdurchsatz-Datenverarbeitung viel besser sind! Jede Hilfe mit diesem wäre super wunderbar !!Python Multiprocessing jeweils mit eigenen Subprozess (Kubuntu, Mac)
import multiprocessing, subprocess
from Bio import SeqIO
class align_seq(multiprocessing.Process):
def __init__(self, inPipe, outPipe, semaphore, options):
multiprocessing.Process.__init__(self)
self.in_pipe = inPipe ## Sequences in
self.out_pipe = outPipe ## Alignment out
self.options = options.copy() ## Modifiable sub-environment
self.sem = semaphore
def run(self):
inp = self.in_pipe.recv()
while inp != 'STOP':
seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time.
# HMM is a file location.
align_process = subprocess.Popen(['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
self.sem.acquire()
align_process.stdin.write(seq_record.format('fasta'))
align_process.stdin.close()
for seq in SeqIO.parse(align_process.stdout, 'stockholm'): # get the alignment output
self.out_pipe.send_bytes(seq.seq.tostring()) # send it to consumer
align_process.wait() # Don't know if there's any need for this??
self.sem.release()
align_process.stdout.close()
inp = self.in_pipe.recv()
self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles.
self.out_pipe.close()
eine während des Debuggens dafür ausgegeben haben, habe ich ein Problem gefunden, die immer da war und ist noch nicht ganz gelöst, aber haben einige andere Ineffizienzen im Prozess (Debugging) fixiert. Es gibt zwei anfängliche Feeder-Funktionen, diese align_seq-Klasse und einen Dateiparser parseHMM(), der eine positionsspezifische Bewertungsmatrix (PSM) in ein Wörterbuch lädt. Der Hauptmutterprozess vergleicht dann die Ausrichtung mit dem PSM, wobei ein Wörterbuch (von Wörterbüchern) als Zeiger auf die relevante Punktzahl für jeden Rest verwendet wird. Um die gewünschten Scores zu berechnen, habe ich zwei separate Multiprocessing.Process-Klassen, eine Klasse logScore(), die das Log Odds Ratio berechnet (mit math.exp()); Ich parallelisiere diesen; und es verbindet die berechneten Ergebnisse mit dem letzten Prozess, sumScore(), der nur diese Bewertungen (mit math.fsum) summiert, wobei die Summe und alle positionsspezifischen Bewertungen als Wörterbuch an den übergeordneten Prozess zurückgegeben werden. dh Queue.put ([Summe, {Rückstand Position: Position spezifische Punktzahl, ...}]) Ich finde dies außergewöhnlich verwirrend, um meinen Kopf herum (zu viele Warteschlangen!), So hoffe ich, dass die Leser es schaffen folgen ... Nachdem alle oben genannten Berechnungen durchgeführt wurden, gebe ich dann die Option, die kumulativen Bewertungen als tabulatorgetrennte Ausgabe zu speichern. Dies ist, wo es jetzt (seit letzter Nacht) manchmal bricht, da ich sicherstelle, dass es eine Punktzahl für jede Position ausdruckt, wo es eine Punktzahl geben sollte. Ich denke, dass aufgrund der Latenz (Computer Timings nicht synchron sein), manchmal, was zuerst in die Warteschlange für logScore gesetzt wird sumScore ersten nicht erreicht. Damit sumScore weiß, wann die Tally zurückgegeben und neu gestartet werden soll, lege ich 'endSEQ' in die Warteschlange für den letzten logScore-Prozess, der eine Berechnung durchgeführt hat. Ich dachte, dass es dann auch summieren sollte, aber das ist nicht immer so. nur manchmal bricht es. So bekomme ich jetzt keinen Deadlock mehr, sondern einen KeyError beim Drucken oder Speichern der Ergebnisse. Ich glaube, der Grund dafür, KeyError manchmal zu bekommen, liegt darin, dass ich für jeden logScore-Prozess eine Queue erstelle, die aber alle dieselbe Queue verwenden soll. Jetzt, wo ich so etwas wie: -
class logScore(multiprocessing.Process):
def __init__(self, inQ, outQ):
self.inQ = inQ
...
def scoreSequence(processes, HMMPSM, sequenceInPipe):
process_index = -1
sequence = sequenceInPipe.recv_bytes()
for residue in sequence:
.... ## Get the residue score.
process_index += 1
processes[process_index].inQ.put(residue_score)
## End of sequence
processes[process_index].inQ.put('endSEQ')
logScore_to_sumScoreQ = multiprocessing.Queue()
logScoreProcesses = [ logScore(multiprocessing.Queue() , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ]
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut)
während ich nur eine Queue erstellen sollte zwischen allen logScore Instanzen zu teilen. d. h.
logScore_to_sumScoreQ = multiprocessing.Queue()
scoreSeq_to_logScore = multiprocessing.Queue()
logScoreProcesses = [ logScore(scoreSeq_to_logScore , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ]
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut)
Danke für die Antwort! Also, obwohl ich mehrfach auf/dev/stdin &/dev/stdout referenziere, bekommt jeder Prozess seine eigene Instanz davon, und es gibt keine Chance, dass sie sich gegenseitig verwirren ?? Süss! Betrachtet man weiter die Fehler, nachdem das Programm (^ C) gelöscht wurde, so scheint es, als ob die Dead-Lock-Operation mit einem Multiprocessing.Queue auftritt. Ich habe eine Reihe von ihnen, die ich als eine Kaskade oder einen Knock-On-Effekt durch Senden von "endSEQ" und schließlich "STOP" vorstellen. Es dauerte lange genug, um letzten Monat mit einem Prozess zu debuggen. Versuchen, meinen Kopf zurück um es jetzt zu bekommen, :( Und ich glaube nicht, dass der Semaphor notwendig ist. Gegangen :) –
np! Die Dateien/dev/stdin &/dev/stdout werden den speziellen stdin & stdout-Dateideskriptoren zugeordnet, sodass sie kein zentrales Gerät darstellen. --- Ich sehe die Verwendung der Warteschlange im align_seq Kindprozess nicht, wird sie nur im Hauptprozess verwendet? Wenn ja, dann kannst du einfach queue.Queue verwenden. Sie sagen auch, Sie vermuten einen Deadlock - Sie konsumieren nicht bidirektional, sind Sie (main & child/ren)? –
Nochmals vielen Dank für die Klärung von stdin/stdout. Jetzt denke ich darüber nach, irgendwie offensichtlich in der Art, wie ich sie benutze, aber ich habe es nie zuvor explizit erwähnt. Du hast absolut recht; Die Warteschlangen befinden sich in einem separaten Teil des Programms, in dem meine Deadlocks zu erscheinen scheinen. Ich werde den Hauptpost aktualisieren, weil es viel mehr Multiprocessing zu dem Programm gibt, das ich ursprünglich nicht angegeben habe ... Ich werde jedoch zuerst auf die unten stehenden Posts antworten. Ich habe es jetzt so, dass es manchmal endet, und manchmal bricht es kurz vor dem Ende, was ich aufgrund der Latenz (??) vermute. time.sleep ist meine temporäre, dreckige Lösung –