Ich versuche, einen Cluster zu starten, der Dateien (neue Zeile JSON) aus Google Cloud-Speicher streamen und jede Zeile nach dem Abrufen von Daten aus MongoDB transformieren. Nachdem ich die Zeile transformiert habe, möchte ich sie in Googles BigQuery speichern - 10000 Zeilen gleichzeitig. All dies funktioniert gut, aber das Problem ist, dass die Rate, mit der die gestreamten Dateien verarbeitet werden, im Laufe der Zeit erheblich abnimmt.Node.js Lesbarer Stream verlangsamt sich im Laufe der Zeit, CPU-Auslastung sinkt
Ich habe die Knoten-Anwendung auf einem Server und Mongodb auf einem anderen Server eingerichtet. Beide 8-Core-Maschinen mit 30 GB RAM. Wenn das Skript ausgeführt wird, liegt die CPU-Auslastung für den Anwendungsserver und den mongodb-Server zunächst bei 70% -75%. Nach 30 Minuten sinkt die CPU-Auslastung auf 10% und schließlich auf 1%. Die Anwendung generiert keine Ausnahmen. Ich kann das Anwendungsprotokoll sehen und feststellen, dass es mit der Verarbeitung einiger Dateien fertig ist und neue Dateien zur Verarbeitung übernommen hat. Eine Hinrichtung kann etwas später als 15:00 Uhr und fast bis 17:20 Uhr beobachtet werden.
var cluster = require('cluster'),
os = require('os'),
numCPUs = os.cpus().length,
async = require('async'),
fs = require('fs'),
google = require('googleapis'),
bigqueryV2 = google.bigquery('v2'),
gcs = require('@google-cloud/storage')({
projectId: 'someproject',
keyFilename: __dirname + '/auth.json'
}),
dataset = bigquery.dataset('somedataset'),
bucket = gcs.bucket('somebucket.appspot.com'),
JSONStream = require('JSONStream'),
Transform = require('stream').Transform,
MongoClient = require('mongodb').MongoClient,
mongoUrl = 'mongodb://localhost:27017/bigquery',
mDb,
groupA,
groupB;
var rows = [],
rowsLen = 0;
function transformer() {
var t = new Transform({ objectMode: true });
t._transform = function(row, encoding, cb) {
// Get some information from mongodb and attach it to the row
if (row) {
groupA.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.OA_SA': 1, _id: 0 }
}, function(err, a) {
if (err) return cb();
groupB.findOne({
'geometry': { $geoIntersects: { $geometry: { type: 'Point', coordinates: [row.lon, row.lat] } } }
}, {
fields: { 'properties.WZ11CD': 1, _id: 0 }
}, function(err, b) {
if (err) return cb();
row.groupA = a ? a.properties.OA_SA : null;
row.groupB = b ? b.properties.WZ11CD : null;
// cache processed rows in memory
rows[rowsLen++] = { json: row };
if (rowsLen >= 10000) {
// batch insert rows in bigquery table
// and free memory
log('inserting 10000')
insertRowsAsStream(rows.splice(0, 10000));
rowsLen = rows.length;
}
cb();
});
});
} else {
cb();
}
};
return t;
}
var log = function(str) {
console.log(str);
}
function insertRowsAsStream(rows, callback) {
bigqueryV2.tabledata.insertAll({
"projectId": 'someproject',
"datasetId": 'somedataset',
"tableId": 'sometable',
"resource": {
"kind": "bigquery#tableDataInsertAllRequest",
"rows": rows
}
}, function(err, res) {
if (res && res.insertErrors && res.insertErrors.length) {
console.log(res.insertErrors[0].errors)
err = err || new Error(JSON.stringify(res.insertErrors));
}
});
}
function startStream(fileName, cb) {
// stream a file from Google cloud storage
var file = bucket.file(fileName),
called = false;
log(`Processing file ${fileName}`);
file.createReadStream()
.on('data', noop)
.on('end', function() {
if (!called) {
called = true;
cb();
}
})
.pipe(JSONStream.parse())
.pipe(transformer())
.on('finish', function() {
log('transformation ended');
if (!called) {
called = true;
cb();
}
});
}
function processFiles(files, cpuIdentifier) {
if (files.length == 0) return;
var fn = [];
for (var i = 0; i < files.length; i++) {
fn.push(function(cb) {
startStream(files.pop(), cb);
});
}
// process 3 files in parallel
async.parallelLimit(fn, 3, function() {
log(`child process ${cpuIdentifier} completed the task`);
fs.appendFile(__dirname + '/complete_count.txt', '1');
});
}
if (cluster.isMaster) {
for (var ii = 0; ii < numCPUs; ii++) {
cluster.fork();
}
} else {
MongoClient.connect(mongoUrl, function(err, db) {
if (err) throw (err);
mDb = db;
groupA = mDb.collection('groupageo');
groupB = mDb.collection('groupbgeo');
processFiles(files, process.pid);
// `files` is an array of file names
// each file is in newline json delimited format
// ["1478854974993/000000000000.json","1478854974993/000000000001.json","1478854974993/000000000002.json","1478854974993/000000000003.json","1478854974993/000000000004.json","1478854974993/000000000005.json"]
});
}
Wie war RAM und Festplattennutzung? –
Ich vermute, dass RAM war in Ordnung, weil ich nicht einen Speicherzuweisung/GC-Fehler erhalten habe. Warum sollte HD bei dieser Lösung eine Rolle spielen? –
Keine Fehler bei der Speicherzuordnung zu bekommen bedeutet nicht, dass es kein Problem damit gibt. Eine übermäßige Verwendung von RAM wird die Auslagerungsspeicherauslastung auslösen, die wiederum die Festplatte verwendet. –