Ich glaube, um genau das zu bekommen, was Sie wünschen, würden Sie Ihren eigenen Betreiber erstellen müssen. Brechen von RxJS bekommen Sie können leicht so etwas wie (Warnung, nicht getestet) ...
export class BusyBuffer<T> {
private itemQueue = new Subject<T>();
private bufferTrigger = new Subject<{}>();
private busy = false;
constructor(consumerCallback: (items: T[]) => Promise<void>) {
this.itemQueue.buffer(this.bufferTrigger).subscribe(items => {
this.busy = true;
consumerCallback(items).then(() => {
this.busy = false;
this.bufferTrigger.next(null);
});
});
}
submitItem(item: T) {
this.itemQueue.next(item);
if(!busy) {
this.bufferTrigger.next(null);
}
}
}
, die dann verwendet werden können, wie
let busyBuffer = new BusyBuffer<T>(items => {
return database.insertRecords(items);
});
items.subscribe(item => busyBuffer.submitItem(item));
Es ist nicht genau, obwohl rein reaktiv ist und jemand kann in der Lage sein, sich etwas besseres auszudenken.
Und was ist das Problem? Es gibt die Operatoren 'buffer',' bufferToggle' oder 'bufferWhen'. – martin
Das Problem ist, dass ich keine Ahnung habe, wie man sie benutzt. Versuche herauszufinden, weiß aber noch nicht. –
Verwenden Sie 'concatMap' und geben Sie das Versprechen aus der Projektfunktion zurück. 'concatMap' wird die Pufferung für Sie übernehmen, aber in RxJS gibt es keinen Gegendruck. Wenn Ihre Daten also schneller ankommen als Sie schreiben können, werden Sie den Speicher voll auslasten. – cartant