2011-01-06 5 views
0

Ich habe ein Skript erstellt, das standardmäßig einen Multiprozess-Prozess erstellt; dann funktioniert es gut. Wenn mehrere Prozesse gestartet werden, beginnt es zu hängen und nicht immer am selben Ort. Das Programm hat ungefähr 700 Zeilen Code, also werde ich versuchen, zusammenzufassen, was vor sich geht. Ich möchte das Beste aus meinen Multi-Cores machen, indem ich die langsamste Aufgabe, die DNA-Sequenzen ausrichtet, parallelisiere. Dazu benutze ich das Subprozess-Modul, um ein Kommandozeilen-Programm aufzurufen: 'hmmsearch', das ich Sequenzen über/dev/stdin einspeisen kann, und dann lese ich die ausgerichteten Sequenzen durch/dev/stdout aus. Ich stelle mir vor, dass der Hang wegen dieser mehreren Subprozess-Instanzen auftritt, die von stdout/stdin lesen/schreiben, und ich kenne wirklich nicht den besten Weg, um darüber zu gehen ... Ich habe in os.fdopen gesucht (...) & os.tmpfile(), um temporäre Dateihandles oder Pipes zu erstellen, wo ich die Daten durchspülen kann. Allerdings habe ich noch nie zuvor & verwendet Ich kann mir nicht vorstellen, wie man das mit dem Unterprozessmodul macht. Idealerweise möchte ich die Festplatte komplett umgehen, da Pipes mit der Hochdurchsatz-Datenverarbeitung viel besser sind! Jede Hilfe mit diesem wäre super wunderbar !!Python Multiprocessing jeweils mit eigenen Subprozess (Kubuntu, Mac)

import multiprocessing, subprocess 
from Bio import SeqIO 

class align_seq(multiprocessing.Process): 
    def __init__(self, inPipe, outPipe, semaphore, options): 
     multiprocessing.Process.__init__(self) 
     self.in_pipe = inPipe   ## Sequences in 
     self.out_pipe = outPipe  ## Alignment out 
     self.options = options.copy() ## Modifiable sub-environment 
     self.sem = semaphore 

    def run(self): 
     inp = self.in_pipe.recv() 
     while inp != 'STOP': 
      seq_record , HMM = inp # seq_record is only ever one Bio.Seq.SeqRecord object at a time. 
            # HMM is a file location. 
      align_process = subprocess.Popen(['hmmsearch', '-A', '/dev/stdout', '-o',os.devnull, HMM, '/dev/stdin'], shell=False, stdin=subprocess.PIPE, stdout=subprocess.PIPE) 
      self.sem.acquire() 
      align_process.stdin.write(seq_record.format('fasta')) 
      align_process.stdin.close() 
      for seq in SeqIO.parse(align_process.stdout, 'stockholm'): # get the alignment output 
       self.out_pipe.send_bytes(seq.seq.tostring()) # send it to consumer 
      align_process.wait() # Don't know if there's any need for this?? 
      self.sem.release() 
      align_process.stdout.close() 
      inp = self.in_pipe.recv() 
     self.in_pipe.close() #Close handles so don't overshoot max. limit on number of file-handles. 
     self.out_pipe.close() 

