2012-06-27 8 views
10

Ich muss herausfinden, wie Daten, die in node.js geschrieben werden, in Echtzeit in eine Datei geschrieben werden. Das Problem ist, dass Node ein sich schnell bewegendes Schiff ist, das es schwierig macht, die beste Methode zur Lösung eines Problems zu finden.Eine Datei in Echtzeit mit Node.js lesen

What I Want
zu tun habe ich ein Java-Prozess, der etwas tut, und dann die Ergebnisse dieser Sache zu schreiben tut es in einer Textdatei. Es dauert in der Regel zwischen 5 Minuten und 5 Stunden, da die Daten die ganze Zeit geschrieben werden und einige ziemlich hohe Durchsatzraten erreichen (ca. 1000 Zeilen/Sek.).

Ich möchte diese Datei in Echtzeit lesen, und dann mit Knoten aggregieren Sie die Daten und schreiben Sie es in einen Socket, wo es auf dem Client grafisch dargestellt werden kann.

Der Client, Grafiken, Sockets und Aggregation Logik sind alle fertig, aber ich bin verwirrt über die beste Methode zum Lesen der Datei.

Was habe ich versucht (oder zumindest gespielt mit)
FIFO - Ich kann meine Java-Prozess sagen zu einem Fifo und lesen Sie diese unter Verwendung von Knoten, das ist in der Tat zu schreiben, wie wir derzeit haben diese implemted Perl , aber weil alles andere im Knoten läuft, ist es sinnvoll, den Code zu portieren.

Unix Sockets - Wie oben.

fs.watchFile - wird das für das funktionieren, was wir brauchen?

fs.createReadStream - ist das besser als WatchFile?

fs & tail -f - scheint wie ein Hack.

Was eigentlich meine Frage ist
ich zum Einsatz von Unix-Sockets bin dazu neigt, scheint dies die schnellste Option. Aber hat Knoten bessere integrierte Funktionen zum Lesen von Dateien von den fs in Echtzeit?

Antwort

6

Wenn Sie die Datei als persistenter Speicher Ihrer Daten behalten wollen einen Verlust von Strom im Fall eines Systemabsturzes oder eines der Mitglieder in Ihrem Netzwerk zu verhindern Wenn Prozesse ablaufen, können Sie weiterhin in eine Datei schreiben und daraus lesen.

Wenn Sie diese Datei nicht als persistenten Speicher der produzierten Ergebnisse aus Ihrem Java-Prozess benötigen, dann ist die Verwendung eines Unix-Sockels viel besser für die Leichtigkeit und auch die Leistung.

fs.watchFile() ist nicht das, was Sie brauchen, weil es auf Dateistats arbeitet, da Dateisystem es meldet, und da Sie die Datei lesen möchten, wie es bereits geschrieben wird, ist das nicht was Sie wollen.

SHORT UPDATE: Ich bin sehr traurig, zu erkennen, dass, obwohl ich fs.watchFile() für die Verwendung von Datei-Statistik in vorherigem Absatz beschuldigt hatte, ich die gleiche Sache selbst in meinem Beispiel-Code unten getan hatte! Obwohl ich die Leser bereits gewarnt hatte, "mach's gut!" weil ich es in nur wenigen Minuten geschrieben hatte, ohne es gut zu testen; Dennoch kann es besser mit fs.watch() anstelle von watchFile oder fstatSync getan werden, wenn das zugrunde liegende System es unterstützt.

Zum Lesen/aus einer Datei zu schreiben, habe ich nur unten in meiner Pause zum Spaß geschrieben:

Test-fs-writer.js: [Sie brauchen das nicht, da Sie Datei in Ihrem Java schreiben Prozess]

var fs = require('fs'), 
    lineno=0; 

var stream = fs.createWriteStream('test-read-write.txt', {flags:'a'}); 

stream.on('open', function() { 
    console.log('Stream opened, will start writing in 2 secs'); 
    setInterval(function() { stream.write((++lineno)+' oi!\n'); }, 2000); 
}); 

Test-fs-reader.js: [Achten Sie darauf, das ist nur Demonstration, err Objekte überprüfen]

var fs = require('fs'), 
    bite_size = 256, 
    readbytes = 0, 
    file; 

