2016-11-22 3 views
0

Ich habe Spring Boot verwendet und alle XML Dateien in meinem Projekt loszuwerden. Leider verwendet es auch Spring-Integration, die aus meiner Erfahrung sehr stark XML basiert.Spring Integration Poll Aggregator programmatisch

Ich habe ein Szenario, das erfordert, dass ich einen Aggregator habe, und lassen Sie diesen Aggregator alle x Menge von Sekunden abgefragt werden.

Dies kann getan werden, unter Verwendung von XML wie so (zB aus einer früheren Frage SO genommen):

<!-- 
    the poller will process 100 messages every minute 
    if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel 
--> 
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel" 
    send-partial-result-on-expiry="true" 
    group-timeout="60000" 
    correlation-strategy-expression="T(Thread).currentThread().id" 
    release-strategy-expression="size() == 100"> 
    <int:poller max-messages-per-poll="100" fixed-rate="60000"/> 
</int:aggregator> 

Ich habe es geschafft, eine Klasse zu finden, die ein bisschen sorta funktioniert der Trick und es ist bean Definition lautet:

@Bean(name = "aggregatingMessageHandler") 
public AggregatingMessageHandler aggregatingMessageHandler() { 

    AggregatingMessageHandler aggregatingMessageHandler = 
      new AggregatingMessageHandler(messageGroupProcessorBean(), 
        new SimpleMessageStore(10)); 

aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean()); 

    aggregatingMessageHandler.setReleaseStrategy(
      new TimeoutCountSequenceSizeReleaseStrategy(3, 
        TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT)); 

    aggregatingMessageHandler.setExpireGroupsUponCompletion(true); 

    aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean()); 

    return aggregatingMessageHandler; 
} 

jedoch löst dies die canRelease() Verfahren der ReleaseStrategy nur, wenn eine neue Nachricht in den mit diesem Handler assoziiert inboundChannel empfangen wird, und nicht in einem festen Zeitintervall, das nicht das gewünschte Ergebnis ist. Ich möchte, dass alle Gruppen, die älter als eine Minute sind, auf den Ausgabekanal umgeleitet werden. Meine Frage ist - gibt es eine Möglichkeit, einen Poller wie den in der XML-Definition programmatisch anzuhängen?

Antwort

1

Für Java & Annotation Konfiguration werfen Sie einen Blick here und here.

Die Aggregator Komponente hat AggregatorFactoryBean für einfachere Java-Konfiguration.

Wie auch immer, Sie müssen darauf achten, dass es eine @ServiceActivator Annotation zusammen mit einer @Bean für diese Handlerdefinition gibt. Und genau @ServiceActivator hat poller Attribut.

Achten Sie auch darauf, dass es eine Java DSL für Spring Integration gibt.

Ein anderer Teil Ihrer Frage ist ein bisschen Verwirrung. Die poller ist vollständig nicht mit der Freigabestrategie verbunden. Seine Verantwortung in diesem Fall, Nachrichten von der PollableChannel zu erhalten, ist das logEntryChannel. Und erst danach werden bereits abgefragte Nachrichten an den Aggregator zur Korrelations- und Freigabelogik gesendet.

Was in diesem Beispiel getan wird, ist völlig andere Geschichte und wir können es in der separaten SO Thread diskutieren.

+1

Wie @artem gesagt hat, hat der Poller nichts mit der Vervollständigung zu tun (abgesehen davon, wann er eine Nachricht vom Kanal erhält und die Gruppe vervollständigen kann). Dies geschieht durch das 'Group-Timeout'. Sie können einen 'groupTimeoutExpression' (SpEL-Ausdruck) festlegen, um zu veranlassen, dass die Gruppe asynchron Zeitüberschreitung verursacht. 'neuer SpelExpressionParser(). parseExpression (" 60000 ")'. –

+0

Danke, das hat genau das gemacht, was ich wollte. –