Ich verwende async queue, um eine große Menge an Daten zu verarbeiten. Die Warteschlange funktioniert gut, bis ich versuche, ein Update in der Datenbank mit MongoDB findOneAndUpdate Methode durchzuführen.Warum werden meine Callbacks nicht mit einer asynchronen Warteschlange ausgelöst?
Ich etablieren zunächst die Warteschlange, und starten Sie Objekte, um sie von einem Node-Stream drängen:
//Create queue to process all items
let q = async.queue(processLink, 2);
// Create Node Stream
let createStream = function() {
let stream = fs.createReadStream(LinkData, {encoding: 'utf8'});
let parser = JSONStream.parse('RECDATA.*');
return stream.pipe(parser);
};
//Listen to 'data' event on stream and add object to queue
createStream().on('data', function(link){
q.push(link)
});
Hier ist meine Aufgabe, Funktion, ‚processLink‘. Es ist hier, dass ich Probleme habe, das Problem aufzuspüren. Immer wenn der Callback findOneAndUpdate ausgelöst wird, tritt er in einen der Bedingungsblöcke ein und ich erhalte die Nachricht an der Konsole abgemeldet, aber wenn ich den asynchronen Callbackprozess Complete() aufruft, wird die Aufgabe nicht wie erwartet beendet.
Wie der Titel sagt, warum werden meine Async-Callbacks nicht alle Aufgaben abgeschlossen?
function processLink(link, processComplete){
if(_.includes(link.URL, 'www.usda.gov') && _.includes(link.URL, '?recid=')){
let url_items = _.split(link.URL, '=',2);
let facilityOrgID = url_items[1];
let update = {$push: {"links": link}};
if(_.isNumber(parseInt(facilityOrgID)) && facilityOrgID.length > 4){
Facility.findOneAndUpdate({facilityOrgID: parseInt(facilityOrgID)}, update, (err, result) => {
if(err !== null){
console.log("Error:",err);
return processComplete(err); /** NOT FIRING **/
} else if(err === null && result !== null){
console.log("Link added to:", result.name);
return processComplete(); /** NOT FIRING **/
}else if(err === null && result === null){
console.log('Facility not in database');
processComplete(); /** NOT FIRING **/
}else{
console.log('Something has gone terrible wrong');
}
});
}else{
console.log("Invalid facilityID");
return processComplete();
}
}else{
console.log('Link Discarded:', link.URL);
processComplete(); /** Fires normally **/
}
}