fs.open('test-read-write.txt', 'r', function(err, fd) { file = fd; readsome(); }); 

function readsome() { 
    var stats = fs.fstatSync(file); // yes sometimes async does not make sense! 
    if(stats.size<readbytes+1) { 
     console.log('Hehe I am much faster than your writer..! I will sleep for a while, I deserve it!'); 
     setTimeout(readsome, 3000); 
    } 
    else { 
     fs.read(file, new Buffer(bite_size), 0, bite_size, readbytes, processsome); 
    } 
} 

function processsome(err, bytecount, buff) { 
    console.log('Read', bytecount, 'and will process it now.'); 

    // Here we will process our incoming data: 
     // Do whatever you need. Just be careful about not using beyond the bytecount in buff. 
     console.log(buff.toString('utf-8', 0, bytecount)); 

    // So we continue reading from where we left: 
    readbytes+=bytecount; 
    process.nextTick(readsome); 
} 
!

Sie können die Verwendung von nextTick vermeiden und stattdessen direkt readsome() anrufen. Da wir hier immer noch synchron arbeiten, ist es in keiner Hinsicht notwendig. Ich mag es einfach. : P

EDIT von Oliver Lloyd

Nehmen wir das Beispiel oben, aber es zu lesen CSV-Daten erstrecken gibt:

var lastLineFeed, 
    lineArray; 
function processsome(err, bytecount, buff) { 
    lastLineFeed = buff.toString('utf-8', 0, bytecount).lastIndexOf('\n'); 

    if(lastLineFeed > -1){ 

     // Split the buffer by line 
     lineArray = buff.toString('utf-8', 0, bytecount).slice(0,lastLineFeed).split('\n'); 

     // Then split each line by comma 
     for(i=0;i<lineArray.length;i++){ 
      // Add read rows to an array for use elsewhere 
      valueArray.push(lineArray[i].split(',')); 
     } 

     // Set a new position to read from 
     readbytes+=lastLineFeed+1; 
    } else { 
     // No complete lines were read 
     readbytes+=bytecount; 
    } 
    process.nextTick(readFile); 
} 
+0

Dies ist ein gutes Beispiel, das meine Frage direkt anspricht. Es erfordert jedoch eine Verbesserung, nur eine Zeile zu einer Zeit zu verarbeiten, aber das ist wohl eine gute Sache. Knoten Mangel an vorhandenen fs-Schnittstelle bedeutet, dass es vollständig anpassbar ist, so dass selbst wenn ich extra Code schreiben muss, kann ich genau das erreichen, was ich brauche. –

+0

Ich habe das obige Beispiel erweitert, um mit einer CSV-Datei zu arbeiten. –

+0

Dies funktioniert absolut, wenn es als Knoten ausgeführt wird, aber wie kann ich diesen Code in app.js einfügen und das Ergebnis in HTML-Seite bekommen? – sand

4

Warum denken Sie, tail -f ist ein Hack?

Während ich herausgefunden habe, dass ich ein gutes Beispiel gefunden habe, würde ich etwas ähnliches tun. Echtzeit-Online-Aktivitätsmonitor Beispiel mit node.js und WebSocket:
http://blog.new-bamboo.co.uk/2009/12/7/real-time-online-activity-monitor-example-with-node-js-and-websocket

Genau diese Antwort vollständig zu machen, schrieb ich Ihnen ein Beispiel-Code, der unter 0.8.0 laufen würde - (der HTTP-Server ist ein Hack vielleicht).

Ein Kind Prozess wird mit Schwanz gelaicht läuft, und da ein Kind-Prozess ein EventEmitter mit drei Ströme ist (wir stdout in unserem Fall verwenden) können Sie nur mit on

Dateiname die einen Listener hinzu: tailServer.js

Nutzung: node tailServer /var/log/filename.log

var http = require("http"); 
var filename = process.argv[2]; 


if (!filename) 
    return console.log("Usage: node tailServer filename"); 

var spawn = require('child_process').spawn; 
var tail = spawn('tail', ['-f', filename]); 

http.createServer(function (request, response) { 
    console.log('request starting...'); 

    response.writeHead(200, {'Content-Type': 'text/plain' }); 

    tail.stdout.on('data', function (data) { 
     response.write('' + data);     
    }); 
}).listen(8088); 