eine während des Debuggens dafür ausgegeben haben, habe ich ein Problem gefunden, die immer da war und ist noch nicht ganz gelöst, aber haben einige andere Ineffizienzen im Prozess (Debugging) fixiert. Es gibt zwei anfängliche Feeder-Funktionen, diese align_seq-Klasse und einen Dateiparser parseHMM(), der eine positionsspezifische Bewertungsmatrix (PSM) in ein Wörterbuch lädt. Der Hauptmutterprozess vergleicht dann die Ausrichtung mit dem PSM, wobei ein Wörterbuch (von Wörterbüchern) als Zeiger auf die relevante Punktzahl für jeden Rest verwendet wird. Um die gewünschten Scores zu berechnen, habe ich zwei separate Multiprocessing.Process-Klassen, eine Klasse logScore(), die das Log Odds Ratio berechnet (mit math.exp()); Ich parallelisiere diesen; und es verbindet die berechneten Ergebnisse mit dem letzten Prozess, sumScore(), der nur diese Bewertungen (mit math.fsum) summiert, wobei die Summe und alle positionsspezifischen Bewertungen als Wörterbuch an den übergeordneten Prozess zurückgegeben werden. dh Queue.put ([Summe, {Rückstand Position: Position spezifische Punktzahl, ...}]) Ich finde dies außergewöhnlich verwirrend, um meinen Kopf herum (zu viele Warteschlangen!), So hoffe ich, dass die Leser es schaffen folgen ... Nachdem alle oben genannten Berechnungen durchgeführt wurden, gebe ich dann die Option, die kumulativen Bewertungen als tabulatorgetrennte Ausgabe zu speichern. Dies ist, wo es jetzt (seit letzter Nacht) manchmal bricht, da ich sicherstelle, dass es eine Punktzahl für jede Position ausdruckt, wo es eine Punktzahl geben sollte. Ich denke, dass aufgrund der Latenz (Computer Timings nicht synchron sein), manchmal, was zuerst in die Warteschlange für logScore gesetzt wird sumScore ersten nicht erreicht. Damit sumScore weiß, wann die Tally zurückgegeben und neu gestartet werden soll, lege ich 'endSEQ' in die Warteschlange für den letzten logScore-Prozess, der eine Berechnung durchgeführt hat. Ich dachte, dass es dann auch summieren sollte, aber das ist nicht immer so. nur manchmal bricht es. So bekomme ich jetzt keinen Deadlock mehr, sondern einen KeyError beim Drucken oder Speichern der Ergebnisse. Ich glaube, der Grund dafür, KeyError manchmal zu bekommen, liegt darin, dass ich für jeden logScore-Prozess eine Queue erstelle, die aber alle dieselbe Queue verwenden soll. Jetzt, wo ich so etwas wie: -

class logScore(multiprocessing.Process): 
    def __init__(self, inQ, outQ): 
     self.inQ = inQ 
     ... 

def scoreSequence(processes, HMMPSM, sequenceInPipe): 
    process_index = -1 
    sequence = sequenceInPipe.recv_bytes() 
    for residue in sequence: 
     .... ## Get the residue score. 
     process_index += 1 
     processes[process_index].inQ.put(residue_score) 
    ## End of sequence 
    processes[process_index].inQ.put('endSEQ') 


logScore_to_sumScoreQ = multiprocessing.Queue() 
logScoreProcesses = [ logScore(multiprocessing.Queue() , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ] 
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut) 

während ich nur eine Queue erstellen sollte zwischen allen logScore Instanzen zu teilen. d. h.

logScore_to_sumScoreQ = multiprocessing.Queue() 
scoreSeq_to_logScore = multiprocessing.Queue() 
logScoreProcesses = [ logScore(scoreSeq_to_logScore , logScore_to_sumScoreQ) for i in xrange(options['-ncpus']) ] 
sumScoreProcess = sumScore(logScore_to_sumScoreQ, scoresOut) 

Antwort

2

Das ist nicht ganz so, wie Pipelining funktioniert ...aber dein Geist zu setzen zu erleichtern, hier ist ein Auszug aus dem subprocess documentation:

stdin, stdout und stderr geben Sie die ausgeführten Programme Standardeingabe, Standardausgabe und Standardfehler Datei-Handles, respectively. Gültige Werte sind PIPE, eine vorhandene Datei Descriptor (eine positive Ganzzahl), ein vorhandenes Dateiobjekt und keine. PIPE zeigt an, dass eine neue Pipe für das Kind erstellt werden soll. Mit Keine wird keine Umleitung auftreten; Die Dateihandles des Kindes werden von dem übergeordneten geerbt.

Die wahrscheinlichsten Fehlerbereiche sind die Kommunikation mit dem Hauptprozess oder die Verwaltung des Semaphors. Vielleicht gehen Statusübergänge/Synchronisation aufgrund eines Fehlers nicht wie erwartet vor? Ich schlage das Debuggen vor, indem ich nach jedem blockierenden Aufruf Protokoll-/Druckanweisungen vor & hinzufüge - wo du mit dem Hauptprozess kommunizierst und wo du den Semaphor erwirbst/frei gibst, um einzuschränken, wo die Dinge falsch gelaufen sind.

