Ich habe Spring Boot verwendet und alle XML
Dateien in meinem Projekt loszuwerden. Leider verwendet es auch Spring-Integration, die aus meiner Erfahrung sehr stark XML
basiert.Spring Integration Poll Aggregator programmatisch
Ich habe ein Szenario, das erfordert, dass ich einen Aggregator habe, und lassen Sie diesen Aggregator alle x
Menge von Sekunden abgefragt werden.
Dies kann getan werden, unter Verwendung von XML wie so (zB aus einer früheren Frage SO genommen):
<!--
the poller will process 100 messages every minute
if the size of the group is 100 (the poll reached the max messages) or 60 seconds time out (poll has less than 100 messages) then the payload with the list of messages is passed to defined output channel
-->
<int:aggregator input-channel="logEntryChannel" output-channel="logEntryAggrChannel"
send-partial-result-on-expiry="true"
group-timeout="60000"
correlation-strategy-expression="T(Thread).currentThread().id"
release-strategy-expression="size() == 100">
<int:poller max-messages-per-poll="100" fixed-rate="60000"/>
</int:aggregator>
Ich habe es geschafft, eine Klasse zu finden, die ein bisschen sorta funktioniert der Trick und es ist bean Definition lautet:
@Bean(name = "aggregatingMessageHandler")
public AggregatingMessageHandler aggregatingMessageHandler() {
AggregatingMessageHandler aggregatingMessageHandler =
new AggregatingMessageHandler(messageGroupProcessorBean(),
new SimpleMessageStore(10));
aggregatingMessageHandler.setCorrelationStrategy(customCorrelationStrategyBean());
aggregatingMessageHandler.setReleaseStrategy(
new TimeoutCountSequenceSizeReleaseStrategy(3,
TimeoutCountSequenceSizeReleaseStrategy.DEFAULT_TIMEOUT));
aggregatingMessageHandler.setExpireGroupsUponCompletion(true);
aggregatingMessageHandler.setOutputChannel(outputAggregatedChannelBean());
return aggregatingMessageHandler;
}
jedoch löst dies die canRelease()
Verfahren der ReleaseStrategy
nur, wenn eine neue Nachricht in den mit diesem Handler assoziiert inboundChannel
empfangen wird, und nicht in einem festen Zeitintervall, das nicht das gewünschte Ergebnis ist. Ich möchte, dass alle Gruppen, die älter als eine Minute sind, auf den Ausgabekanal umgeleitet werden. Meine Frage ist - gibt es eine Möglichkeit, einen Poller wie den in der XML-Definition programmatisch anzuhängen?
Wie @artem gesagt hat, hat der Poller nichts mit der Vervollständigung zu tun (abgesehen davon, wann er eine Nachricht vom Kanal erhält und die Gruppe vervollständigen kann). Dies geschieht durch das 'Group-Timeout'. Sie können einen 'groupTimeoutExpression' (SpEL-Ausdruck) festlegen, um zu veranlassen, dass die Gruppe asynchron Zeitüberschreitung verursacht. 'neuer SpelExpressionParser(). parseExpression (" 60000 ")'. –
Danke, das hat genau das gemacht, was ich wollte. –