console.log('Server running at http://127.0.0.1:8088/'); 
+0

Meine Sorge mit tail -f ist, dass es erfordert, dass der Lesevorgang aktiv sein, bevor die Datei geschrieben wird, wenn es nicht Daten ist hat verloren. Mein Anwendungsfall ist so, dass das Lesen lange nach dem Schreiben von Daten passieren kann. +1 für das update auf 0.8, obwohl dies eine gute Lösung dafür ist, wo das Schreiben und Lesen von der gleichen Quelle gesteuert wird. –

+0

watchFile ist ebenfalls ereignisgesteuert, aber laut Dokumentation nicht stabil. Das obige Beispiel ändert Dateiänderungen durch Abfragen im High-Level-Code. Für mich sieht das wie ein Hack aus. Aber solange es für dich funktioniert, ist es gut das zu tun. Sonst könnten Sie die Datei "anfassen", wenn sie nicht existiert und Sie würden keine Daten verlieren, und Sie könnten Dateizeilen mit 'wc -l message.text | awk '{print $ 1}' 'und handle es nach' tail -f -n' – vik

1

dieses Modul ist eine Implementierung des Prinzips @hasanyasin schlägt vor:

https://github.com/felixge/node-growing-file

+0

Danke, es sieht so aus, als würde es hier gut funktionieren und die anderen Projekte von felixge sind solide, also bin ich froh, dieses Modul auszuprobieren. –

0

Ich nahm die Antwort von @hasanyasin und wickelte es ein in ein modulares Versprechen. Die Grundidee ist, dass Sie eine Datei und eine Handler-Funktion übergeben, die etwas mit dem Stringed-Buffer tut, der aus der Datei gelesen wird. Wenn die Handler-Funktion wahr zurückgibt, wird die Datei nicht mehr gelesen. Sie können auch eine Zeitüberschreitung festlegen, die das Lesen verhindert, wenn der Handler nicht schnell genug wahr zurückgibt.

Der Promiler wird true zurückgeben, wenn die resolve() wegen Zeitüberschreitung aufgerufen wurde, andernfalls wird false zurückgegeben.

Siehe unten für Anwendungsbeispiel.

// https://stackoverflow.com/a/11233045 

var fs = require('fs'); 
var Promise = require('promise'); 

class liveReaderPromiseMe { 
    constructor(file, buffStringHandler, opts) { 
     /* 
      var opts = { 
       starting_position: 0, 
       byte_size: 256, 
       check_for_bytes_every_ms: 3000, 
       no_handler_resolution_timeout_ms: null 
      }; 
     */ 

     if (file == null) { 
      throw new Error("file arg must be present"); 
     } else { 
      this.file = file; 
     } 

     if (buffStringHandler == null) { 
      throw new Error("buffStringHandler arg must be present"); 
     } else { 
      this.buffStringHandler = buffStringHandler; 
     } 

     if (opts == null) { 
      opts = {}; 
     } 

     if (opts.starting_position == null) { 
      this.current_position = 0; 
     } else { 
      this.current_position = opts.starting_position; 
     } 

     if (opts.byte_size == null) { 
      this.byte_size = 256; 
     } else { 
      this.byte_size = opts.byte_size; 
     } 

     if (opts.check_for_bytes_every_ms == null) { 
      this.check_for_bytes_every_ms = 3000; 
     } else { 
      this.check_for_bytes_every_ms = opts.check_for_bytes_every_ms; 
     } 

     if (opts.no_handler_resolution_timeout_ms == null) { 
      this.no_handler_resolution_timeout_ms = null; 
     } else { 
      this.no_handler_resolution_timeout_ms = opts.no_handler_resolution_timeout_ms; 
     } 
    } 


    startHandlerTimeout() { 
     if (this.no_handler_resolution_timeout_ms && (this._handlerTimer == null)) { 
      var that = this; 
      this._handlerTimer = setTimeout(
       function() { 
        that._is_handler_timed_out = true; 
       }, 
       this.no_handler_resolution_timeout_ms 
      ); 
     } 
    } 

    clearHandlerTimeout() { 
     if (this._handlerTimer != null) { 
      clearTimeout(this._handlerTimer); 
      this._handlerTimer = null; 
     } 
     this._is_handler_timed_out = false; 
    } 

