2017-11-14 5 views
0

Wir stehen vor einem seltsamen Problem, in dem die activemq Verbraucher für einige zufällige Warteschlangen sinken, bis sie 0 werden, nach denen sie nicht wiederherstellen können. Sobald dies passiert, müssen wir die Consumer-Anwendung erneut bereitstellen, um die Verarbeitung zu starten. Wir haben seit einiger Zeit mit diesem Problem zu kämpfen, konnten aber die Ursache nicht herausfinden.Active mq Verbraucher zählen abnehmend auf 0

activemq Broker Version 5.14.5

finden Sie die Verbindungskonfiguration.

<bean id="activeMQIconnectConnectionFactory" class="test.ActiveMQIconnectConnectionFactory"> 
     <property name="brokerURL" value="failover:(tcp://localhost:61616)"/> 
     <property name="prefetchPolicy" ref="prefetchPolicy"/> 
     <property name="redeliveryPolicy" ref="redeliveryPolicy"/> 
     <property name="trustAllPackages" value="true"/> 
     <!-- http://activemq.apache.org/consumer-dispatch-async.html 
     The default setting is dispatchAsync=true 
     If you want better thoughput and the chances of having a slow consumer are low, you may want to change this to false. 
     --> 
     <property name="dispatchAsync" value="true"/> 
     <!-- 
     whether or not timestamps on messages should be disabled or not. If you disable them it adds a small performance boost. 
     Default is false 
     --> 
     <property name="disableTimeStampsByDefault" value="true"/> 

     <!-- http://activemq.apache.org/optimized-acknowledgement.html 
     This option is disabled by default but can be used to improve throughput in some circumstances as it decreases load on the broker. 
     --> 
     <property name="optimizeAcknowledge" value="true"/> 
     <!-- Default 300ms 
     For us, 5 sec. 
     --> 
     <property name="optimizeAcknowledgeTimeOut" value="5000"/> 
     <property name="useAsyncSend" value="true"/> 
     <property name="exceptionListener" ref="jmsExceptionListener"/> 
    </bean> 

<bean id="testQueue" class="org.apache.activemq.command.ActiveMQQueue"> 
     <constructor-arg index="0" value="test.queue"/> 
    </bean> 

    <bean id="jmsProducerFactoryPool" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" 
      init-method="start"> 
     <property name="connectionFactory" ref="activeMQIconnectConnectionFactory"/> 
     <property name="maxConnections" value="10"/> 
     <property name="maximumActiveSessionPerConnection" 
        value="1000"/> 
     <property name="createConnectionOnStartup" value="true"/> 
    </bean> 

    <bean id="jmsConsumerFactoryPool" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop" 
      init-method="start"> 
     <property name="connectionFactory" ref="activeMQIconnectConnectionFactory"/> 
     <property name="maxConnections" value="10"/> 
     <property name="maximumActiveSessionPerConnection"    value="1000"/> 
     <property name="createConnectionOnStartup" value="true"/> 
     <property name="reconnectOnException" value="true"/> 
     <property name="idleTimeout" value="86400000"/> 
    </bean> 

    <bean id="redeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy"> 
     <property name="maximumRedeliveries" value="1"/> 
     <property name="queue" value="*"/> 
    </bean> 

    <bean id="prefetchPolicy" class="org.apache.activemq.ActiveMQPrefetchPolicy"> 
     <property name="queuePrefetch" value="500"/> 
    </bean> 

    <bean id="jmsTemplate" class="com.minda.iconnect.jms.impl.TimedJmsTemplate"> 
     <property name="connectionFactory" ref="jmsProducerFactoryPool"/> 
     <property name="defaultDestinationName" value="iconnect.queue"/> 
     <property name="deliveryPersistent" value="true"/> 
     <!-- I think this is ingored if explicitQosEnabled is not set --> 
    </bean> 

    <bean id="simpleMessageConverter" class="org.springframework.jms.support.converter.SimpleMessageConverter"/> 

<bean id="testProducer" 
      class="com.test.TestProducer"> 
     <property name="consumerDestination" ref="testQueu"/> 
     <property name="jmsTemplate" ref="jmsTemplate"/> 
     <property name="messageConverter" ref="simpleMessageConverter"/> 
    </bean> 

    <bean id="testContainer" class="org.springframework.jms.listener.DefaultMessageListenerContainer"> 
     <property name="connectionFactory" ref="jmsConsumerFactoryPool"/> 
     <property name="destination" ref="testS"/> 
     <property name="messageListener"> 
      <bean class="org.springframework.jms.listener.adapter.MessageListenerAdapter"> 
       <property name="delegate" ref="testConsumer"/> 
       <property name="defaultListenerMethod" value="process"/> 
       <property name="messageConverter" ref="simpleMessageConverter"/> 
      </bean> 
     </property> 
     <property name="concurrentConsumers" value="50"/> 
     <property name="maxConcurrentConsumers" value="100"/> 
     <property name="sessionTransacted" value="false"/> 
     <property name="autoStartup" value="true"/> 
    </bean> 
</beans> 

Klasse für connection

public class ActiveMQIconnectConnectionFactory extends org.apache.activemq.ActiveMQConnectionFactory 
{ 
    private static final Logger LOG = LoggerFactory.getLogger(ActiveMQIconnectConnectionFactory.class); 

