2017-01-28 3 views

Antwort

-1

Unten ist der Beispielarbeitscode, der RxJava Observables verwendet, der das Ergebnis aus dem am schnellsten reagierenden Thread aus einer Gruppe von Threads druckt.

public static void main(String[] args) { 
      // Create a slow thread which spans 5 secs 
      Callable<String> task1 = new Callable<String>() { 

        @Override 
        public String call() throws Exception { 
          Thread.sleep(5000); 
          return "task1"; 
        } 
      }; 
      // Create a faster thread which spans 1 secs 
      Callable<String> task2 = new Callable<String>() { 

        @Override 
        public String call() throws Exception { 
          Thread.sleep(1000); 
          return "task2"; 
        } 
      }; 
      List<Callable<String>> tasks = new ArrayList<>(); 
      tasks.add(task1); 
      tasks.add(task2); 

      String result = null; 
      try { 
        result = Observable.from(tasks) 
            .subscribeOn(Schedulers.computation()) 
            .flatMap(eachTask -> Observable.fromCallable(eachTask) 
                .subscribeOn(Schedulers.io()) 
                .doOnNext(e -> System.out.println("Executing your action on "+Thread.currentThread().getName())) 
                .doOnError(e -> System.out.println("Failed reason for : "+Thread.currentThread().getName()+" with error "+e.getMessage())) 
                ) 
            .toBlocking() 
            .first(); 

      } catch (Exception e) { 
        System.out.println(e.getMessage()); 
      } 
      System.out.println("result--->"+result); 
} 
+0

schlechten Vorschlag, warten diese auf alle von ihnen zu vervollständigen und dann nehmen Sie die erste. –

+0

@DanieleSegato Nein, wenn Sie den Code so ausführen könnten, wie er ist, würden Sie sehen, dass er NICHT auf ALLE wartet. Es wählt das FIRST-Element aus, das von BlockingObservable zurückgegeben wird. – Sabarish

+0

unabhängig, fordert die Anfrage für merge + take (1) oder merge + first() –

3

Wenn ich Sie recht verstanden, müssen Sie etwas wie folgt aus:

taskSource 
    .flatMap(task -> // for each task 
     Observable.merge(
      // submit same task to multiple services 
      service1.submit(task), 
      service2.submit(task), 
      ..., 
      serviceN.submit(task) 
      ) 
      .take(1)) // take first response; discard others 
    ... // continue processing result of the task 
    .subscribe(...) 
2

Sie wollen den Observable.amb Operator. Es hat den Vorteil, mit Observablen mit mehr als einer Emission zu arbeiten.

+0

Danke wird es überprüfen. – Sabarish

Verwandte Themen