2017-01-19 2 views
11

Ich Observablen in ein Array wie solche drängen ...RXJS für alle Observablen in einem Array Warten Sie abzuschließen (oder Fehler)

var tasks$ = []; 
tasks$.push(Observable.timer(1000)); 
tasks$.push(Observable.timer(3000)); 
tasks$.push(Observable.timer(10000)); 

Ich möchte eine beobachtbare dass aussendet, wenn alle Aufgaben abgeschlossen $. Denken Sie daran, dass Aufgaben in der Praxis keine bekannte Anzahl von Observablen haben.

Ich habe versucht Observable.zip(tasks$).subscribe(), aber das scheint zu scheitern, wenn es nur 1 Aufgabe gibt, und führt mich zu der Annahme, dass ZIP eine gerade Anzahl von Elementen erfordert, um so zu arbeiten, wie ich es erwarten würde.

Ich habe versucht, Observable.concat(tasks$).subscribe(), aber das Ergebnis der concat-Operator scheint nur eine Reihe von Observablen ... z. im Grunde das gleiche wie die Eingabe. Sie können nicht einmal bei ihm abonnieren.

in C# wäre dies Task.WhenAll() ähnlich. in ES6 versprechen es wäre Promise.all().

Ich bin auf eine Reihe von SO Fragen gestoßen, aber sie alle scheinen sich damit zu befassen, auf eine bekannte Anzahl von Streams zu warten (z. B. sie zusammen zu kartieren).

+1

Dies hängt davon ab, was Sie tun möchten, wenn eines der Observables eine Fehlermeldung sendet. Möchten Sie den Fehler einfach ignorieren oder bedeutet dies, dass das gesamte Ergebnis verworfen wird und Sie nur den Fehler erhalten. – martin

Antwort

26

Wenn Sie eine beobachtbare komponieren möchten, dass emittiert, wenn alle von der Quelle vollständig Observablen, können Sie forkJoin verwenden:

import { Observable } from 'rxjs/Observable'; 
import 'rxjs/add/observable/forkJoin'; 
import 'rxjs/add/operator/first'; 

var tasks$ = []; 
tasks$.push(Observable.timer(1000).first()); 
tasks$.push(Observable.timer(3000).first()); 
tasks$.push(Observable.timer(10000).first()); 
Observable.forkJoin(...tasks$).subscribe(results => { console.log(results); }); 
+6

Seien Sie vorsichtig, wenn 'tasks' dynamisch erstellt wird und leer ist, stoppt forkJoin die beobachtbare Sequenz. Siehe meine Antwort hier für weitere Informationen http://StackOverflow.com/a/42622968/1224564 – bgondy

+0

O der dritten beobachtbaren war nicht in der Hauptstadt wie erklärt, so dass es beobachtbar ergab, ist undefined. Außerdem fehlt den Tasks das abschließende Dollarzeichen vor dem Abonnement. Ich kann die Antwort nicht bearbeiten, weil: Bearbeitungen müssen mindestens 6 Zeichen lang sein; Gibt es in diesem Beitrag noch etwas zu verbessern? –

+0

Ich will dich nicht für immer nörgeln, aber ich denke, das Ergebnis wird nicht angezeigt, weil der Fluss der beobachtbaren nicht vollständig ist. Möglicherweise fügen Sie einfach .first() nach dem Timer-Aufruf oder .take (n) hinzu, um ein interessanteres Ergebnis zu erhalten. forkJoin war der Operator, den ich suchte, danke @cartant! –

0

Für mich sample war beste Lösung.

const source = Observable.interval(500); 
const example = source.sample(Observable.interval(2000)); 
const subscribe = example.subscribe(val => console.log('sample', val)); 

Also .. nur wenn zweite (Beispiel) emittieren - Sie sehen zuletzt ausgegebenen Wert der ersten (Quelle).

In meiner Aufgabe warte ich Formvalidierung und andere DOM-Ereignis.

Verwandte Themen