    @Override 
    public void setBrokerURL(String brokerURL) 
    { 
     // LOG when connecting to activemq 
     // using this setter to be sure it's only logged once 
     // See DJ-5780 
     LOG.info("ActiveMQ configured is: " + (DEFAULT_BROKER_URL.equals(brokerURL) ? "(default init setting) " : "") + brokerURL); 
     LOG.info("Connecting to ActiveMQ"); 
     super.setBrokerURL(brokerURL); 
    } 
} 

Bis jetzt haben wir für Timeouts etc um die Parameter gespielt, aber nicht viel Glück. Wir vermuten, dass das Problem aufgrund eines Verbindungsproblems oder der Handhabung von Verbindungen über DMLC auftritt, konnten das Problem jedoch nicht identifizieren. Hilfe wird sehr geschätzt!

Antwort

0

Ich denke, dass Ihr Problem eine Mischung aus Spring DMLC- und AMQ-Verhalten ist, basierend auf Ihren Konfigurationen, die sich gegenseitig beeinflussen.

Versuch durch Änderung:

<property name="optimizeAcknowledgeTimeOut" value="500"/> 

UND

org.springframework.jms.listener.DefaultMessageListenerContainer.setReceiveTimeout (0);

Oder

org.springframework.jms.listener.DefaultMessageListenerContainer.setReceiveTimeout(10000); 
org.springframework.jms.listener.DefaultMessageListenerContainer.setIdleTaskExecutionLimit(100); 

https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/AbstractPollingMessageListenerContainer.html#setReceiveTimeout-long-

https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/jms/listener/DefaultMessageListenerContainer.html#setIdleTaskExecutionLimit-int-

public void setIdleTaskExecutionLimit (int idleTaskExecutionLimit) die Grenze für Leerlauf Ausführungen eines Verbraucher angeben Aufgabe, keine Nachricht innerhalb seiner Ausführung erhalten. Wenn dieses Limit erreicht ist, wird die Task heruntergefahren und der Empfang von an andere ausführende Tasks wird fortgesetzt. Der Standardwert ist 1 und schließt die Leerlaufressourcen früh, sobald eine Aufgabe keine Nachricht erhalten hat. Dies gilt nur für die dynamische Planung ; Siehe die Einstellung "maxConcurrentConsumers". Die minimale Anzahl von Verbrauchern (siehe "ConcurrentConsumers") wird bis zur Außerbetriebnahme auf jeden Fall um gehalten.

Innerhalb jeder Aufgabenausführung, versucht, eine Anzahl von Nachrichtenempfang (nach der „maxMessagesPerTask“ Einstellung) wird für eine eingehende Nachricht wartet jeweils (entsprechend den „ReceiveTimeout“ -Einstellung). Wenn alle von denen, die Versuche in einer bestimmten Aufgabe erhalten, ohne eine Nachricht zurückkehren, wird die Aufgabe in Bezug auf empfangene Nachrichten als inaktiv betrachtet. Solch ein Task kann immer noch neu geplant werden; Sobald es jedoch die angegebene "idleTaskExecutionLimit" erreicht hat, wird es heruntergefahren (im Falle der dynamischen Skalierung).

Erhöhen Sie dieses Limit, wenn Sie zu häufig auf und ab skalieren. Wenn diese Grenze höher ist, wird ein leerer Benutzer länger um herumgehalten, wodurch der Neustart eines Verbrauchers vermieden wird, sobald eine neue Ladung von Nachrichten hereinkommt. Alternativ können Sie einen höheren Wert für "maxMessagesPerTask" und/oder "receiveTimeout" angeben. Dies führt auch dazu, dass Leerlaufverbraucher länger gehalten werden (während gleichzeitig auch die durchschnittliche Ausführungszeit jeder geplanten Aufgabe erhöht wird).

http://activemq.apache.org/performance-tuning.html

Optimierte quittieren Wenn Nachrichten in Auto bestätigen Modus (gesetzt, wenn die Sitzung der Verbraucher zu schaffen) raubend, ActiveMQ Empfang von Nachrichten zurück an den Broker in Chargen bestätigen können (zu verbessern Leistung). Die Stapelgröße beträgt 65% des Vorabruflimits für den Consumer. Auch wenn der Nachrichtenverbrauch langsam ist, wird der Stapel alle 300 ms gesendet. Sie wechseln die Batch-Bestätigung, indem Sie in der ActiveMQ ConnectionFactory auf optimizeAcknowledge = true setzen.

http://activemq.apache.org/what-is-the-prefetch-limit-for.html

Sobald der Broker eine Prefetch-Grenze Anzahl der Nachrichten zu Verbraucher es wird keine weiteren Nachrichten versenden an diesen Verbraucher bis zum Verbraucher hat bestätigt, mindestens 50% der Tinte hat die Prefetch-Nachrichten , z. B. Prefetch/2, die es erhalten hat. Wenn der Broker die Bestätigungen empfangen hat, wird er eine weitere Vorabruf-/2 Anzahl von Nachrichten an den Verbraucher senden, um sozusagen seinen Vorabspeicherpuffer "aufzufüllen". Beachten Sie, dass es möglich ist, ein Vorabruflimit für jeden Kunden festzulegen (siehe unten).

Verwandte Themen