Ich möchte mehr über das RxJS-Konzept erfahren. Hier ist das Problem, mit dem ich gerade kämpfe. Ich möchte die Zugriffe auf einen DB mit Async-Aufrufen abstrahieren und die Zugriffe synchronisieren.Async-Aktionen mit RxJS synchronisieren
Kann ich einen Strom von Aktionen haben,
- das async tun nennt
db - ich will die nächste Aktion verzögert werden, bis die vorherige Aktion abgeschlossen ist
- der Anrufer der Maßnahme wird eine beobachtbare erhalten für das Ergebnis der asynchronen Aktionen.
Beispiel:
Benutzer der Klasse ruft Action1: Lesen DB Artikel berechnen nächsten Zustand (zB Erhöhung Feld), schreiben Sie an DB
dann ..
Benutzer der Klasse ruft nächsten Aktionen (Action2), aber Action1 ist noch in Bearbeitung.
Aktion2: DB lesen (darf nicht gestartet werden, bevor Aktion1: Schreiben beendet ist)
Wie kann das mit RxJS + Typescript gemacht werden?
Frank
/////////////////////////////////
Inzwischen habe ich dieses Code:
import * as Rx from 'rxjs';
var actionQueue = new Rx.Subject<() => Rx.Observable<any>>();
actionQueue
.concatMap(v => v())
.subscribe(v => {});
// example action with result type number
function action1 (v : number) : Rx.Observable<number> {
console.log(':: action1: ', v);
var res = new Rx.Subject<number>();
actionQueue.next(() => {
console.log('>> action1: ', v);
setTimeout(()=>{
console.log('<< action1: ', v);
res.next(v);
res.complete();
}, 500);
return res;
});
return res;
}
// some actions enqueue now, after 700+2500ms
action1(11).subscribe(v => console.log('XX action1: ', v));
action1(22).subscribe(v => console.log('XX action1: ', v));
action1(33).subscribe(v => console.log('XX action1: ', v));
setTimeout(()=>{
action1(44).subscribe(v => console.log('XX action1: ', v));
}, 700);
setTimeout(()=>{
action1(55).subscribe(v => console.log('XX action1: ', v));
}, 2500);
Die Ausgabe zeigt, dass es sequentielle Sachen tut.
Wie Typoskript/js noob ... hat dieser Code Tücken? Gibt es einen eleganteren Weg?
Frank