2017-01-02 4 views
0

Ich versuche, ein Benutzerobjekt parallel zu laden.RxJava: Reduce funktioniert nicht wie erwartet

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .scan(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

Scan emittiert drei Benutzerobjekt, wo die reduzieren keine Elemente überhaupt emittiert? Was mache ich hier falsch?

final User user = new User(); 
    final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
      .flatMap(field -> getOrchestrator(user, field)) 
      .reduce(new User(), (finalUser, event) -> { 
       finalUser = event; 
       return finalUser; 
      }); 

Der getOrchestrator gibt Observable zurück. Jede Hilfe wird geschätzt.

Unten finden Sie die komplette Code

public class Orchestrator { 
    private String userId; 

    public Orchestrator(final String userId) { 
     this.userId = userId; 
    } 

    public static void main(final String[] args) throws Exception { 
     final User user = new User(); 
     final Observable<User> userObs = Observable.just("NAMES", "ADDRESSES", "CURRENT_ADDRESS") 
       .flatMap(field -> getOrchestrator(user, field)) 
       .scan(new User(), (finalUser, event) -> { 
        finalUser = event; 
        return finalUser; 
       }); 

     userObs.subscribeOn(Schedulers.io()).subscribe(result -> { 
      System.out.println(result.toString()); 
     }); 

     TimeUnit.SECONDS.sleep(10); 

    } 

    private static Observable<User> getOrchestrator(final User user, final String fieldName) { 
     switch (fieldName) { 
      case "CURRENT_ADDRESS": 
       return new AddressOrchestrator().getCurrentAddress(user.getUserId()) 
         .map(currentAddress -> { 
          user.setAddress(currentAddress); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
      case "ADDRESSES": 
       return new AddressOrchestrator().getAddresses(user.getUserId()) 
         .map(addresses -> { 
          user.setAddresses(addresses); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 

      case "NAMES": 
       return new NameOrchestrator().getNames(user.getUserId()) 
         .map(names -> { 
          user.setNames(names); 
          try { 
           TimeUnit.MILLISECONDS.sleep(200); 
          } 
          catch (final InterruptedException e) { 

          } 
          return user; 
         }); 
     } 
     return null; 
    } 

    public User getUser() { 
     final Random r = new Random(); 
     if (r.nextInt(3) % 2 == 0) { 
      return new User(); 
     } 
     throw new RuntimeException(); 
    } 
} 

Jede Orchestrierung gibt beobachtbare Snippet.

public class AddressOrchestrator { 

    public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
     }); 

    } 

    public Observable<Address> getCurrentAddress(final String userId) { 
     return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
    } 

    public Address getBaseAddress(final String userId) { 
     final Address address = new Address(); 
     address.setLine1("540 Caddo Lake Dr"); 
     address.setCity("Georgetown"); 
     address.setCountry("USA"); 
     address.setState("TX"); 

     return address; 
    } 
} 


public class NameOrchestrator { 

    public Observable<List<Name>> getNames(final String userId) { 
     return Observable.create(s -> { 
      final Name name = new Name(); 
      name.setName("Vanchi"); 
      final Name formerName = new Name(); 
      formerName.setName("Vanchinathan"); 
      s.onNext(Arrays.asList(name, formerName)); 
     }); 
    } 
} 
+1

Zeigen Sie ein vollständiges Beispiel. Soweit wir wissen, ist Ihr Abonnement falsch. – weston

+0

Es scheint, dass dein Stream unendlich ist. –

+0

Während der Scanvorgang endet der Stream. Wenn Sie Reduzieren verwenden, wird kein Element ausgegeben. @weston Hinzugefügt das komplette Code-Snippet –

Antwort

2

Ihre Orchestrierungen, die Sie Observable.create (eine große rote Fahne, wenn Sie wirklich wissen, was Sie tun) erstellen mit bekannter beenden nicht. Dies führt zu unendlichen Strömen (in dem Sinne, dass niemals ein vollständiges Ereignis emittiert wird). Was Sie brauchen es so etwas wie

public Observable<List<Address>> getAddresses(final String userId) { 
     return Observable.create(s -> { 
      final Address currentAddress = this.getBaseAddress(userId); 
      final Address anotherAddress = this.getBaseAddress(userId); 
      anotherAddress.setState("NE"); 
      s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
      s.onCompleted(); 
     }); 

Notiere die onCompleted genannt zu werden. Dies wird Ihre oberflächlichen Probleme beheben, aber Sie sind besser dran, die Observable.create in erster Linie loszuwerden und etwas wie Observable.defer verwenden.

+0

Können Sie erklären, wie erstellen ist schlecht und wie hilft Defer? –

+0

Kurz gesagt, gibt es eine Reihe von Möglichkeiten zu vermasseln, wenn Sie 'Observable verwenden.create', weshalb es im Allgemeinen nicht empfohlen wird, wenn Sie eine Alternative haben. Es gibt einen guten Blogbeitrag auf 'Observable.defer [hier] (http://blog.danlew.net/2014/10/08/grokking-rxjava-part-4). – JohnWowUs

1

Das passiert, weil alle vorgelagerten Elemente ausgibt, während reduce nur emittiert wird, wenn der Stream abgeschlossen ist. Ersetzen

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.create(s -> s.onNext(this.getBaseAddress(userId))); 
} 

mit

public Observable<Address> getCurrentAddress(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)); 
} 

und

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.create(s -> { 
     final Address currentAddress = this.getBaseAddress(userId); 
     final Address anotherAddress = this.getBaseAddress(userId); 
     anotherAddress.setState("NE"); 
     s.onNext(Arrays.asList(currentAddress, anotherAddress)); 
    }); 
} 

mit

public Observable<List<Address>> getAddresses(final String userId) { 
    return Observable.fromCallable(() -> this.getBaseAddress(userId)) 
      .zipWith(Observable.fromCallable(() -> this.getBaseAddress(userId)) 
        .map(address -> address.setState("NE")), 
        (currentAddress, anotherAddress) -> Arrays.asList(currentAddress, anotherAddress)); 
} 

und Ihre setAddress() Verfahren zu diesem

public Address setState(String state) { 
    this.state = state; 
    return this; 
} 

fromCallable()

ändern, ist ein besserer Weg, eine Observable, die höchstens ein Element emittiert zu schaffen, weil sie Fehler und Gegendruck für Sie übernimmt.

Verwandte Themen