2015-05-13 11 views
30

Ich habe eine Menge Probleme, den Zip-Operator in RxJava für mein Android-Projekt zu verstehen. Problem Ich muss in der Lage sein, eine Netzwerkanforderung zum Hochladen eines Videos zu senden Dann muss ich eine Netzwerkanforderung senden, ein Bild hochladen, um damit zu gehen schließlich muss ich eine Beschreibung hinzufügen und die Antworten von den beiden vorherigen verwenden fordert an, die Standort-URLs des Videos und des Bildes zusammen mit der Beschreibung auf meinen Server hochzuladen.Rxjava Android Wie benutze ich den Zip-Operator

Ich nahm an, dass der Zip-Operator für diese Aufgabe perfekt wäre, da ich verstand, dass wir die Antwort von zwei Observablen (Video und Bildanfragen) nehmen und für meine letzte Aufgabe verwenden könnten. Aber ich kann nicht scheinen, dass dies geschieht, wie ich es mir vorstelle.

Ich bin auf der Suche nach jemandem zu beantworten, wie dies konzeptuell mit ein wenig pseudo-Code getan werden kann. Danke

Antwort

40

Zip-Operator strikt Paare emittiert Objekte von Observablen. Es wartet darauf, dass beide (oder mehr) Artikel ankommen, und fügt sie dann zusammen. Also ja das wäre für Ihre Bedürfnisse geeignet.

Ich würde Func2 verwenden, um das Ergebnis von den ersten beiden Observablen zu verketten. Beachten Sie, dass dieser Ansatz einfacher wäre, wenn Sie Retrofit verwenden, da seine API-Schnittstelle eine Observable zurückgeben kann. Andernfalls müssten Sie Ihr eigenes Observable erstellen.

// assuming each observable returns response in the form of String 
Observable<String> movOb = Observable.create(...); 
// if you use Retrofit 
Observable<String> picOb = RetrofitApiManager.getService().uploadPic(...), 
Observable.zip(movOb, picOb, 

    new Func2<String, String, MyResult>() { 

     @Override 
     public MyResult call(String movieUploadResponse, 
      String picUploadResponse) { 
      // analyze both responses, upload them to another server 
      // and return this method with a MyResult type 
      return myResult; 
     } 
     } 
) 
// continue chaining this observable with subscriber 
// or use it for something else 
+0

Das Problem, auf das ich gestoßen bin, ist, dass ich die Ansicht habe, dass ich die Aufgaben des Videoobservablen abfeuern kann und ein anderes, das das Bild beobachtbar macht, und noch ein weiteres, das beide Ergebnisse nutzen und für das endgültige Observable verwenden soll. Gibt zip mir die Ergebnisse von Observables zurück, die bereits ausgeführt wurden? – feilong

+0

Ich glaube nicht, dass Sie eine Observable ausführen können, aber ich denke, ich verstehe Ihren Standpunkt. Observable wird nicht ausgeführt, bis ein Subriber an ihn angehängt ist. Also muss das gesamte beobachtbare Arrangement gemacht werden, bevor Sie den Abonnenten anhängen. – inmyth

+1

Ich stimme mit Ihrem Punkt, dass "Zip-Operator können Sie ein Ergebnis aus den Ergebnissen von zwei verschiedenen beobachtbaren komponieren". Nun möchte ich, dass das Observable 2 von Observable 1 abhängig ist, also Observable 1 vor Observable 2 ausführen soll und dann muss ich das Ergebnis beider Observablen kombinieren. Haben wir einen Operator dafür? Kann zip diesen Job machen. Ich möchte flatMap nicht verwenden, da es einen Stream in einen anderen konvertiert, aber hier muss ich die Abhängigkeit festlegen und dann das Ergebnis zippen. Bitte antworte. –

0

zip Operator können Sie ein Ergebnis aus den Ergebnissen von zwei verschiedenen beobachtbaren komponieren.

Sie müssen ein Lambda angeben, das ein Ergebnis aus Daten erzeugt, die von jedem Observablen ausgegeben werden.

Observable<MovieResponse> movies = ... 
Observable<PictureResponse> picture = ... 

Observable<Response> response = movies.zipWith(picture, (movie, pic) -> { 
     return new Response("description", movie.getName(), pic.getUrl()); 

}); 
+0