    isHandlerTimedOut() { 
     return !!this._is_handler_timed_out; 
    } 


    fsReadCallback(err, bytecount, buff) { 
     try { 
      if (err) { 
       throw err; 
      } else { 
       this.current_position += bytecount; 
       var buff_str = buff.toString('utf-8', 0, bytecount); 

       var that = this; 

       Promise.resolve().then(function() { 
        return that.buffStringHandler(buff_str); 
       }).then(function(is_handler_resolved) { 
        if (is_handler_resolved) { 
         that.resolve(false); 
        } else { 
         process.nextTick(that.doReading.bind(that)); 
        } 
       }).catch(function(err) { 
        that.reject(err); 
       }); 
      } 
     } catch(err) { 
      this.reject(err); 
     } 
    } 

    fsRead(bytecount) { 
     fs.read(
      this.file, 
      new Buffer(bytecount), 
      0, 
      bytecount, 
      this.current_position, 
      this.fsReadCallback.bind(this) 
     ); 
    } 

    doReading() { 
     if (this.isHandlerTimedOut()) { 
      return this.resolve(true); 
     } 

     var max_next_bytes = fs.fstatSync(this.file).size - this.current_position; 
     if (max_next_bytes) { 
      this.fsRead((this.byte_size > max_next_bytes) ? max_next_bytes : this.byte_size); 
     } else { 
      setTimeout(this.doReading.bind(this), this.check_for_bytes_every_ms); 
     } 
    } 


    promiser() { 
     var that = this; 
     return new Promise(function(resolve, reject) { 
      that.resolve = resolve; 
      that.reject = reject; 
      that.doReading(); 
      that.startHandlerTimeout(); 
     }).then(function(was_resolved_by_timeout) { 
      that.clearHandlerTimeout(); 
      return was_resolved_by_timeout; 
     }); 
    } 
} 


module.exports = function(file, buffStringHandler, opts) { 
    try { 
     var live_reader = new liveReaderPromiseMe(file, buffStringHandler, opts); 
     return live_reader.promiser(); 
    } catch(err) { 
     return Promise.reject(err); 
    } 
}; 

Dann den obigen Code wie folgt verwenden:

var fs = require('fs'); 
var path = require('path'); 
var Promise = require('promise'); 
var liveReadAppendingFilePromiser = require('./path/to/liveReadAppendingFilePromiser'); 

var ending_str = '_THIS_IS_THE_END_'; 
var test_path = path.join('E:/tmp/test.txt'); 

var s_list = []; 
var buffStringHandler = function(s) { 
    s_list.push(s); 
    var tmp = s_list.join(''); 
    if (-1 !== tmp.indexOf(ending_str)) { 
     // if this return never occurs, then the file will be read until no_handler_resolution_timeout_ms 
     // by default, no_handler_resolution_timeout_ms is null, so read will continue forever until this function returns something that evaluates to true 
     return true; 
     // you can also return a promise: 
     // return Promise.resolve().then(function() { return true; }); 
    } 
}; 

var appender = fs.openSync(test_path, 'a'); 
try { 
    var reader = fs.openSync(test_path, 'r'); 
    try { 
     var options = { 
      starting_position: 0, 
      byte_size: 256, 
      check_for_bytes_every_ms: 3000, 
      no_handler_resolution_timeout_ms: 10000, 
     }; 

     liveReadAppendingFilePromiser(reader, buffStringHandler, options) 
     .then(function(did_reader_time_out) { 
      console.log('reader timed out: ', did_reader_time_out); 
      console.log(s_list.join('')); 
     }).catch(function(err) { 
      console.error('bad stuff: ', err); 
     }).then(function() { 
      fs.closeSync(appender); 
      fs.closeSync(reader); 
     }); 

     fs.write(appender, '\ncheck it out, I am a string'); 
     fs.write(appender, '\nwho killed kenny'); 
     //fs.write(appender, ending_str); 
    } catch(err) { 
     fs.closeSync(reader); 
     console.log('err1'); 
     throw err; 
    } 
} catch(err) { 
    fs.closeSync(appender); 
     console.log('err2'); 
    throw err; 
}