2014-09-30 10 views
5

Ich versuche RxJS zu verwenden, um ein Skript zu schreiben, um mehrere Hunderte von Protokolldateien zu verarbeiten, von denen jede ungefähr 1 GB ist. Das Skelett des Skripts sieht aus wieWie kann die Parallelität von flatMap eingeschränkt werden?

Rx.Observable.from(arrayOfLogFilePath) 
.flatMap(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Der Code funktioniert, merkt aber, dass der Filterschritt aller Protokolldateien gleichzeitig starten. Aus Sicht der E/A-Performance des Dateisystems ist es jedoch vorzuziehen, eine Datei nach der anderen zu verarbeiten (oder zumindest die Nebenläufigkeit auf einige Dateien zu beschränken, anstatt alle hunderten von Dateien gleichzeitig zu öffnen). Wie kann ich es in diesem Sinne "funktional reaktiv" umsetzen?

Ich hatte an Scheduler gedacht, konnte aber nicht herausfinden, wie es hier helfen kann.

+0

Ich habe die gleiche Frage, aber mit Rx.NET. Ist es möglich? http://stackoverflow.com/questions/37345516/limiting-concurrent-requests-using-rx-and-selectmany – SuperJMN

Antwort

12

Sie können .merge(maxConcurrent) verwenden, um die Parallelität zu begrenzen. Weil .merge(maxConcurrent) eine Metaobservable (beobachtbar von Observablen) in eine Observable abflacht, müssen Sie die .flatMap durch .map ersetzen, so dass die Ausgabe eine Metaobservable ("unflat") ist, dann rufen Sie .merge(maxConcurrent).

Rx.Observable.from(arrayOfLogFilePath) 
.map(function(logFilePath){ 
    return Rx.Node.fromReadStream(logFilePath) 
    .filter(filterLogLine) 
}) 
.merge(2) // 2 concurrent 
.groupBy(someGroupingFunc) 
.map(someFurtherProcessing) 
.subscribe(...) 

Dieser Code wurde nicht getestet (da ich Sie haben keinen Zugriff auf die Umwelt Entwicklung haben), aber das ist, wie Sie vorgehen. RxJS hat nicht viele Operatoren mit Parallelitätsparametern, aber Sie können fast immer tun, was Sie mit .merge(maxConcurrent) benötigen.

+1

Dies ist genau das, was ich versuche zu arbeiten. Ich habe eine Liste von 500 URLs zum Laden und möchte nicht alle Anfragen gleichzeitig starten. Ich habe Karte (5) benutzt, aber es funktioniert nicht ... Alle Anfragen werden gleichzeitig gemacht. – Roaders

+0

@Roaders hast du diese Lösung zu arbeiten? Ich versuche es genauso. Aber alle Anfragen werden gleichzeitig ausgelöst. Ich habe überall gegoogelt und nichts gefunden. – Diego

+0

Wenn Sie zum Beispiel einen asynchronen http-Aufruf durchführen, müssen Sie ihn in eine Rx.defer() umbrechen, damit Rx entscheiden kann, wann der Aufruf erfolgt (und es erneut versuchen, wenn es zum Beispiel fehlschlägt). – Roaders

0

Ich habe gerade ein ähnliches Problem mit RxJs 5 gelöst, also hoffe ich, dass die Lösung anderen mit einem ähnlichen Problem helfen kann.

// Simulate always processing 2 requests in parallel (when one is finished it starts processing one more), 
 
// retry two times, push error on stream if retry fails. 
 

 
//const Rx = require('rxjs-es6/Rx'); 
 

 
// -- Global variabel just to show that it works. -- 
 
let parallelRequests = 0; 
 
// -------------------------------------------------- 
 

 
function simulateRequest(req) { 
 
    console.log("Request " + req); 
 
    // --- To log retries --- 
 
    var retry = 0; 
 
    // ---------------------- 
 

 
    // Can't retry a promise, need to restart before the promise is made. 
 
    return Rx.Observable.of(req).flatMap(req => new Promise((resolve, reject) => { 
 

 
     var random = Math.floor(Math.random() * 2000); 
 
     // -- To show that it works -- 
 
     if (retry) { 
 
      console.log("Retrying request " + req + " ,retry " + retry); 
 
     } else { 
 

 
      parallelRequests++; 
 
     } 
 
     // --------------------------- 
 
     setTimeout(() => { 
 
      if (random < 900) { 
 
       retry++; 
 
       return reject(req + " !!!FAILED!!!"); 
 
      } 
 

 
      return resolve(req); 
 
     }, random); 
 
    })).retry(2).catch(e => Rx.Observable.of(e)); 
 
} 
 

 
Rx.Observable.range(1, 10) 
 
    .flatMap(e => simulateRequest(e), null, 2) 
 
    // -- To show that it works -- 
 
    .do(() => { 
 
     console.log("ParallelRequests " + parallelRequests); 
 
     parallelRequests--; 
 
    }) 
 
    // --------------------------- 
 
    .subscribe(e => console.log("Response from request " + e), e => console.log("Should not happen, error: " + e), e => console.log("Finished"));
<script src="https://npmcdn.com/@reactivex/[email protected]/dist/global/Rx.umd.js"></script>

Verwandte Themen