2015-06-17 21 views
20

Mein Problem ist, ich kann nicht unendlich Stream mit Retrofit bekommen. Nachdem ich die Anmeldeinformationen für die erste Poll() Anfrage erhalten habe - ich initiale Poll() Anfrage. Jede poll() - Anfrage antwortet in 25 Sekunden, wenn keine Änderung erfolgt, oder früher, wenn Änderungen vorhanden sind - wobei geänderte_Daten [] zurückgegeben werden. Jede Antwort enthält timestamp Daten, die für die nächste Abfrageanforderung benötigt werden - ich sollte eine neue poll() Anfrage nach jeder poll() Antwort machen. Hier ist mein Code:RxJava + Retrofit lange Polling

getServerApi().getLongPollServer() 
    .flatMap(longPollServer -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollServer.getTs(), "") 
    .take(1) 
    .flatMap(longPollEnvelope -> getLongPollServerApi(longPollServer.getServer()).poll("a_check", Config.LONG_POLLING_SERVER_TIMEOUT, 2, longPollServer.getKey(), longPollEnvelope.getTs(), ""))) 
    .retry() 
    .subscribe(longPollEnvelope1 -> { 
    processUpdates(longPollEnvelope1.getUpdates()); 
}); 

Ich bin neu in RxJava, vielleicht verstehe ich nicht etwas, aber ich kann nicht unendlich Strom bekommen. Ich bekomme 3 Anrufe, dann OnNext und OnComplete.

P.S. Vielleicht gibt es eine bessere Lösung, um Long-Polling auf Android zu implementieren?

+0

In Ihrem Fall würde ich in Erwägung ziehen, meine eigene 'Observable' mit' Observable.create() ' –

Antwort

11

Obwohl nicht ideal, glaube ich, dass Sie die Nebenwirkungen von RX verwenden können, um ein gewünschtes Ergebnis zu erzielen (DoOn-Operationen).

Observable<CredentialsWithTimestamp> credentialsProvider = Observable.just(new CredentialsWithTimestamp("credentials", 1434873025320L)); // replace with your implementation 

Observable<ServerResponse> o = credentialsProvider.flatMap(credentialsWithTimestamp -> { 
    // side effect variable 
    AtomicLong timestamp = new AtomicLong(credentialsWithTimestamp.timestamp); // computational steering (inc. initial value) 
    return Observable.just(credentialsWithTimestamp.credentials) // same credentials are reused for each request - if invalid/onError, the later retry() will be called for new credentials 
      .flatMap(credentials -> api.query("request", credentials, timestamp.get())) // this will use the value from previous doOnNext 
      .doOnNext(serverResponse -> timestamp.set(serverResponse.getTimestamp())) 
      .repeat(); 
}) 
     .retry() 
     .share(); 

private static class CredentialsWithTimestamp { 

    public final String credentials; 
    public final long timestamp; // I assume this is necessary for you from the first request 

    public CredentialsWithTimestamp(String credentials, long timestamp) { 
     this.credentials = credentials; 
     this.timestamp = timestamp; 
    } 
} 

Wenn Sie 'o' abonnieren, wird die interne Observable wiederholt. Sollte ein Fehler auftreten, wird 'o' den Credentials-Stream erneut versuchen und erneut anfordern.

In Ihrem Beispiel wird die rechnerische Steuerung durch Aktualisierung der Zeitstempelvariablen erreicht, die für die nächste Anforderung erforderlich ist.

+0

für Ihre Antwort zu implementieren. Allerdings bekomme ich einen Zeitstempel von API und ich sollte es mit einem neuen Poll() Anruf zurückschicken. – localhost

+0

Ich habe die Antwort aktualisiert, um hoffentlich näher zu Ihrer Situation zu passen. Sie können sehen, dass Sie einfach eine Variable festlegen, wenn Sie eine ServerResponse erhalten. "doOnNext" macht den Nebeneffekt explizit. Meine Sorge ist, dass dies nicht schön ist und wir müssten Ihren Code sehen, um eine bessere Antwort zu geben. – snodnipper

+0

Ich habe ähnliches Problem und gelöst mit Ihrem Code, aber in meinem Fall möchte ich Wert auch zu allererster Zeit zu speichern. Wo kann ich diesen Code? – Krutik

Verwandte Themen