Ich stimme mit Ihrem Punkt überein, dass "Zip-Operator können Sie ein Ergebnis aus den Ergebnissen von zwei verschiedenen beobachtbaren komponieren". Nun möchte ich, dass das Observable 2 von Observable 1 abhängig ist, also Observable 1 vor Observable 2 ausführen soll und dann muss ich das Ergebnis beider Observablen kombinieren. Haben wir einen Operator dafür? Kann zip diesen Job machen. Ich möchte flatMap nicht verwenden, da es einen Stream in einen anderen konvertiert, aber hier muss ich die Abhängigkeit festlegen und dann das Ergebnis zippen. Bitte antworte. –

39

Schritt für Schritt: -> Lassen Sie uns zunächst unsere Retrofit Objekt definieren Github API zugreifen zu können, dann Setup zwei Observablen für die beiden Netzwerkanfragen über:

Retrofit repo = new Retrofit.Builder() 
     .baseUrl("https://api.github.com") 
     .addConverterFactory(GsonConverterFactory.create()) 
     .addCallAdapterFactory(RxJavaCallAdapterFactory.create()) 
     .build(); 

Observable<JsonObject> userObservable = repo 
     .create(GitHubUser.class) 
     .getUser(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

Observable<JsonArray> eventsObservable = repo 
     .create(GitHubEvents.class) 
     .listEvents(loginName) 
     .subscribeOn(Schedulers.newThread()) 
     .observeOn(AndroidSchedulers.mainThread()); 

Die Retrofit-Schnittstellen sind einfach genug:

public interface GitHubUser { 
    @GET("users/{user}") 
    Observable<JsonObject> getUser(@Path("user") String user); 
} 

public interface GitHubEvents { 
    @GET("users/{user}/events") 
    Observable<JsonArray> listEvents(@Path("user") String user); 
} 

In letzter Zeit verwenden wir RxJava s zip Methode unsere beiden Observablen zu kombinieren und warten, bis sie vor abzuschließen eine neue Observable zu schaffen.

Observable<UserAndEvents> combined = Observable.zip(userObservable, eventsObservable, new Func2<JsonObject, JsonArray, UserAndEvents>() { 
    @Override 
    public UserAndEvents call(JsonObject jsonObject, JsonArray jsonElements) { 
    return new UserAndEvents(jsonObject, jsonElements); 
    } 
}); 

Was ist die UserAndEvents?Es ist nur eine einfache POJO die beiden Objekte zu kombinieren:

public class UserAndEvents { 
    public UserAndEvents(JsonObject user, JsonArray events) { 
    this.events = events; 
    this.user = user; 
    } 

    public JsonArray events; 
    public JsonObject user; 
} 

Schließlich wollen wir die Methode abonnieren rufen unsere neuen beobachtbare kombiniert:

combined.subscribe(new Subscriber<UserAndEvents>() { 
      ... 
      @Override 
      public void onNext(UserAndEvents o) { 
      // You can access the results of the 
      // two observabes via the POJO now 
      } 
     }); 
+0

gibt es irgendeine Möglichkeit, dass der Körper params z.B. body request params der ersten Anfrage gesendet als body params der zweiten Anfrage.Es scheint schwierig zu sein in einem meiner Zip-Aufruf @ramkailash – Killer

2

Hier habe ich ein Beispiel, das ich Zip tat mit in asynchron, falls du bist neugierig

 /** 
* Since every observable into the zip is created to subscribeOn a diferent thread, it´s means all of them will run in parallel. 
* By default Rx is not async, only if you explicitly use subscribeOn. 
    */ 
@Test 
public void testAsyncZip() { 
    scheduler = Schedulers.newThread(); 
    scheduler1 = Schedulers.newThread(); 
    scheduler2 = Schedulers.newThread(); 
    long start = System.currentTimeMillis(); 
    Observable.zip(obAsyncString(), obAsyncString1(), obAsyncString2(), (s, s2, s3) -> s.concat(s2) 
                         .concat(s3)) 
       .subscribe(result -> showResult("Async in:", start, result)); 
} 

/** 
* In this example the the three observables will be emitted sequentially and the three items will be passed to the pipeline 
*/ 
@Test 
public void testZip() { 
    long start = System.currentTimeMillis(); 
    Observable.zip(obString(), obString1(), obString2(), (s, s2, s3) -> s.concat(s2) 
                     .concat(s3)) 
       .subscribe(result -> showResult("Sync in:", start, result)); 
} 


public void showResult(String transactionType, long start, String result) { 
    System.out.println(result + " " + 
           transactionType + String.valueOf(System.currentTimeMillis() - start)); 
} 

public Observable<String> obString() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obString1() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obString2() { 
    return Observable.just("") 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

public Observable<String> obAsyncString() { 
    return Observable.just("") 
        .observeOn(scheduler) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "Hello"); 
} 

public Observable<String> obAsyncString1() { 
    return Observable.just("") 
        .observeOn(scheduler1) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> " World"); 
} 

