2017-11-27 3 views
1

Ich versuche derzeit, rxjs-websockets mit eckigen 4 (https://github.com/ohjames/rxjs-websockets) zu verwenden.rxjs-websockets auf eckig 4, handling reconnections

Hier ist mein Service:

import { Injectable } from '@angular/core'; 
import { QueueingSubject } from 'queueing-subject' 
import { Observable } from 'rxjs/Observable' 
import websocketConnect from 'rxjs-websockets' 
import { environment } from 'environments/environment'; 

@Injectable() 
export class WebsocketJWTService { 
    private inputStream: QueueingSubject <any> 
    public messages: Observable <any> 
    SERVER: string = environment["SERVER_ADDRESS"]; 
    constructor(private modal: Modal, private SS: SharedService, private userManager: UserManagerService) {} 
    public connect() { 
     let SERVER = this.SERVER.substring(8); 
     if (this.messages) 
      return; 
     let temp = websocketConnect(
      "wss://" + SERVER + "/ws/media?token=" + this.userManager.getToken(), 
      this.inputStream = new QueueingSubject <any>() 
     ) 
     this.messages = temp.messages.share() 
    } 
    public send(message: any): void { 
     message['WSTOKEN'] = this.userManager.getToken(); 
     this.inputStream.next(message) 
    } 
} 

und ich bin es so ruft auf meine Komponenten:

constructor(private socket:WebsocketJWTService) {} 
ngOnInit() { 
    this.socket.connect(); 
    this.socketSubscription = this.socket.messages.subscribe(message => this.socketMessage(message)); 
} 

Nun, ich versuche Wiederverbindung zu behandeln, damit ich den Verbindungsstatus aussehen und rufen Sie die Verbindungsfunktion wieder, wenn die Anzahl der angeschlossenen Buchse Abfall auf 0:

temp.connectionStatus.subscribe(numberConnected => { 
    if (this.connectedNumber > numberConnected) { 
     console.log("disconnected"); 
     setTimeout(() => { 
      delete this.messages; 
      this.connect(); 
      this.messages.subscribe(); 
     }, 5000) 
    } 
    this.connectedNumber = numberConnected; 
}) 

ich bin richtig Ich sehe, wie mein Socket wieder verbunden wird, wenn es abstürzt, aber mein Abonnement nach der Wiederverbindung nicht mehr funktioniert. Ich sehe die Socket-Ereignisse von der neuen Socket-Verbindung gesendet werden, aber mein Winkelcode scheint nicht bekannt zu sein.

Weiß jemand, wie ich mich wieder mit der WebSocket verbinden kann und mein Abonnement funktioniert?

Vielen Dank für Ihre Zeit

Antwort

0

Die link you pasted einen Abschnitt „bei einem Fehler Wiederherstellen der Verbindung“. Hast du das gelesen, weil du das so machst und es funktioniert.

Ihr eigener Versuch wird nicht funktionieren, denn so funktioniert rxjs nicht. subscribe() gibt ein neues Abonnement zurück und in Ihrem Wiederverbindungscode tun Sie nichts mit diesem Abonnement. Dein altes Abo ist dein altes, danach ist es nicht mehr benutzbar.

Ich würde mit einem grundlegenden Primer auf rxjs beginnen und über heiße und kalte Observablen lesen, bevor Sie versuchen, Ihre eigenen Sachen damit zu tun, sonst können Sie das Beispiel einfach in das README der Bibliothek kopieren, die Sie verwenden.

(Seitennotiz: Ich bin der Library Author).

0

geben Sie so etwas wie eine Aufnahme. count ist nur eine lokale Variable, um die Wiederholungen zu verfolgen.

this.messages = temp.messages.share() 
          .retryWhen(attempts => { 
           return attempts 
           .do((error)=>{ 
            return 1}) 
            .mergeMap(error, count)=>{ 
             console.log(`Wait ${count} seconds, then reconnect!`); 
            return Observable.timer(count * 1000);}}