2017-02-04 3 views
1

Ich habe ein Observable, das ich verwende, um ein Versprechen in ein Abonnement zu konvertieren. Dies führt zu einer Sammlung, die ich durchlaufen muss, um einen HTTP-Dienst für jedes Element aufzurufen. Ich benutze forkJoin, um auf alle diese Anrufe zu warten, damit ich etwas anderes machen kann, aber leider wird mein Abonnement nicht aufgerufen. Siehst du, was ich hier vermisse?Beobachtbares Abonnement wird nicht aufgerufen

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser).subscribe(
     results => { 
      this.progress += 1; 
     }, 
     err => { 
      this.progress += 1; 
      this.handleError(<any>err); 
     }) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

Antwort

2

Eine Sache ist, dass Sie nicht jede Observable an forkJoin() übergeben abonnieren. Der Betreiber muss es selbst tun.

Wenn Sie benachrichtigt werden möchten, wenn jede Observable abgeschlossen ist, können Sie .do(undefined, undefined,() => {...}) verwenden.

let observables = [ 
    Observable.of(42).do(undefined, undefined,() => console.log('done')), 
    Observable.of('a').delay(100).do(undefined, undefined,() => console.log('done')), 
    Observable.of(true).do(undefined, undefined,() => console.log('done')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

Diese Drucke zu trösten:

done 
done 
done 
[ 42, 'a', true ] 

Schließlich gibt es auch .finally() Operator. Es ist jedoch nicht dasselbe wie die Verwendung von .do().

EDIT:

Wenn eine der Quelle Observable versagen die forkJoin() Operator reemits den Fehler (das heißt, sie scheitert auch).
Dies bedeutet, Sie müssen Fehler in jeder Quelle Observable getrennt (mit catch() Operator zum Beispiel) zu fangen.

let observables = [ 
    Observable.throw(new Error()) 
    .catch(() => Observable.of('caught error 1')) 
    .do(undefined, undefined,() => console.log('done 1')), 

    Observable.of('a') 
    .delay(100).catch(() => Observable.of('caught error 2')) 
    .do(undefined, undefined,() => console.log('done 2')), 

    Observable.of(true) 
    .catch(() => Observable.of('caught error 3')) 
    .do(undefined, undefined,() => console.log('done 3')), 
]; 

Observable.forkJoin(observables) 
    .subscribe(results => console.log(results)); 

Welche druckt:

done 1 
done 3 
done 2 
[ 'caught error 1', 'a', true ] 
+0

Ok, ich verstehe, was du meinst, aber wenn eine dieser Observablen eine Ausnahme hat, nimmt der forkJoin sie sofort auf, bevor die anderen fertig sind.Ich dachte, dass forkJoin warten sollte, bis alle fertig sind, aber von der Quelle sieht es so aus, als würde es "fertig" markieren, anstatt zum nächsten zu gehen. In meinem Beispiel möchte ich, dass jede Observable den Fehler elegant behandelt und fortfährt. Gibt es eine Möglichkeit, das zu tun? – occasl

+0

@occasl mein Update sehen. – martin

0

Ich glaube nicht, dass Sie in der Karte abonnieren müssen.

Observable.fromPromise(this.users.getElements()).subscribe(results => { 
    Observable.forkJoin(
    results.map(
     aUser => this.HttpService.submitUser(aUser)) 
    ).subscribe(
     //it never gets to either of these calls after all service calls complete 
     data => { 
     debugger; 
     console.log(data); 
     this.reset(); 
     }, 
     err => { 
     debugger; 
     console.log(err); 
     this.reset(); 
     } 
    )); 
}); 

Beachten Sie, dass in dem rxjs Beispiel hier:

https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/forkjoin.md

sie zeichnen nicht auf die einzelnen Observablen - ForkJoin sie wird alles geht, wartet dann auf alle von ihnen (zurückzukehren in Ihrem subscribe)

EDIT:.

Die forkjoin Quelle ist hier:

https://github.com/Reactive-Extensions/RxJS/blob/master/src/core/linq/observable/forkjoin.js

und es sieht nicht so aus, als ob es Haken hat, um herauszufinden, wann jeder fertig ist. Ich denke, der beste Weg, sich einer UI-Leiste zu nähern, wäre, jede der zugeordneten Observablen einzeln zu abonnieren, die alle eine Funktion aufrufen, um die UI-Zählbalkenvariable zu inkrementieren und einen Test auf "Vollständigkeit" durchzuführen, der es erlaubt, die Daten zu verwenden.

+0

Ja, aber dann, wie werde ich meine Zählerinkrement (für UI Fortschrittsbalken), wie jeder vervollständigt? Im Grunde möchte ich etwas tun, wenn jeder fertig ist und dann noch etwas, wenn alle fertig sind. – occasl

+0

Ich habe mir nur die Quelle von forfjoin angeschaut, und ich glaube nicht, dass sie irgendetwas offenlegt, was es erlaubt, sich einzuklinken und zu sehen, wann jeder reinkommt. Ich denke, man müsste jede dieser Karten einzeln abonnieren und eine Art von Call nennen Zählfunktion für Ihre UI-Fortschrittsleiste. – chrispy