2017-08-17 3 views
1

Ich möchte mehrere Observables kombinieren/zusammenführen und wenn jeder von ihnen abgeschlossen ist, führen Sie eine schließlich Funktion. Der Operator merge scheint jedes Abonnement parallel auszuführen, was ich brauche, aber wenn einer von ihnen einen Fehler auslöst, wird die Ausführung angehalten.rxjs5 Merge und Fehlerbehandlung

RxJS Version 4 hat einen Operator mergeDelayError, die alle Abonnements Ausführung bis alle von ihnen sind abgeschlossen halten sollte, aber dieser Operator in Version 5 nicht implementiert.

Sollte ich zu einem anderen Operator zurückkehren?

var source1 = Rx.Observable.of(1,2,3).delay(3000); 
var source2 = Rx.Observable.throw(new Error('woops')); 
var source3 = Rx.Observable.of(4,5,6).delay(1000); 

// Combine the 3 sources into 1 

var source = Rx.Observable 
    .merge(source1, source2, source3) 
    .finally(() => { 

    // finally is executed before all 
    // subscriptions are completed. 

    console.log('finally'); 

    }); 

var subscription = source.subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed')); 

JSBin

Antwort

2

Ich glaube, Sie das gleiche Verhalten catch() unter Verwendung simulieren kann.

const sources = [source1, source2, source3].map(obs => 
    obs.catch(() => Observable.empty()) 
); 

Rx.Observable 
    .merge(sources) 
    .finally(...) 
    ... 
+0

Danke - der 'Fang' scheint jedoch nicht zu funktionieren. Stattdessen habe ich einfach alles mit 'onErrorResumeNext' gemappt, was in meiner Situation okay ist. – null

+0

@null Der Operator 'catch()' funktioniert nicht wie? Ich denke, es sollte funktionieren ... – martin

+0

Ich bin mir nicht sicher warum. Siehe hierzu JSBin: [http://jsbin.com/qaluyaq/edit?js,console](http://jsbin.com/qaluyaq/edit?js,console) – null

1

Wenn Sie nicht wollen, Ihre Fehler schlucken, sondern wollen, dass sie bis zum Ende verzögern, können Sie: Sie müssen nur es beobachtbar jeder Quelle anhängen müssen

const mergeDelayErrors = []; 
const sources = [source1, source2, source3].map(obs => obs.catch((error) => { 
    mergeDelayErrors.push(error); 
    return Rx.Observable.empty(); 
})); 

return Rx.Observable 
    .merge(...sources) 
    .toArray() 
    .flatMap(allEmissions => { 
    let spreadObs = Rx.Observable.of(...allEmissions); 
    if (mergeDelayErrors.length) { 
     spreadObs = spreadObs.concat(Rx.Observable.throw(mergeDelayErrors)); 
    } 
    return spreadObs; 
    }) 

Möglicherweise möchten Sie nur den ersten Fehler auslösen oder eine CompositeError erstellen. Ich bin mir nicht sicher, wie sich mergeDelayErrors ursprünglich verhalten hat, als mehrere Fehler ausgelöst wurden.

Leider muss diese Implementierung warten, bis alle Observables abgeschlossen sind, bevor Fehler ausgegeben werden, und es wartet auch, bis alle Observablen abgeschlossen sind, bevor sie als nächstes gesendet werden. Dies ist wahrscheinlich nicht das ursprüngliche Verhalten von mergeDelayError, das als Stream ausgegeben werden soll, anstatt sie alle am Ende auszustrahlen.

+0

Danke ja wie du willst –

0

Wir können vermeiden, den Stream zu blockieren, indem wir die Fehler sammeln und am Ende ausstrahlen.

function mergeDelayError(...sources) { 
    const errors = []; 
    const catching = sources.map(obs => obs.catch(e => { 
    errors.push(e); 
    return Rx.Observable.empty(); 
    })); 
    return Rx.Observable 
    .merge(...catching) 
    .concat(Rx.Observable.defer(
    () => errors.length === 0 ? Rx.Observable.empty() : Rx.Observable.throw(errors))); 
} 


const source1 = Rx.Observable.of(1,2,3); 
const source2 = Rx.Observable.throw(new Error('woops')); 
const source3 = Rx.Observable.of(4,5,6); 

mergeDelayError(source1, source2, source3).subscribe(
    x => console.log('next:', x), 
    e => console.log('error:', e), 
() => console.log('completed'));