2017-05-10 4 views
1

Ich habe eine schöne gerade Arbeitsrohr, wo die Aufgabe, die ich über Luigi auf der Befehlszeile ausführen löst alle erforderlichen Upstream-Daten holen und verarbeiten in seiner richtigen Reihenfolge bis es tröpfelt in meine Datenbank.Luigi Upstream-Task sollte einmal ausgeführt werden, um Eingabe für Satz von nachgelagerten Aufgaben zu erstellen

class IMAP_Fetch(luigi.Task): 
    """fetch a bunch of email messages with data in them""" 
    date = luigi.DateParameter() 
    uid = luigi.Parameter() 
… 
    def output(self): 
    loc = os.path.join(self.data_drop, str(self.date)) 
    # target for requested message 
    yield LocalTarget(os.path.join(loc, uid+".msg")) 

    def run(self): 
    # code to connect to IMAP server and run FETCH on given UID 
    # message gets written to self.output() 
… 

class RecordData(luigi.contrib.postgres.CopyToTable): 
    """copy the data in one email message to the database table""" 
    uid = luigi.Parameter() 
    date = luigi.DateParameter() 
    table = 'msg_data' 
    columns = [(id, int), …] 

    def requires(self): 
    # a task (not shown) that extracts data from one message 
    # which in turn requires the IMAP_Fetch to pull down the message 
    return MsgData(self.date, self.uid) 

    def rows(self): 
    # code to read self.input() and yield lists of data values 

Tolle Sachen. Unglücklicherweise spricht der erste Datenabruf mit einem entfernten IMAP-Server und jeder Abruf ist eine neue Verbindung und eine neue Abfrage: sehr langsam. Ich weiß, wie man alle einzelnen Nachrichtendateien in einer Sitzung (Aufgabeninstanz) bekommt. Ich verstehe nicht, wie ich die nachgelagerten Aufgaben so behalte, wie sie sind, indem ich jeweils eine Nachricht bearbeite, da die Aufgabe, die eine Nachricht benötigt, nur diese eine Nachricht auslöst, und nicht alle verfügbaren Nachrichten abruft. Ich entschuldige mich im Voraus für die fehlenden offensichtlichen Lösungen, aber es hat mich soweit verblüfft, wie ich meine nette, einfache, blöde Pfeife größtenteils so halten kann, wie es ist, aber den Trichter an der Spitze in einem Anruf alle Daten einsaugen zu lassen. Danke für Ihre Hilfe.

+0

Bitte fügen Sie einen Code, der zeigt, was Ihre Aufgaben tun und wie so können wir Ihnen helfen – matagus

+0

aktualisiert meinen ursprünglichen Beitrag wie angefordert – ib4u

Antwort

1

Was fehlt mir von Ihrer Erklärung ist, wo die Liste der uid Werte, die an die RecordData Aufgabe gesendet werden kommen aus. Für diese Erklärung nehme ich an, dass Sie eine Gruppe von uid Werten haben, die Sie in eine einzige ImapFetch Anfrage zusammenfassen möchten.

Ein möglicher Ansatz ist es, eine batch_id, zusätzlich zu definieren Ihre uid, wobei die batch_id auf die Gruppe von Nachrichten verweist in einer einzigen Sitzung holen möchte .. Wo die Zuordnung zwischen einem uid und einem batch_id gespeichert wird wie du willst. Es kann ein Parameter sein, der an die Pipeline übergeben oder extern gespeichert wird. Die Aufgabe, die Sie weggelassen haben, MsgData, die eine, deren requires Methode eine einzige ImapFetch Aufgabe mit einem uid Parameter im Moment zurückgibt, sollte stattdessen eine ImapFetch Aufgabe erfordern, die einen batch_id Parameter nimmt. Der erste Task ImapFetch, der von einer Task MsgData benötigt wird, ruft alle uid Werte ab, die mit diesem batch_id verknüpft sind, und ruft diese Nachrichten dann in einer einzigen Sitzung ab. Alle anderen MsgData Aufgaben würden benötigt (und warten auf) diese eine Charge ImapFetch zu vervollständigen, und dann würden sie alle in der Lage sein, auf ihrer individuellen Nachricht auszuführen, wie der Rest der Pipeline würde. Daher kann sich die Abstimmung der Chargengröße als wichtig für den gesamten Verarbeitungsdurchsatz erweisen.

Ein weiterer Nachteil ist, dass es Atom- Atom auf Chargenebene weniger ist eher als die einzelne Positionsebene, wie der Ansatz ImapFetch, wenn nur einer der uid Werte scheitern würde nicht erfolgreich abgerufen wurde. Ein zweiter Ansatz besteht darin, die Imap-Sitzung als mehr Singleton-Ressource pro Prozess (Worker) zu öffnen und die ImapFetch-Tasks die gleiche Sitzung wiederzuverwenden.

+0

Vielen Dank für Ihre klare, durchdachte Erklärung. Nachdem ich das hier gepostet habe, kam auch dein erster Vorschlag zu mir und du hast ihn für mich validiert. Ich mag auch die zweite Idee. Danke noch einmal. – ib4u

Verwandte Themen