Auch ich bin neugierig - ist der Semaphor absolut notwendig?

+0

Danke für die Antwort! Also, obwohl ich mehrfach auf/dev/stdin &/dev/stdout referenziere, bekommt jeder Prozess seine eigene Instanz davon, und es gibt keine Chance, dass sie sich gegenseitig verwirren ?? Süss! Betrachtet man weiter die Fehler, nachdem das Programm (^ C) gelöscht wurde, so scheint es, als ob die Dead-Lock-Operation mit einem Multiprocessing.Queue auftritt. Ich habe eine Reihe von ihnen, die ich als eine Kaskade oder einen Knock-On-Effekt durch Senden von "endSEQ" und schließlich "STOP" vorstellen. Es dauerte lange genug, um letzten Monat mit einem Prozess zu debuggen. Versuchen, meinen Kopf zurück um es jetzt zu bekommen, :( Und ich glaube nicht, dass der Semaphor notwendig ist. Gegangen :) –

+0

np! Die Dateien/dev/stdin &/dev/stdout werden den speziellen stdin & stdout-Dateideskriptoren zugeordnet, sodass sie kein zentrales Gerät darstellen. --- Ich sehe die Verwendung der Warteschlange im align_seq Kindprozess nicht, wird sie nur im Hauptprozess verwendet? Wenn ja, dann kannst du einfach queue.Queue verwenden. Sie sagen auch, Sie vermuten einen Deadlock - Sie konsumieren nicht bidirektional, sind Sie (main & child/ren)? –

+0

Nochmals vielen Dank für die Klärung von stdin/stdout. Jetzt denke ich darüber nach, irgendwie offensichtlich in der Art, wie ich sie benutze, aber ich habe es nie zuvor explizit erwähnt. Du hast absolut recht; Die Warteschlangen befinden sich in einem separaten Teil des Programms, in dem meine Deadlocks zu erscheinen scheinen. Ich werde den Hauptpost aktualisieren, weil es viel mehr Multiprocessing zu dem Programm gibt, das ich ursprünglich nicht angegeben habe ... Ich werde jedoch zuerst auf die unten stehenden Posts antworten. Ich habe es jetzt so, dass es manchmal endet, und manchmal bricht es kurz vor dem Ende, was ich aufgrund der Latenz (??) vermute. time.sleep ist meine temporäre, dreckige Lösung –

1

Ich wollte auch einfache Aufgaben parallelisieren und dafür habe ich ein kleines Python-Skript erstellt. Sie können einen Blick auf: http://bioinf.comav.upv.es/psubprocess/index.html

Ist ein wenig allgemeiner als was Sie wollen, aber für einfache Aufgaben ist sehr einfach zu bedienen. Es könnte dich zumindest etwas interessieren.

Jose Blanca

+0

Schön aussehende Website und eine beeindruckende Liste von Publikationen! Ja, einige Inspirationen und Beispiele wären schön (!), Da das Multiprocessing-Modul ein Biest ist, mit dem ich bisher nur an der Oberfläche kratze!Ich werde sicher einen Blick auf Psubprocess werfen, wenn ich eine Chance bekomme; An diesem Wochenende werde ich mehr schreiben als programmieren, Termine und Treffen am Montag ... Danke für den Link! –

0

Es ist ein Deadlock in subprocess sein könnte, haben Sie versucht, mit der Methode kommunizieren anstatt warten? http://docs.python.org/library/subprocess.html

+0

Ich vermeide die Verwendung von kommunizieren, weil kommunizieren wird Deadlock, wenn der Puffer gefüllt wird, was es sehr leicht mit großen Sequenz-Alignments tut (auch wenn die bufsize = -1 Option), so lese ich normalerweise die Ausgabe, wie es produziert wird. Eine meiner bevorzugten Methoden bestand darin, die Popen-Instanz an eine separate Funktion zu übergeben, die ich für das Parsing der .stdout von jedem beliebigen aufgerufenen Programm verwende. In der Elternfunktion überprüfe ich den Rückkehrcode, und wenn das nicht Null ist, dann lese ich den stderr. –