2017-02-13 1 views
0

Ich möchte mehr über das RxJS-Konzept erfahren. Hier ist das Problem, mit dem ich gerade kämpfe. Ich möchte die Zugriffe auf einen DB mit Async-Aufrufen abstrahieren und die Zugriffe synchronisieren.Async-Aktionen mit RxJS synchronisieren

Kann ich einen Strom von Aktionen haben,
- das async tun nennt
db - ich will die nächste Aktion verzögert werden, bis die vorherige Aktion abgeschlossen ist
- der Anrufer der Maßnahme wird eine beobachtbare erhalten für das Ergebnis der asynchronen Aktionen.

Beispiel:

Benutzer der Klasse ruft Action1: Lesen DB Artikel berechnen nächsten Zustand (zB Erhöhung Feld), schreiben Sie an DB
dann ..
Benutzer der Klasse ruft nächsten Aktionen (Action2), aber Action1 ist noch in Bearbeitung.
Aktion2: DB lesen (darf nicht gestartet werden, bevor Aktion1: Schreiben beendet ist)

Wie kann das mit RxJS + Typescript gemacht werden?

Frank

/////////////////////////////////

Inzwischen habe ich dieses Code:

import * as Rx from 'rxjs'; 

var actionQueue = new Rx.Subject<() => Rx.Observable<any>>(); 
actionQueue 
    .concatMap(v => v()) 
    .subscribe(v => {}); 

// example action with result type number 
function action1 (v : number) : Rx.Observable<number> { 
    console.log(':: action1: ', v); 
    var res = new Rx.Subject<number>(); 
    actionQueue.next(() => { 
    console.log('>> action1: ', v); 
    setTimeout(()=>{ 
     console.log('<< action1: ', v); 
     res.next(v); 
     res.complete(); 
    }, 500); 
    return res; 
    }); 
    return res; 
} 

// some actions enqueue now, after 700+2500ms 
action1(11).subscribe(v => console.log('XX action1: ', v)); 
action1(22).subscribe(v => console.log('XX action1: ', v)); 
action1(33).subscribe(v => console.log('XX action1: ', v)); 

setTimeout(()=>{ 
    action1(44).subscribe(v => console.log('XX action1: ', v)); 
}, 700); 

setTimeout(()=>{ 
    action1(55).subscribe(v => console.log('XX action1: ', v)); 
}, 2500); 

Die Ausgabe zeigt, dass es sequentielle Sachen tut.
Wie Typoskript/js noob ... hat dieser Code Tücken? Gibt es einen eleganteren Weg?

Frank

Antwort

0

Wie über den delayWhen() Operator?

// Observable wrapping action1. 
const obsAction1 = Observable.create(observer => { 
    // Read DB item 
    // Calculate next state 
    // Write to DB 
    // Then: 
    observer.complete(); 
}); 

// Private observable wrapping action2. 
// DO NOT subscribe to it directly. 
const _obsAction2 = Observable.create(observer => { 
    // Read DB 
    // Then: 
    observer.complete(); 
}); 

// Public observable wrapping action2 AND delayed by action 1. 
// This is what the client code should subscribe to. 
const obsAction2 = _obsAction2.delayWhen(obsAction1); 

Jetzt raubend der Code die Observablen:

obsAction1.subscribe(val => console.log(val)); 

// Values will be received only when `obsAction1` emits or completes. 
obsAction2.subscribe(val => console.log(val));