public Observable<String> obAsyncString2() { 
    return Observable.just("") 
        .observeOn(scheduler2) 
        .doOnNext(val -> { 
         System.out.println("Thread " + Thread.currentThread() 
                   .getName()); 
        }) 
        .map(val -> "!"); 
} 

Sie hier weitere Beispiele sehen https://github.com/politrons/reactive

6

Ein kleiner example:

Observable<String> stringObservable1 = Observable.just("Hello", "World"); 
Observable<String> stringObservable2 = Observable.just("Bye", "Friends"); 

Observable.zip(stringObservable1, stringObservable2, new BiFunction<String, String, String>() { 
    @Override 
    public String apply(@NonNull String s, @NonNull String s2) throws Exception { 
     return s + " - " + s2; 
    } 
}).subscribe(new Consumer<String>() { 
    @Override 
    public void accept(String s) throws Exception { 
     System.out.println(s); 
    } 
}); 

Dies drucken:

Hello - Bye 
World - Friends 
+0

schön, in der 'Func2 <...>', was sind die drei Saiten? ist es für die 's',' -' und 's2'? – chornge

+1

Ich habe gerade die Antwort aktualisiert, um RxJava2-Methoden zu behandeln, der erste String in der BiFunction ist der Typ des ersten Observablen, der zweite String ist der Typ des zweiten Observablen und der dritte String ist der Typ des Observablen, das generiert wird! –

1

i für eine einfache Antwort gesucht haben, wie Sie die Zip-Operator verwenden, und was mit den Observablen zu tun Ich erschaffe, um sie an sie zu übergeben, ich fragte mich, ob ich subscribe() für jede beobachtbare oder nicht nennen sollte, keine dieser Antworten waren einfach zu finden, ich musste es selbst herausfinden, also hier ist ein einfaches Beispiel dafür Verwenden von Zip-Operator auf 2 Observables:

@Test 
public void zipOperator() throws Exception { 

    List<Integer> indexes = Arrays.asList(0, 1, 2, 3, 4); 
    List<String> letters = Arrays.asList("a", "b", "c", "d", "e"); 

    Observable<Integer> indexesObservable = Observable.fromIterable(indexes); 

    Observable<String> lettersObservable = Observable.fromIterable(letters); 

    Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems()) 
      .subscribe(printMergedItems()); 
} 

@NonNull 
private BiFunction<Integer, String, String> mergeEmittedItems() { 
    return new BiFunction<Integer, String, String>() { 
     @Override 
     public String apply(Integer index, String letter) throws Exception { 
      return "[" + index + "] " + letter; 
     } 
    }; 
} 

@NonNull 
private Consumer<String> printMergedItems() { 
    return new Consumer<String>() { 
     @Override 
     public void accept(String s) throws Exception { 
      System.out.println(s); 
     } 
    }; 
} 

das gedruckte Ergebnis ist:

[0] a 
[1] b 
[2] c 
[3] d 
[4] e 

die endgültigen Antworten auf die Fragen, wo in meinem Kopf wurden als

folgt

die Observable zum zip() übergeben Methode muss nur sein nur erstellt, sie brauchen keine Abonnenten für sie, nur die Erstellung von ihnen ist genug ... Wenn Sie möchten, dass alle Observable auf einem Scheduler ausgeführt werden, können Sie dies für das Observable angeben ... Ich habe auch versucht, die zip() Operator auf Observables, wo sie auf das Ergebnis warten sollten, und das Consumable der zip() war t Riggered nur, wenn beide Ergebnisse bereit (das ist das erwartete Verhalten)

+0

Ich habe zwei Listen von Observablen (http Anfragen). Die erste Liste enthält Anforderungen zum Erstellen von Elementen und die zweite Liste enthält Anforderungen zum Aktualisieren von Elementen. Ich möchte "etwas" ausführen, wenn alle Anfragen beendet sind. Ich dachte daran, zip zu verwenden, aber bei Ihrem Beispiel sehe ich, dass es die Anfragen paarweise gruppiert und ich möchte nur am Ende benachrichtigt werden (die Listen haben unterschiedliche Größen). Kannst du mir eine Idee geben? Vielen Dank. – JCarlos

+1

werfen Sie einen Blick auf diese: http://reactivex.io/documentation/operators/combinelatest.html –