2017-10-04 1 views
0

Ich habe den folgenden Code:beobachtbaren Warten Sie für alle mit rxjs

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 

    for (let hangar of hangars) { 
     this.pieceService.getGroupedPieces(hangar.pieces).subscribe(group => hangar.groupedPieces = group); 
    } 

    this.hangars$.next(hangars); 
}, (ex: any) => this.hangars$.error(ex)); 

Also im Grunde sendScopeCommand ist etwas über ein websocket Senden und die then Funktion ausgeführt wird, wenn eine Antwort auf die websocket empfangen wird. An diesem Punkt erhalte ich ein Array von Objekten, die ich in hangars eingegeben habe.

In diesen Objekten habe ich ein Array aller Stücke, die der Spieler besitzt. Es könnte mehrere Stücke mit demselben Stücktyp geben, also habe ich eine Funktion gemacht, um sie zu gruppieren: getGroupedPieces. Sein Code ist der folgende:

public getGroupedPieces(pieces: IPiece[]): Observable<IGroupedPiece[]> { 
    return Observable 
     .from(pieces) 
     .groupBy(p => p.pieceTypeId) 
     .flatMap(p => p.toArray()) 
     .map(p => { return <IGroupedPiece>{ amount: p.length, piece: p[0] }; }) 
     .toArray(); 
} 

Dieser Code funktioniert, aber ich bin ziemlich sicher, dass es nicht korrekt ist. In der Tat, ich denke, dass hangars ist auf der beobachtbaren bereits vor dem beobachtbaren in der for Schleife abgeschlossen sind.

Was ich hier möchte, ist warten auf alle diese beobachtbaren zu vervollständigen, bevor hangars auf dem Observable emittieren.

+0

Warum nicht emittieren Das Ereignis, wenn der Abonnent angerufen wird? –

Antwort

0

denke ich, können Sie versuchen, RxJS des forkJoin Operator zu verwenden:

this.hubService.sendScopedCommand(Constants.hangarCommands.getHangarsOfPlayer).then((result: ICommand) => { 

    let hangars: IHangar[] = result.arguments[0]; 
    let hangars$$: Observable<IHangar>[] = hangars.map(hangar => { 
     return this.pieceService.getGroupedPieces(hangar.pieces) 
    }) 

    Observable 
     .forkJoin(...hangars$$) 
     .subscribe(groups => { 
     groups.forEach((group, i) => hangars[i].groupedPieces = group) 
     this.hangars$.next(hangars); 
     }) 

}, (ex: any) => this.hangars$.error(ex)); 
+0

Super, danke – ssougnez

2

Ich persönlich versuche, so wenig wie möglich zu abonnieren, und vor allem mögliche Verschachtelung von Abonnements zu vermeiden.

Ich warf eine schnelle eckige Komponente zusammen, um ein Beispiel zu geben, wie ich damit umgehen würde.

leid, wenn sein rohe oder schlechte Rechtschreibung (gebrochenes Schlüsselbein mit nur einer Hand zu arbeiten.)

import { Component, OnInit } from '@angular/core'; 
import { Observable, Subject } from 'rxjs'; 

interface hangar { 
    pieces: number[]; 
    groupedPieces: number[]; 
} 

@Component({ 
    selector: 'app-root', 
    templateUrl: './app.component.html', 
    styleUrls: ['./app.component.css'] 
}) 
export class AppComponent { 
    title = 'app works!'; 
    //output mock 
    hangars$: Subject<hangar[]> = new Subject<hangar[]>(); 

    ngOnInit() { 
     //proof of functionality 
     this.hangars$.subscribe(h => console.log(h)); 
    } 

    //method to mock the then call from example 
    start() { 

     //mock some data 
     let hangars: hangar[] = [{ pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }, { pieces: [1, 2, 3], groupedPieces: null }]; 

     //subject to handle observable clean up 
     let subManagement$: Subject<any> = new Subject<any>(); 
     let obsArr: Observable<number[]>[] = []; 

     //here were going to build an array the observables but not subscribe to them yet 
     hangars.forEach(hangar => 
      obsArr.push(
       this.getGroupedPieces(hangar.pieces) 
        .takeUntil(subManagement$) 
        .do(group => hangar.groupedPieces = group) 
      ) 
     ); 

     //real magic, this waits for all of the observables responses before emitting its value 
     Observable.combineLatest(obsArr).subscribe(
      () => this.hangars$.next(hangars), 
      null, 
      () => subManagement$.next()//cleanup 
     ); 
    } 

    //mock out your service 
    private getGroupedPieces(pcs: number[]): Observable<number[]> { 

     return Observable.of([1, 2, 3, 4]).delay(1000); 
    } 
} 
+0

Wenn Sie Observable auf Versprechungen ändern, können Sie 'Promise.all' verwenden, um auf alle Versprechungen zu warten und fortzufahren. – Dekonunes

Verwandte Themen