2016-11-11 2 views
0

Ich versuche, einen Cluster zu starten, der Dateien (neue Zeile JSON) aus Google Cloud-Speicher streamen und jede Zeile nach dem Abrufen von Daten aus MongoDB transformieren. Nachdem ich die Zeile transformiert habe, möchte ich sie in Googles BigQuery speichern - 10000 Zeilen gleichzeitig. All dies funktioniert gut, aber das Problem ist, dass die Rate, mit der die gestreamten Dateien verarbeitet werden, im Laufe der Zeit erheblich abnimmt.Node.js Lesbarer Stream verlangsamt sich im Laufe der Zeit, CPU-Auslastung sinkt

Ich habe die Knoten-Anwendung auf einem Server und Mongodb auf einem anderen Server eingerichtet. Beide 8-Core-Maschinen mit 30 GB RAM. Wenn das Skript ausgeführt wird, liegt die CPU-Auslastung für den Anwendungsserver und den mongodb-Server zunächst bei 70% -75%. Nach 30 Minuten sinkt die CPU-Auslastung auf 10% und schließlich auf 1%. Die Anwendung generiert keine Ausnahmen. Ich kann das Anwendungsprotokoll sehen und feststellen, dass es mit der Verarbeitung einiger Dateien fertig ist und neue Dateien zur Verarbeitung übernommen hat. Eine Hinrichtung kann etwas später als 15:00 Uhr und fast bis 17:20 Uhr beobachtet werden.

CPU Usage Disk Usage

var cluster = require('cluster'), 
    os = require('os'), 
    numCPUs = os.cpus().length, 
    async = require('async'), 
    fs = require('fs'), 
    google = require('googleapis'), 
    bigqueryV2 = google.bigquery('v2'), 
    gcs = require('@google-cloud/storage')({ 
     projectId: 'someproject', 
     keyFilename: __dirname + '/auth.json' 
    }), 
    dataset = bigquery.dataset('somedataset'), 
    bucket = gcs.bucket('somebucket.appspot.com'), 
    JSONStream = require('JSONStream'), 
    Transform = require('stream').Transform, 
    MongoClient = require('mongodb').MongoClient, 
    mongoUrl = 'mongodb://localhost:27017/bigquery', 
    mDb, 
    groupA, 
    groupB; 

var rows = [], 
    rowsLen = 0; 

function transformer() { 

    var t = new Transform({ objectMode: true }); 

    t._transform = function(row, encoding, cb) { 
     // Get some information from mongodb and attach it to the row 
     if (row) { 
      groupA.findOne({ 
       'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
      }, { 
       fields: { 'properties.OA_SA': 1, _id: 0 } 
      }, function(err, a) { 
       if (err) return cb(); 
       groupB.findOne({ 
        'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } } 
       }, { 
        fields: { 'properties.WZ11CD': 1, _id: 0 } 
       }, function(err, b) { 
        if (err) return cb(); 
        row.groupA = a ? a.properties.OA_SA : null; 
        row.groupB = b ? b.properties.WZ11CD : null; 

        // cache processed rows in memory 
        rows[rowsLen++] = { json: row }; 

        if (rowsLen >= 10000) { 
         // batch insert rows in bigquery table 
         // and free memory 
         log('inserting 10000') 
         insertRowsAsStream(rows.splice(0, 10000)); 
         rowsLen = rows.length; 
        } 

        cb(); 
       }); 
      }); 
     } else { 
      cb(); 
     } 
    }; 

    return t; 
} 

var log = function(str) { 
    console.log(str); 
} 

function insertRowsAsStream(rows, callback) { 
    bigqueryV2.tabledata.insertAll({ 
     "projectId": 'someproject', 
     "datasetId": 'somedataset', 
     "tableId": 'sometable', 
     "resource": { 
      "kind": "bigquery#tableDataInsertAllRequest", 
      "rows": rows 
     } 
    }, function(err, res) { 
     if (res && res.insertErrors && res.insertErrors.length) { 
      console.log(res.insertErrors[0].errors) 
      err = err || new Error(JSON.stringify(res.insertErrors)); 
     } 
    }); 
} 


function startStream(fileName, cb) { 
    // stream a file from Google cloud storage 
    var file = bucket.file(fileName), 
     called = false; 

    log(`Processing file ${fileName}`); 

    file.createReadStream() 
     .on('data', noop) 
     .on('end', function() { 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }) 
     .pipe(JSONStream.parse()) 
     .pipe(transformer()) 
     .on('finish', function() { 
      log('transformation ended'); 
      if (!called) { 
       called = true; 
       cb(); 
      } 
     }); 
} 

function processFiles(files, cpuIdentifier) { 
    if (files.length == 0) return; 
    var fn = []; 

    for (var i = 0; i < files.length; i++) { 
     fn.push(function(cb) { 
      startStream(files.pop(), cb); 
     }); 
    } 

    // process 3 files in parallel 
    async.parallelLimit(fn, 3, function() { 
     log(`child process ${cpuIdentifier} completed the task`); 
     fs.appendFile(__dirname + '/complete_count.txt', '1'); 
    }); 
} 

if (cluster.isMaster) { 
    for (var ii = 0; ii < numCPUs; ii++) { 
     cluster.fork(); 
    } 
} else { 
    MongoClient.connect(mongoUrl, function(err, db) { 
     if (err) throw (err); 
     mDb = db; 
     groupA = mDb.collection('groupageo'); 
     groupB = mDb.collection('groupbgeo'); 
     processFiles(files, process.pid); 
     // `files` is an array of file names 
     // each file is in newline json delimited format 
     // ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"] 
    }); 
} 
+0

Wie war RAM und Festplattennutzung? –

+0

Ich vermute, dass RAM war in Ordnung, weil ich nicht einen Speicherzuweisung/GC-Fehler erhalten habe. Warum sollte HD bei dieser Lösung eine Rolle spielen? –

+0

Keine Fehler bei der Speicherzuordnung zu bekommen bedeutet nicht, dass es kein Problem damit gibt. Eine übermäßige Verwendung von RAM wird die Auslagerungsspeicherauslastung auslösen, die wiederum die Festplatte verwendet. –

Antwort

0

Okay, ich habe die Ursache gefunden! Google APIs Node.js-Client-Bibliothek verwendet ein Modul namens "stream-events", das Streams 0.8 implementiert. Streams 0.8 steuern nicht die Rate, mit der sie das "Daten" -Ereignis ausgibt, basierend auf der Fähigkeit des Verbrauchers, Daten zu konsumieren. Die Ratensteuerung wurde in Streams 1.0 eingeführt. Das bedeutete im Wesentlichen, dass der lesbare Stream Daten in MongoDB mit einer Rate warf, die nicht verarbeitet werden konnte.

Lösung: Ich verwendete das Modul "Anfrage" anstelle der Google-Client-Bibliothek. Ich habe dem Request-Modul eine signierte URL zur Verfügung gestellt, die wiederum die Ergebnisse als Stream abruft, den ich in meinen Transformer pipen konnte.

Take Away: Überprüfen Sie immer die Module, die Sie für die verwendeten Stream-Versionen verwenden.

Verwandte Themen