2017-10-19 1 views
2

Ich habe einen Datenstrom, mit schnell eingehenden Daten. Ich möchte sie in eine Datenbank einfügen, indem ich Ordnung halte. Ich habe eine Datenbank, die eine Zusage zurückgibt, die gelöst wird, wenn eine Einfügung erfolgreich ist.RxJs Puffer bis Datenbank einfügen (Versprechen)

Ich möchte einen Rx-Stream erstellen, der die neuen Daten puffert, bis die gepufferten Daten eingefügt werden.

Wie kann ich das tun?

+0

Und was ist das Problem? Es gibt die Operatoren 'buffer',' bufferToggle' oder 'bufferWhen'. – martin

+0

Das Problem ist, dass ich keine Ahnung habe, wie man sie benutzt. Versuche herauszufinden, weiß aber noch nicht. –

+1

Verwenden Sie 'concatMap' und geben Sie das Versprechen aus der Projektfunktion zurück. 'concatMap' wird die Pufferung für Sie übernehmen, aber in RxJS gibt es keinen Gegendruck. Wenn Ihre Daten also schneller ankommen als Sie schreiben können, werden Sie den Speicher voll auslasten. – cartant

Antwort

2

Ich glaube, um genau das zu bekommen, was Sie wünschen, würden Sie Ihren eigenen Betreiber erstellen müssen. Brechen von RxJS bekommen Sie können leicht so etwas wie (Warnung, nicht getestet) ...

export class BusyBuffer<T> { 
    private itemQueue = new Subject<T>(); 
    private bufferTrigger = new Subject<{}>(); 
    private busy = false; 

    constructor(consumerCallback: (items: T[]) => Promise<void>) { 
    this.itemQueue.buffer(this.bufferTrigger).subscribe(items => { 
     this.busy = true; 
     consumerCallback(items).then(() => { 
     this.busy = false; 
     this.bufferTrigger.next(null); 
     }); 
    }); 
    } 

    submitItem(item: T) { 
    this.itemQueue.next(item); 
    if(!busy) { 
     this.bufferTrigger.next(null); 
    } 
    } 

} 

, die dann verwendet werden können, wie

let busyBuffer = new BusyBuffer<T>(items => { 
    return database.insertRecords(items); 
}); 
items.subscribe(item => busyBuffer.submitItem(item)); 

Es ist nicht genau, obwohl rein reaktiv ist und jemand kann in der Lage sein, sich etwas besseres auszudenken.

+2

Danke! Ich habe das gleiche erstellt, aber ich hoffe, dass jemand mit einer reinen reaktiven Lösung kommen kann :) –

+0

Kein Schweiß, viel Glück. Ich dachte daran, ein beliebiges Besetzt/freies Signal von der Datenbank zu nehmen und es zurück in die Puffermethode zu leiten, aber Sie müssten auch die Logik hinzufügen, die der Puffer sofort ausgeben sollte, wenn die Datenbank nichts tut. – Pace

+0

Akzeptiere meine Antwort nicht, wenn du etwas Besseres willst. Es macht mir nichts aus. – Pace

Verwandte Themen