2012-03-27 4 views
2

Wir haben eine ActiveMQ/Camel-Konfiguration, die bisher ausschließlich Nachrichtenwarteschlangen mit gleichzeitigen Consumern verwendet.ActiveMQ: Korrekte Konfiguration mit Queues (mit gleichzeitigen Consumern) und Topics

Wir führen jedoch jetzt Nachrichtenthemen ein und stellen fest, dass - aufgrund der gleichzeitigen Consumer - Nachrichten, die in dem Thema empfangen werden, mehrere Male konsumiert werden.

Wie lautet die richtige Konfiguration für dieses Szenario?

dh wir wollen mehrere gleichzeitige Verbraucher für Nachrichten in einer Warteschlange empfangen, aber nur einen einzigen Verbraucher für Nachrichten zu einem Thema definiert definiert.

Hier ist die aktuelle Konfiguration:

<amq:connectionFactory id="amqConnectionFactory" 
    useAsyncSend="true" brokerURL="${${ptl.Servername}.jms.cluster.uri}" 
    userName="${jms.username}" password="${jms.password}" sendTimeout="1000" 
    optimizeAcknowledge="true" disableTimeStampsByDefault="true"> 
</amq:connectionFactory> 

<bean id="cachingConnectionFactory" 
    class="org.springframework.jms.connection.CachingConnectionFactory"> 
    <property name="targetConnectionFactory" ref="amqConnectionFactory"></property> 
    <property name="cacheConsumers" value="true"></property> 
    <property name="cacheProducers" value="true"></property> 
    <property name="reconnectOnException" value="true"></property> 
    <property name="sessionCacheSize" value="${jms.sessioncachesize}"></property> 
</bean> 

<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

Antwort

3

können Sie explizit concurrentConsumers/maxConcurrentConsumers zu "1" für jedes Thema Verbraucher.

from("activemq:topic:myTopic?concurrentConsumers=1&maxConcurrentConsumers=1")... 

Alternativ setzte die JmsConfiguration Concurrent/maxConcurrentConsumers Eigenschaften auf „1“ und dann explizit gleichzeitigen Verbrauch für Warteschlangen ermöglichen je nach Bedarf.

from("activemq:queue:myQueue?maxConcurrentConsumers=5")... 

auch, können Sie Virtual Topics benutzen ohne gleichzeitigen Konsum von Topic Nachrichten auszuführen Duplikate zu bekommen (hoch im Vergleich zu traditionellen Themen empfohlen)

0

Die Lösung, die ich am Ende mit war ein separaten jmsConfig/activeMQ Konfigurationsblock zu erstellen.

Die Gesamt configration sieht wie folgt aus:

<!-- This is appropriate for consuming Queues, but not topics. For topics, use 
jmsTopicConfig/activemqTopics --> 
<bean id="jmsConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="${jms.concurrentConsumer}" /> 
    <property name="maxConcurrentConsumers" value="${jms.max.concurrentConsumer}" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemq" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsConfig" /> 
</bean> 

<!-- This config limits to a single concurrent consumer. This config is appropriate for 
consuming Topics, not Queues. --> 
<bean id="jmsTopicConfig" class="org.apache.camel.component.jms.JmsConfiguration"> 
    <property name="connectionFactory" ref="cachingConnectionFactory" /> 
    <property name="transacted" value="false" /> 
    <property name="concurrentConsumers" value="1" /> 
    <property name="maxConcurrentConsumers" value="1" /> 
    <property name="preserveMessageQos" value="true" /> 
    <property name="timeToLive" value="${jms.timeToLive}" /> 
</bean> 

<bean id="activemqTopics" class="org.apache.activemq.camel.component.ActiveMQComponent"> 
    <property name="configuration" ref="jmsTopicConfig" /> 
</bean> 

Dann wird in der Kamel-Pipeline, das Thema aus der activemqTopics bean raubend wie folgt:

<camel:route id="myTopicResponder"> 
    <camel:from uri="activemqTopics:topic:stockQuotes?concurrentConsumers=1" /> 
    <camel:to uri="bean:stockQuoteResponder?method=saveStockQuote"/> 
</camel:route> 
Verwandte Themen