Ich habe einen sehr einfachen Integrationsfluss, bei dem eine RESTful-Anfrage über einen Publish-Subscribe-Channel an zwei Provider weitergeleitet wird. Das Ergebnis der beiden RESTful-Dienste wird dann in einem einzigen Array aggregiert. Die Skizze der Integrationsstrom wird, wie unten gezeigt:Spring Integration Java DSL - Konfiguration des Aggregators
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
).subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class)
)
)
)
.aggregate()
.get();
}
Wenn jedoch meinen Code ausgeführt wird, die sich ergebende Array enthält die Elemente durch nur eine der RESTful Dienste zurückgegeben. Gibt es einen Konfigurationsschritt, den ich vermisse?
UPDATE
Die folgende Version entspricht der vollständigen Lösung unter Berücksichtigung Artem Kommentare.
@Bean
IntegrationFlow flow() throws Exception {
return IntegrationFlows.from("inputChannel-scatter")
.publishSubscribeChannel(s -> s.applySequence(true)
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider1.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather"))
.subscribe(f -> f
.handle(Http.outboundGateway("http://provider2.com/...")
.httpMethod(HttpMethod.GET)
.expectedResponseType(ItemDTO[].class))
.channel("inputChannel-gather")))
.get();
}
@Bean
IntegrationFlow gatherFlow() {
return IntegrationFlows.from("inputChannel-gather")
.aggregate(a -> a.outputProcessor(g -> new GenericMessage<ItemDTO[]>(
g.getMessages().stream()
.flatMap(m -> Arrays.stream((ItemDTO[]) m.getPayload()))
.collect(Collectors.toList()).toArray(new ItemDTO[0]))))
.get();
}
Vielen Dank für Sie Artem helfen. In der Tat habe ich versucht, Kanäle zu benutzen und den Fluss vorher zu trennen, ohne Erfolg, da ich auch das Problem mit dem Aggregator hatte. Ihre Antwort gab mir auch Hinweise, wie man den Aggregator schreibt. – user3329862
Großartig! Aber Scatter-Gather wäre eine gute Ergänzung zum DSL. Seien Sie also nicht schüchtern, um ein GH-Problem zu lösen! –