2017-02-03 1 views
1

Ich bin neu in der ganzen Rx Sache und reaktive Programmierung, aber ich muss mit einer solchen Situation umgehen. Ich möchte ein beobachtbares Intervall, um den Status einer Hardware durch eine POST-Anforderung an ihre REST-API alle 500 ms zu überprüfen, um zu sehen, ob sich die Antwort ändert. Sobald es sich also ändert, möchte ich, dass eine solche POST-Anfrage, die sofort beobachtbar ist, sofort beendet wird und die Ressource anderen zukünftigen Operationen überlassen wird. Hier ist ein Stück Code.Wie kann man ein Intervall, das unter der Bedingung in Angular2 oder RxJS beobachtbar ist, abbestellen oder entsorgen?

myLoop(item_array:number[], otheroption : string){ 
    for (let item of item_array){ 
     //set hardware option, still a request 
     this.configHardware(item,otheroption) 
     //after config the hardware, command hardware take action 
      .flatMap(() => { 
       //this return a session id for check status 
       this.takeHardwareAction() 
        .flatMap((result_id) => { 
         //I want every 500ms check if hardware action is done or not 
         let actionCheckSubscription = IntervalObservable.create(500).subscribe(
          () => { 
           //So my question is, can I unsubscribe the actionCheckSubscription in here on condition change? For example, 
           if (this.intervalCheckingStatus(result_id)) 
            actionCheckSubscription.unsubscribe() ; 
          } 
         ) ; 
        }) 

      }) 
    } 
} 

Antwort

2

Sie Observable.from nutzen könnten und concatMap durch alle Elemente zu durchlaufen und dann eine filter in Kombination mit take(1) verwenden, um ein Intervall zu stoppen, sobald die Validierung der geht filter:

myLoop(item_array:number[], otheroption : string) { 
    return Observable.from(item_array) 
     .concatMap(item => this.configHardware(item, otheroption) 
      .switchMap(resultId => Observable.interval(500) 
       .switchMapTo(this.intervalCheckingStatus(resultId)) 
       .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast 
       .take(1) // and complete after 1 value was valid 
       .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not) 
      ) 
     ); 
} 

// usage: 
myLoop([1,2,3,4], "fooBar") 
    .subscribe(
     item => console.log(`Item ${item} is now valid`), 
     error => console.error("Some error occured", error), 
     () => console.log("All items are valid now") 
    ); 

Hier ist ein Live-Beispiel mit Scheindaten

const Observable = Rx.Observable; 
 

 
function myLoop(item_array) { 
 
    return Observable.from(item_array) 
 
     // if you don't mind the execution-order you can use "mergeMap" instead of "concatMap" 
 
     .concatMap(item => configHardwareMock(item) 
 
      .switchMap(resultId => Observable.interval(500) 
 
       .do(() => console.info("Checking status of: " + resultId)) 
 
       .switchMap(() => intervalCheckingStatus(resultId)) 
 
       .filter(status => Boolean(status)) // your logic if the status is valid, currently just a boolean-cast 
 
       .take(1) // and complete after 1 value was valid 
 
       .mapTo(item) // map back to "item" so we can notify the subscriber (this is optional I guess and depends on if you want this feature or not) 
 
      ) 
 
     ); 
 
} 
 

 
// usage: 
 
myLoop([1,2,3,4]) 
 
    .subscribe(
 
     item => console.log(`Item ${item} is now valid`), 
 
     error => console.error("Some error occured", error), 
 
     () => console.log("All items are valid now") 
 
    ); 
 

 
// mock helpers 
 
function configHardwareMock(id) { 
 
    return Observable.of(id); 
 
} 
 

 
function intervalCheckingStatus(resultId) { 
 
    if (Math.random() < .4) { 
 
    return Observable.of(false); 
 
    } 
 
    
 
    return Observable.of(true); 
 
}
<script src="https://unpkg.com/rxjs/bundles/Rx.min.js"></script>

+0

erstaunlich, vielen Dank. Diese reaktive Programmierung ist aus meiner Sicht wirklich komisch. Ich brauche etwas Zeit, um alle neuen Aussagen zu verdauen. Kann ich fragen, ob dies die übliche Art ist, die REST-API-Kommunikation in Angular2 oder RxJS zu erstellen? –

+0

In angular2 injizieren Sie normalerweise 'Http' (https: // eckig.io/docs/ts/neuste/guide/server-communication.html #! # fetch-data) jeder http-Aufruf gibt auch ein 'Observable' zurück und funktioniert damit nahtlos mit RxJS - in reinem RxJS kann man' Observable verwenden. Ajax (...) ' – olsn

+0

Wird das Observable herunterfahren? Es stoppt nur ein Intervall http://reactivex.io/documentation/operators/take.html 'Sie können nur die ersten n Elemente emittieren, die von einem Observable ausgegeben werden, und dann den Rest ausführen, indem Sie den Operator Observable with the Take modifizieren. 'Müsstest du dich nicht manuell abmelden? – Gary

2

So möchten Sie eine POST-Anforderung alle 500 ms machen und dann seine Antwort zu überprüfen. Ich nehme an, Ihre Methode intervalCheckingStatus wertet die POST-Antwort aus und stellt fest, ob es anders ist?

Zuerst würde ich IntervalObservable nicht verwenden. Haben Sie das RxJS-Modul importiert? Es ist die Drittanbieter-Bibliothek, die von Angular unterstützt wird, und die, die sie in allen ihren Entwickler-Guide-Beispielen verwenden. Wenn nicht, installieren Sie es und importieren Sie es. https://github.com/Reactive-Extensions/RxJS

import * as Rx from 'rxjs/Rx'; 

Ich gehe davon aus haben Sie bereits importiert Http, ResponseOptions usw., aber hier ist es für den Fall, andere sind neugierig:

import { Http, Response, ResponseOptions } from '@angular/http'; 

EDIT 1: die Abhängigkeit Injektion schließen vergessen. Inject Http in Ihrem Konstruktor. Ich habe es http genannt, damit, wie ich this.http.post

constructor(private http: Http) { 

Dann bin ruft, würde ich folgendes tun:

EDIT 2: Diese in Ihrer Schleife würde, wo die Post-Argumente sind für das Element in Ihrem Array relevant.

// Every 500 ms, make a POST request 
    Rx.Observable.interval(500) 
        // Add your POST arguments here 
       .map(_ => this.http.post(yourUrl, yourBody)) 
       // This filter here is so that it will only emit when intervalCheckingStatus returns true 
       // You need to get the property you need from the Response: resp 
       // Is it the status code you're interested in? That's what I put as an example here but whatever it is, pass it to your method 
       .filter(resp => this.intervalCheckingStatus(resp.status)) 
       // Take(1) takes only the first emitted value and once it does that, the observable completes. So you do NOT need to unsubscribe explicitly. 
       .take(1); 

Wenn Sie etwas tun müssen, sobald die Antwort den Status hat (oder was auch immer Eigenschaft) Sie suchen, die Kette dann eine .subscribe bis zum Ende und führen Sie die Aktion, die Sie dort brauchen. Sobald das erste Element gepumpt wird, ist der beobachtbare Stream abgeschlossen und Sie müssen sich nicht mehr abmelden (1).

Auch hier eine sehr hilfreiche Website ist: http://rxmarbles.com/#take Sie, dass in ihrem Beispiel sehen können, ist die resultierende beobachtbare vollständige (vertikale Linie) nach 2 Elemente getroffen werden.

+0

Vielen Dank , Ihre Antwort ist detaillierter, dies wird einige Zeit dauern, um zu testen, danke für die Kommentare: D –

Verwandte Themen