2017-08-09 1 views
3

Problem: Ich möchte Rxjs Zusammensetzung (Funktionskette) erstellen, die Pufferwerte von einem Observable bis bestimmten Ereignis auftreten würde, dann alle gepufferten Werte synchron und dann Pufferung bis zum nächsten Ereignis .RxJS: maising einen Puffer emittieren nach bestimmten Ereignis

Ich verwende diese Funktion, um alle HTTP-Anfragen zu sammeln, die warten müssen, bis meine Anwendung einen Aufruf zur Autorisierung macht. Und dann führe all diese Anfragen aus. (Es ist in Angular4 HttpClient Interceptor implementiert), das ist mein Anwendungsfall, aber ich suche generell nach Lösungen, wie man solche Rx-Ketten erstellt.

Warum Rxjs Puffer ist nicht enoyugh. Von dem, was ich lese und getestet Puffer erfordert entweder genaue Zeitrahmen, oder im Falle eines Schedulers anstelle von Zeit als Parameter, wird es wieder Scheduler nach Erkennen der "Event" Propagation des letzten Schedulers. Und ich möchte, dass es wie folgt funktioniert: Wenn eine erste Anfrage erscheint, starte ich einen Puffer und abonniere den Scheduler, nachdem der Scheduler sendet, stoppe die Pufferung, gebe alle gepufferten Werte zurück und warte bis die nächste neue Anfrage gemacht wird, die Pufferung wieder zu starten um den Scheduler erneut zu starten.

Gerade jetzt meine Lösung verwendet das ist Helper Object entweder nicht definiert oder meine beobachtbaren, mit Code etwa wie folgt:

private observable: Observable<boolean>; 

makeRequest(): Observable<boolean> { 
    if (this.observable !== void 0) { 
     return this.observable; 
    } else { 
     this.observable = this.authenticationReuqest() 
      .share() 
      .finally(() => this.observable = void 0); 

     return this.observable; 
    } 
} 

Auf diese Weise Art ich meine Anfragen puffern, durch maiking sie .delay(), bis dasselbe Multicast-Observable emittiert, und nachdem es ausgestrahlt wird, reinige ich es einfach auf (obwohl es keine Notwendigkeit für das Abmelden gibt, da es sich schließlich aufräumt, also nach Abschluss oder Fehler).

Wenn jemand eine Idee oder ein Muster hat, wie man diese Lösung durch reine Rxjs ersetzen kann, bin ich interessiert. Ich habe das Gefühl, dass eine Kombination aus Buffer und Zip es möglich machen könnte, obwohl ich die genaue Lösung nicht finden kann.

Dank Tomasz

+0

Der Operator 'buffer()' wird seinem Notifier nicht erneut zugeordnet. Ich denke, was Sie hier beschreiben, ist genau, wie der "Puffer" -Operator gerade funktioniert. – martin

+0

@martin wie in der Dokumentation http://reactivex.io/documentation/operators/buffer angegeben.HTML-Puffer, wenn ein Parameter beschreibendes Zeitfenster (entweder Zeit oder eine andere beobachtbare Zeit) Zeitfenster nacheinander erscheinen lässt. Meine zweite beobachtbare ist Http Anfrage. Ich möchte während des Wartens auf diese Anfrage puffern. Ich möchte diese Anfrage senden, wenn ich den ersten ungepufferten Wert und die Pufferwerte bekomme, bis diese HTTP-Anfrage endet. –

+0

Was ich glaube Puffer ist zu tun, ist ein Zeitrahmen von Observable definiert, aber es ermöglicht keine Interaktion zwischen gepufferter Quelle und dieser Zeitrahmen beobachtbar. Wenn Sie ein genaues Beispiel dafür haben, wie man es einfach mit Pufferfunktionalität implementiert, wäre ich mehr als glücklich, es zu sehen. –

Antwort

0

sollten Sie wahrscheinlich bufferWhen verwenden: es Wert aus einer beobachtbaren puffert, bis ein zweiter emittiert.

+0

Unerwarteter PufferWenn nicht, out of the Box, wieder zu zweiten beobachtbaren nach Neuanmeldung (ungepufferte) Anfrage, Ich würde willkürliche Zeit, nach denen ich wieder abonnieren, um die zweite, oder neu abonnieren sofort nachdem es den vorherigen Anruf beendet. Ich suche nach einem faulen Muster, um den zweiten nur einen pro Puffer zu nennen und nur wenn es nicht in bestimmten Zeiträumen benötigt wird. –

0

ich in genau dem gleichen Problem lief, habe ich Veranstaltungen wie diese

enter image description here

I kick off Laden Ereignisse früh kommen, so dass wir nicht warten müssen. Aber das Problem ist, erst nach einem Punkt (der "Rdy" steht für Ready) möchte ich diese Ereignisse verarbeiten. Also muss das fertige Ereignis zuerst kommen. Ich muss die Werte halten, bis sie fertig sind und sie erneut ausgeben. Wie dies

enter image description here

Hier ist, was ich tue, ich benutze multicast die gleiche Zeichnung in Downstream zu teilen. Erstellen Sie ein bereites Observable in der Selektorfunktion, das das erste ready-Ereignis filtert. Dann füge das bereit beobachtbare und den Puffer der Multicast-Ereignisse zusammen, nimm eins und switchMap von Array zu Werten zurück. Schließlich fassen Sie die Multicast-Ereignisse zusammen. Hier ist der Code

const events = [ 
    'ev1', 
    'ev2', 
    'ready', 
    'ev3', 
    'ev4', 
    'ev5' 
] 
Rx.Observable 
    .interval(500) 
    .take(events.length) 
    .map(i => events[i]) 
    .multicast(
    () => new Rx.Subject(), 
     events$ => { 
      const ready$ = events$ 
      .filter(event => event === 'ready') 
      .take(1) 
      return Rx.Observable 
      .merge(
       ready$, 
       events$ 
        .buffer(ready$) 
        .take(1) 
        .switchMap(events => Rx.Observable.from(events)) 
      ) 
       .concat(events$) 
     } 
) 

Oder Sie können es zu finden und führen Sie es auf RxViz hier https://rxviz.com/v/j8nNpe8N Es ist nicht schön, aber es funktioniert. Ich bin immer noch auf der Suche nach einem besseren Weg, eine Idee?

Verwandte Themen