2016-07-22 2 views
3

Ich bin eine Datenbank abfragen und die Ergebnisse als zeilenweise Datenstrom der Ereignisse 'db_row_receieved' abrufen. Ich versuche, diese Ergebnisse nach Unternehmens-ID zu gruppieren, bekomme aber keine Ausgabe über das Abonnement.RxJS-Gruppierung emittierte Ereignisse nodejs

Das db-Zeilenformat wird unten angezeigt.

// row 1 
    { 
     companyId: 50, 
     value: 200 
    } 
    // row 2 
    { 
     companyId: 50, 
     value: 300 
    } 
    // row 3 
    { 
     companyId: 51, 
     value: 400 
    } 

Code:

var source = Rx.Observable.fromEvent(eventEmitter, 'db_row_receieved'); 
var grouped = source.groupBy((x) => { return x.companyId; }); 
var selectMany = grouped.selectMany(x => x.reduce((acc, v) => { 
          return acc + v.value; 
          }, 0)); 

var subscription = selectMany.subscribe(function (obs) { 
         console.log("value: ", obs); 
        } 

Erwartete Ausgabe:

value: 500 // from the group with companyId 50 
value: 400 // from the group with companyId 51 

tatsächliche Ausgang: Abonnement nichts ausgibt, funktioniert aber bei der Verwendung von Rx.Observable.fromArray (EinArray)

Kann mir bitte jemand sagen, wo ich falsch gelaufen bin?

+0

Sind Sie sicher, dass 'eventEmitter' tatsächlich Ereignisse mit dem angegebenen Namen ausgibt. Der Rest des Codes sieht gut aus. –

+0

Haben Sie versucht, gruppierte Subscribe (data => {console.log (data);}); um zu sehen, ob die groupeBy funktioniert –

+0

@Yury, Ja, eventEmitter gibt eine Zeile aus. – user1145347

Antwort

1

Also das Problem ist, dass reduce wird nur einen einzelnen Wert erzeugen, wenn der zugrunde liegende Stream complete d. Da ein Ereignisemitter eine Art unendliche Quelle ist, ist er immer aktiv.

Werfen Sie einen Blick auf den folgenden Ausschnitt - das erste Beispiel wird abgeschlossen, das andere nicht.

const data = [ 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'B', v: 10}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'A', v: 1}, 
 
    {k: 'A', v: 1}, 
 
]; 
 

 
Rx.Observable.from(data) 
 
    .concatMap(d => Rx.Observable.of(d).delay(100)) 
 
    .groupBy(d => d.k) 
 
    .mergeMap(group => group.reduce((acc, value) => { 
 
    acc.sum += value.v; 
 
    return acc; 
 
    }, {key: group.key, sum: 0})) 
 
    .do(d => console.log('RESULT', d.key, d.sum)) 
 
    .subscribe(); 
 
    
 
Rx.Observable.from(data) 
 
    .concatMap(d => Rx.Observable.of(d).delay(100)) 
 
    .merge(Rx.Observable.never()) // MERGIN NEVER IN 
 
    // .take(data.length) // UNCOMMENT TO MITIGATE NEVER 
 
    .groupBy(d => d.k) 
 
    .mergeMap(group => group.reduce((acc, value) => { 
 
    acc.sum += value.v; 
 
    return acc; 
 
    }, {key: group.key, sum: 0})) 
 
    .do(d => console.log('RESULT - NEVER - WILL NOT BE PRINTED', d)) 
 
    .subscribe();
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.0-beta.10/Rx.umd.js"></script>

Ich weiß nicht, Ihren spezifischen Anwendungsfall aber 2 häufigste, was in den Sinn kommt, sind:

  • Verwendung scan (wahrscheinlich mit debounce),
  • Einsatz takeUntil Wenn es ein Ereignis gibt, das das Ende des Undering-Streams anzeigt.