2016-04-13 10 views
8

Kann ich Retrofit + RxJava verwenden, um einen endlosen Stream zu hören? Zum Beispiel der Twitter-Stream. Was ich habe, ist dies:Android Retrofit 2 + RxJava: Endlos-Stream anhören

aber die "onComplete" wird aufgerufen, nachdem das erste Objekt geparst wurde. Gibt es eine Möglichkeit zu sagen, dass Retrofit bis auf weiteres geöffnet bleibt?

Antwort

9

Hier ist meine Lösung:

Sie können die @Streaming Anmerkung verwenden:

public interface ITwitterAPI { 

    @GET("/2/rsvps") 
    @Streaming 
    Observable<ResponseBody> twitterStream(); 
} 

ITwitterAPI api = new Retrofit.Builder() 
      .baseUrl("http://stream.meetup.com") 
      .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
      .build().create(ITwitterAPI.class); 

Mit @Streaming können wir Roh-Eingangs Von ResponseBody bekommen.

Hier ist meine Funktion Körper durch Linien mit Ereignissen geteilt zu wickeln:

public static Observable<String> events(BufferedSource source) { 
    return Observable.create(new Observable.OnSubscribe<String>() { 
     @Override 
     public void call(Subscriber<? super String> subscriber) { 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
       subscriber.onCompleted(); 
      } catch (IOException e) { 
       e.printStackTrace(); 
       subscriber.onError(e); 
      } 
     } 
    }); 
} 

und Nutzung führen:

api.twitterStream() 
    .flatMap(responseBody -> events(responseBody.source())) 
    .subscribe(System.out::println); 

upd über anmutig

zu stoppen, wenn wir Retrofit schließt abzumelden, Eingabestrom. Aber unmöglich zu erkennen Eingangsstrom geschlossen oder nicht von Eingangsstrom selbst, also nur Weg - versuchen Sie, aus dem Stream lesen - wir erhalten Ausnahme mit Socket closed Nachricht. Wir können diese Ausnahme als Schluss interpretieren:

 @Override 
     public void call(Subscriber<? super String> subscriber) { 
      boolean isCompleted = false; 
      try { 
       while (!source.exhausted()) { 
        subscriber.onNext(source.readUtf8Line()); 
       } 
      } catch (IOException e) { 
       if (e.getMessage().equals("Socket closed")) { 
        isCompleted = true; 
        subscriber.onCompleted(); 
       } else { 
        throw new UncheckedIOException(e); 
       } 
      } 
      //if response end we get here 
      if (!isCompleted) { 
       subscriber.onCompleted(); 
      } 
     } 

Und wenn die Verbindung, weil Antwort Ende geschlossen ist, haben wir keine Ausnahmen. Hier isCompleted überprüfen Sie das. Lassen Sie mich wissen, wenn ich falsch liege :)

+0

Danke! Irgendwelche Hinweise, wie Sie den Stream nicht mehr hören können? – ticofab

Verwandte Themen