2016-04-27 2 views
1

Ich habe ein benutzerdefiniertes Senken-Modul und ich möchte nur die Nachrichten von der input verbrauchen, wenn die Anzahl der Nachrichten erreicht eine Zählung oder wenn sie seit einiger Zeit im Kanal sind. Kurz gesagt, ich möchte einen Bulk-Push machen.Verbrauchen von Kanal nur, wenn die Anzahl der Nachrichten eine Anzahl erreicht oder die Nachricht seit einer Weile im Kanal ist

Ich habe versucht, die Anzahl der Nachrichten nach dem Konsumieren und Speichern sie in einem aggregierten Kanal von SimpleMessageStore gesichert und MessageGroupStoreReaper für Nachrichten im Kanal zu überprüfen.

Ich bin mit diesem Ansatz nicht zufrieden, da ich die Nachrichten konsumiere und sie in einem In-Memory-Speicher ablege, ich kenne den JDBC-Speicher ebenfalls, aber ich möchte diesem Ansatz nicht als Nachrichtenkanäle folgen im Frühjahr XD werden von redis/mq unterstützt Ich möchte aus dem input-Kanal basierend auf meinen Bedingungen verbrauchen.

Meine aktuelle Bean-Konfiguration ist wie folgt:

<int:aggregator id="messageAggregator" ref="messageAggregatorBean" 
    method="aggregate" input-channel="input" output-channel="aggregatorOutputChannel" 
    release-strategy="messageReleaseStrategyBean" release-strategy-method="canRelease" 
    send-partial-result-on-expiry="true" message-store="resultMessageStore"> 
</int:aggregator> 

<int:service-activator id="contributionIndexerService" 
    ref="contributionIndexerBean" method="bulkIndex" input-channel="aggregatorOutChannel" /> 

<bean id="resultMessageStore" 
    class="org.springframework.integration.store.SimpleMessageStore" /> 

<bean id="resultMessageStoreReaper" 
    class="org.springframework.integration.store.MessageGroupStoreReaper"> 
    <property name="messageGroupStore" ref="resultMessageStore" /> 
    <property name="timeout" value="60000" /> 
</bean> 

<task:scheduled-tasks> 
    <task:scheduled ref="resultMessageStoreReaper" method="run" 
     fixed-rate="10000" /> 
</task:scheduled-tasks> 

Irgendwelche Gedanken oder Kommentare?

Vielen Dank im Voraus.

Antwort

0

Ich bin mir nicht sicher, ob Sie die Anzahl der Nachrichten in der Warteschlange des Brokers (Redis/RabbitMQ oder auch nur normales JMS) bestimmen können, mehr darüber, wie viel sie dort waren.

Sie sollten auf jeden Fall verbrauchen sie eine solche Logik zu tun.

Und ja, ich denke, Aggregator kann Ihnen helfen. Aber richtig: das muss ein Persistent Message Store sein.

für den Fall

wenn sie sich seit einiger Zeit im Kanal sind

Die Aggregator vorschlagen eine Option wie group-timeout zu Release jene Gruppen, die die releaseStrategy Zustand nicht erreicht haben, aber Sie möchten sie trotzdem irgendwann ausstrahlen: http://docs.spring.io/spring-integration/reference/html/messaging-routing-chapter.html#agg-and-group-to

Verwandte Themen