2017-02-10 4 views
0

Ich verwende rxjava für die parallele Verarbeitung von zwei Anfragen mit Observable.zip. Was ich versuche zu tun ist, in einem observable say response bekomme ich eine Antwort und in anderen observable say diff Ich versuche, die Antwort zu bekommen und diesen Unterschied in DB zu speichern. Das Problem ist, ich bin nicht sicher, wie meine Anforderung zu erreichen, wie die diff observable nicht abgeschlossen zu werden, wenn response observable die Antwort bekommtrxjava - Antwort bekommen und diff parallel einfügen

Hier ist, was ich tue ...

public ServiceResponse getDummyResponse(ServiceRequest serviceRequest, String prodId){ 
    Observable<ServiceResponse> subInfoDummyObservable = getDummyResonseGenericObservable(); 
    Observable<ServicesDiff> reObservable = getServicesDiffGenericObservable(serviceRequest, prodId); 

    Observable<ServiceResponse> responseObservable = Observable.zip(
      subInfoDummyObservable, 
      reObservable, 
      new Func2<ServiceResponse, ServicesDiff, ServiceResponse>() { 
       @Override 
       public ServiceResponse call(ServiceResponse serviceResponse, ServicesDiff diffResponse) { 
        return serviceResponse; 
       } 
      } 
    ); 

    ServiceResponse serviceResponse = responseObservable.toBlocking().single(); 

    return serviceResponse; 
} 

Observable<ServiceResponse> getDummyResonseGenericObservable() { 
    return GenericHystrixCommand.toObservable("getDummyResonseGenericObservable", "getDummyResonseGenericObservable",() -> new ServiceResponse(),(t) -> {return null;}); 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    return GenericHystrixCommand.toObservable("getServicesDiffGenericObservable", "getServicesDiffGenericObservable",() -> getBothServiceResponses(serviceRequest, prodId),(t) -> {return null;}); 
} 

public ServicesDiff getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    Observable<ServicesDiff> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); // never reaches this point********** 
       } 
      } 
    ); 
    ServicesDiff response = observable.toBlocking().single(); 

    return response; 
} 

ich das diff bin Einfügen zu DB in aggregate Methode, aber es erreicht nie aggregate überhaupt. Bitte lassen Sie mich wissen, was ich hier falsch mache? Vielen Dank.

+0

Code Probe mir nicht klar ist, was ist die Beziehung zwischen der getBothServiceResponses-Methode, in der sich das Problem befindet, und dem Rest des Codes? Was sind die zwei Observables, die Sie bei getBothServiceResponses zippen? – yosriz

+0

Ich stimme zu, der Code muss geklärt werden. die ersten drei Methoden werden nie aufgerufen, daher wissen wir nicht, wie die Observablen in der 4. Methode 'getBothServiceResponses()' aussehen, wo dein Problem liegt – nosyjoe

Antwort

0

Observable sind eine Beschreibung, wie Daten zu verbrauchen. In Ihrem Codebeispiel, das Sie nicht abonnieren, konsumieren Sie die Daten tatsächlich nicht. Sie haben gerade beschrieben, wie Sie eine Anfrage stellen, aber der Subskriptionsabschnitt, der Teil, der die Anfragen auslöst, fehlt.

Also, wenn ich ein wenig Code neu schreiben:

class Aggregate { 
    Aggregate(String reponse, ServicesDiff diff) { 
     ... 
    } 
} 

Observable<String> getService1GenericObservable(String prodId) { 
    ... 
} 

Observable<ServicesDiff> getServicesDiffGenericObservable(ServiceRequest serviceRequest, String prodId) { 
    ... 
} 

public Observable<Aggregate> getBothServiceResponses(ServiceRequest serviceRequest, String prodId) { 
    Observable<String> service1ResponseObservable = getService1GenericObservable(prodId); 
    Observable<ServiceResponse> service2ResponseObservable = getService2GenericObservable(serviceRequest, prodId); 

    return Observable<Aggregate> observable = Observable.zip(
      service1ResponseObservable, service2ResponseObservable, 
      new Func2<String, ServiceResponse, ServicesDiff>() { 
       @Override 
       public ServicesDiff call(String service1Response, ServiceResponse service2Response) { 
        return aggregate(service1Response, service2Response); 
       } 
      } 
    ); 
} 

Sie nur dies tun müssen, um das Ergebnis Aggregat zuzugreifen:

getBothServiceResponses(serviceRequest, prodId).subscribe(...)