2016-04-20 5 views
5

Ich habe einen sehr einfachen Integrationsfluss, bei dem eine RESTful-Anfrage über einen Publish-Subscribe-Channel an zwei Provider weitergeleitet wird. Das Ergebnis der beiden RESTful-Dienste wird dann in einem einzigen Array aggregiert. Die Skizze der Integrationsstrom wird, wie unten gezeigt:Spring Integration Java DSL - Konfiguration des Aggregators

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
       ).subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class) 
         ) 
       ) 
      ) 
      .aggregate() 
      .get(); 
} 

Wenn jedoch meinen Code ausgeführt wird, die sich ergebende Array enthält die Elemente durch nur eine der RESTful Dienste zurückgegeben. Gibt es einen Konfigurationsschritt, den ich vermisse?

UPDATE

Die folgende Version entspricht der vollständigen Lösung unter Berücksichtigung Artem Kommentare.

@Bean 
IntegrationFlow flow() throws Exception { 
    return IntegrationFlows.from("inputChannel-scatter") 
      .publishSubscribeChannel(s -> s.applySequence(true) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider1.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather")) 
       .subscribe(f -> f 
         .handle(Http.outboundGateway("http://provider2.com/...") 
           .httpMethod(HttpMethod.GET) 
           .expectedResponseType(ItemDTO[].class)) 
         .channel("inputChannel-gather"))) 
      .get(); 
} 

@Bean 
IntegrationFlow gatherFlow() { 
    return IntegrationFlows.from("inputChannel-gather") 
      .aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
         g.getMessages().stream() 
           .flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload())) 
           .collect(Collectors.toList()).toArray(new ItemDTO[0])))) 
      .get(); 
} 

Antwort

3

Eigentlich funktioniert es nicht so.

Die .aggregate() ist ein dritten Teilnehmer zu diesem publishSubscribeChannel.

Sie müssen Ihren Fluss zu zwei von ihnen trennen. Wie folgt aus:

@Bean 
    public IntegrationFlow publishSubscribeFlow() { 
     return flow -> flow 
       .publishSubscribeChannel(s -> s 
         .applySequence(true) 
         .subscribe(f -> f 
           .handle((p, h) -> "Hello") 
           .channel("publishSubscribeAggregateFlow.input")) 
         .subscribe(f -> f 
           .handle((p, h) -> "World!") 
           .channel("publishSubscribeAggregateFlow.input")) 
       ); 
    } 

    @Bean 
    public IntegrationFlow publishSubscribeAggregateFlow() { 
     return flow -> flow 
       .aggregate(a -> a.outputProcessor(g -> g.getMessages() 
         .stream() 
         .<String>map(m -> (String) m.getPayload()) 
         .collect(Collectors.joining(" ")))) 
       .channel(c -> c.queue("subscriberAggregateResult")); 
    } 

Achten Sie bitte auf die .channel("publishSubscribeAggregateFlow.input") Nutzung von beiden Teilnehmern.

Um ehrlich zu sein, das ist ein Punkt von jedem publish-subscribe. Wir müssen wissen, wo wir das Ergebnis aller Abonnenten senden können, wenn wir sie zusammenfassen.

Ihr Anwendungsfall erinnert mich an das Scatter-Gather EIP-Muster.

Wir haben seine Implementierung in der DSL noch nicht. Fühlen Sie sich frei, um eine GH issue auf die Angelegenheit zu erheben, und wir werden versuchen, damit in der kommenden 1.2 Version umzugehen.

UPDATE

Die GH Thema auf die Frage: https://github.com/spring-projects/spring-integration-java-dsl/issues/75

+0

Vielen Dank für Sie Artem helfen. In der Tat habe ich versucht, Kanäle zu benutzen und den Fluss vorher zu trennen, ohne Erfolg, da ich auch das Problem mit dem Aggregator hatte. Ihre Antwort gab mir auch Hinweise, wie man den Aggregator schreibt. – user3329862

+0

Großartig! Aber Scatter-Gather wäre eine gute Ergänzung zum DSL. Seien Sie also nicht schüchtern, um ein GH-Problem zu lösen! –

Verwandte Themen