2015-05-26 5 views
5

Ich habe einen Stream, den ich verarbeiten, indem ich auf die Ereignisse data, error und end Ereignisse abhöre, und ich rufe eine Funktion auf, jedes data Ereignis im ersten Stream zu verarbeiten. Natürlich ruft die Funktion, die die Daten verarbeitet, andere Rückrufe auf, was sie asynchron macht. Wie beginne ich mehr Code auszuführen, wenn die Daten im Stream verarbeitet werden? Das Abhören des end Ereignisses im Stream bedeutet NICHT, dass die asynchronen data Verarbeitungsfunktionen beendet sind.Wie kann sichergestellt werden, dass der asynchrone Code ausgeführt wird, nachdem ein Stream verarbeitet wurde?

Wie kann ich sicherstellen, dass die Stream-Datenverarbeitungsfunktionen beendet sind, wenn ich meine nächste Anweisung ausfühle? Hier

ein Beispiel:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream 
    .on('data', function (account) { 
     migrateAccount.bind(self)(account, finishMigration); 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream (but finishMigration is still running!!!)"); 
     callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running! 
    }); 
} 

var migrateAccount = function (oldAccount, callback) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount); 
    }); 
} 

var finishMigration = function (newAccount) { 
    // some code that is executed asynchronously... 
} 

Wie stelle ich sicher, dass callThisOnlyAfterAllAccountsAreMigrated aufgerufen wird, nachdem der Strom verarbeitet worden ist?

Kann dies mit Versprechungen getan werden? Kann es mit durchgehenden Streams gemacht werden? Ich arbeite mit Nodejs, daher könnte die Bezugnahme auf andere npm-Module hilfreich sein.

Antwort

2

Wie Sie gesagt haben, ist das Abhören des end Ereignisses im Stream alleine nutzlos. Der Stream weiß nicht, was Sie mit den Daten in Ihrem Handler "data" machen. Sie müssten also Code schreiben, um den Status migrateAccount zu verfolgen.

Wenn ich es wäre, würde ich diesen ganzen Abschnitt umschreiben. Wenn Sie das Ereignis readable mit .read() in Ihrem Stream verwenden, können Sie so viele Elemente gleichzeitig lesen, wie Sie möchten. Wenn das eins ist, kein Problem. Wenn es 30 ist, großartig. Der Grund, warum Sie dies tun, ist so, dass Sie nicht über die Arbeit mit den Daten aus dem Stream hinausgehen. Wie es gerade ist, wenn AccountStream schnell ist, wird Ihre Anwendung zweifellos irgendwann abstürzen.

Wenn Sie ein Element aus einem Stream lesen und mit der Arbeit beginnen, nehmen Sie das Versprechen, das Sie erhalten (verwenden Sie Bluebird oder ähnliches) und werfen Sie es in ein Array. Wenn das Versprechen gelöst ist, entfernen Sie es aus dem Array. Wenn der Stream endet, fügen Sie einen .done() Handler an .all() (im Grunde eine große Versprechen aus jeder noch im Array Versprechen).

Sie können auch einen einfachen Zähler für laufende Jobs verwenden.

+0

Für mich ist das 'end' Ereignis scheint ziemlich nützlich, als dass der Punkt, wo Sie beginnen müssen schauen, wie viele Arbeitsplätze noch laufen , so dass Sie den letzten Rückruf auslösen können, wenn keine weiteren übrig sind. – Bergi

+0

@Bergi Guter Punkt, ich habe geklärt, worauf ich hinauswollte. – Brad

+1

@Brad Ich denke, dass Ihre Lösung, die Versprechungen und das "lesbare" Ereignis verwendet, effektiver ist als meine obige Lösung. Würde es Ihnen etwas ausmachen, ein Codierungsbeispiel hinzuzufügen? Versprechen sind ein wenig schwierig, so dass es hilfreich wäre, Versprechungen in Aktion auf den Code der Frage zu sehen ... – modulitos

1

ein durch Strom (NPM through2 Modul) Verwendung, löste ich dieses Problem mit dem folgenden Code, der das asynchrone Verhalten steuert:

var through = require('through2').obj; 
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream.pipe(through(function(account, _, next) { 
    migrateAccount.bind(self)(account, finishMigration, next); 
    })) 
    .on('data', function (account) { 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream"); 
     callThisOnlyAfterAllAccountsAreMigrated(); 
    }); 
} 

var migrateAccount = function (oldAccount, callback, next) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount, next); 
    }); 
} 

var finishMigration = function (newAccount, next) { 
    // some code that is executed asynchronously, but using 'next' callback when migration is finished... 
} 
1

Es ist viel einfacher, wenn Sie Ströme über Versprechungen behandeln.

Kopiert von here, ein Beispiel, das spex Bibliothek verwendet:

var spex = require('spex')(Promise); 
var fs = require('fs'); 

var rs = fs.createReadStream('values.txt'); 

function receiver(index, data, delay) { 
    return new Promise(function (resolve) { 
     console.log("RECEIVED:", index, data, delay); 
     resolve(); // ok to read the next data; 
    }); 
} 

spex.stream.read(rs, receiver) 
    .then(function (data) { 
     // streaming successfully finished; 
     console.log("DATA:", data); 
    }, function (reason) { 
     // streaming has failed; 
     console.log("REASON:", reason); 
    }); 
Verwandte Themen