2015-07-20 2 views
8

Ich nehme Nachrichten von Amazon SQS Warteschlange. Ich habe Tausende von Nachrichten in der Warteschlange. Wenn ich die Anwendung starte (geschrieben in Java mit Springframework), fängt es an, Nachrichten von der Warteschlange abzufragen und nach dem Empfang von 500 Nachrichten stoppt es. Wenn ich die Anwendung erneut starte, verbraucht sie weitere 500 Nachrichten.Amazon SQS Java SDK stoppt nach dem Konsumieren von 500 Nachrichten

Mein Code ist wie ...

Anschluss Fabrik

@Bean 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactoryActiveMQ() { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(connectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

@Bean(name = "sqsJmsListenerContainerFactory") 
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(CustomDestinationResolver resolver) { 
    DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory(); 
    factory.setConnectionFactory(sqsConnectionFactory()); 
    factory.setConcurrency("3-15"); 
    factory.setReceiveTimeout(3000L); 
    return factory; 
} 

Listener

@JmsListener(containerFactory = "sqsJmsListenerContainerFactory", destination = "sqs.queue") 
public void onMessage(Message message) { 
    //Processing message 
} 

Ist alles, was ich brauche in amazon Warteschlange oder in Verbindung Fabrik Bohne konfigurieren ?
Danke :-)

Aktualisiert: Hinzugefügt Thread-Dump

Während Anwendung verbraucht Nachrichten
DefaultMessageListenerContainer in Thread-Dump wird wie

"[email protected]" prio=5 tid=0x18 nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x2230> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x2231> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.deleteMessage(AmazonSQSClient.java:1340) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.deleteMessage(AmazonSQSMessagingClientWrapper.java:127) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.acknowledge(AutoAcknowledger.java:33) 
     at com.amazon.sqs.javamessaging.acknowledge.AutoAcknowledger.notifyMessageReceived(AutoAcknowledger.java:42) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.messageHandler(SQSMessageConsumerPrefetch.java:477) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.receive(SQSMessageConsumerPrefetch.java:410) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumer.receive(SQSMessageConsumer.java:157) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveMessage(AbstractPollingMessageListenerContainer.java:413) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:293) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread in Thread-Dump wie

"[email protected]" daemon prio=5 tid=0x1b nid=NA runnable 
    java.lang.Thread.State: RUNNABLE 
     at java.net.SocketInputStream.socketRead0(SocketInputStream.java:-1) 
     at java.net.SocketInputStream.socketRead(SocketInputStream.java:116) 
     at java.net.SocketInputStream.read(SocketInputStream.java:170) 
     at java.net.SocketInputStream.read(SocketInputStream.java:141) 
     at sun.security.ssl.InputRecord.readFully(InputRecord.java:465) 
     at sun.security.ssl.InputRecord.read(InputRecord.java:503) 
     at sun.security.ssl.SSLSocketImpl.readRecord(SSLSocketImpl.java:961) 
     - locked <0x23a7> (a java.lang.Object) 
     at sun.security.ssl.SSLSocketImpl.readDataRecord(SSLSocketImpl.java:918) 
     at sun.security.ssl.AppInputStream.read(AppInputStream.java:105) 
     - locked <0x23a8> (a sun.security.ssl.AppInputStream) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.fillBuffer(AbstractSessionInputBuffer.java:160) 
     at org.apache.http.impl.io.SocketInputBuffer.fillBuffer(SocketInputBuffer.java:84) 
     at org.apache.http.impl.io.AbstractSessionInputBuffer.readLine(AbstractSessionInputBuffer.java:273) 
     at org.apache.http.impl.conn.LoggingSessionInputBuffer.readLine(LoggingSessionInputBuffer.java:116) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:140) 
     at org.apache.http.impl.conn.DefaultHttpResponseParser.parseHead(DefaultHttpResponseParser.java:57) 
     at org.apache.http.impl.io.AbstractMessageParser.parse(AbstractMessageParser.java:260) 
     at org.apache.http.impl.AbstractHttpClientConnection.receiveResponseHeader(AbstractHttpClientConnection.java:283) 
     at org.apache.http.impl.conn.DefaultClientConnection.receiveResponseHeader(DefaultClientConnection.java:251) 
     at org.apache.http.impl.conn.ManagedClientConnectionImpl.receiveResponseHeader(ManagedClientConnectionImpl.java:197) 
     at org.apache.http.protocol.HttpRequestExecutor.doReceiveResponse(HttpRequestExecutor.java:271) 
     at com.amazonaws.http.protocol.SdkHttpRequestExecutor.doReceiveResponse(SdkHttpRequestExecutor.java:66) 
     at org.apache.http.protocol.HttpRequestExecutor.execute(HttpRequestExecutor.java:123) 
     at org.apache.http.impl.client.DefaultRequestDirector.tryExecute(DefaultRequestDirector.java:682) 
     at org.apache.http.impl.client.DefaultRequestDirector.execute(DefaultRequestDirector.java:486) 
     at org.apache.http.impl.client.AbstractHttpClient.doExecute(AbstractHttpClient.java:863) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:82) 
     at org.apache.http.impl.client.CloseableHttpClient.execute(CloseableHttpClient.java:57) 
     at com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:685) 
     at com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:460) 
     at com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:295) 
     at com.amazonaws.services.sqs.AmazonSQSClient.invoke(AmazonSQSClient.java:2291) 
     at com.amazonaws.services.sqs.AmazonSQSClient.receiveMessage(AmazonSQSClient.java:1021) 
     at com.amazon.sqs.javamessaging.AmazonSQSMessagingClientWrapper.receiveMessage(AmazonSQSMessagingClientWrapper.java:319) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.getMessages(SQSMessageConsumerPrefetch.java:216) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:180) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 

wenn Anwendung stoppt Nachrichten
ConsumerPrefetchThread in Thread-Dump raubend ist wie

"[email protected]" prio=5 tid=0x18 nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
     at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
     at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
     at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
     at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
     at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 
     at sun.reflect.GeneratedMethodAccessor240.invoke(Unknown Source:-1) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:497) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:185) 
     at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:104) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:90) 
     at org.springframework.jms.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:66) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:674) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:634) 
     at org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:605) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:308) 
     at org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:246) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1144) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1136) 
     at org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1033) 
     at java.lang.Thread.run(Thread.java:745) 


ConsumerPrefetchThread in Thread-Dump ist wie

"[email protected]" daemon prio=5 tid=0x1b nid=NA waiting 
    java.lang.Thread.State: WAITING 
     at java.lang.Object.wait(Object.java:-1) 
     at java.lang.Object.wait(Object.java:502) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.waitForPrefetch(SQSMessageConsumerPrefetch.java:273) 
     at com.amazon.sqs.javamessaging.SQSMessageConsumerPrefetch.run(SQSMessageConsumerPrefetch.java:174) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
     at java.lang.Thread.run(Thread.java:745) 
+0

Versuchen Sie, einen Thread Dump zu nehmen, wenn Ihr Verbraucher stoppt, um zu sehen, was die Container-Threads tun. Vielleicht ist es in Ihrem Code stecken geblieben? –

+0

@GaryRussell Ich habe Thread Dump hinzugefügt. In dieser Anwendung verwende ich auch ActiveMQ. Ich dränge Nachrichten, die ich von SQS zu den ActiveMQ-Warteschlangen bekommen werde. – Piyush

Antwort

3

wie eine Art Pool Erschöpfung Sieht in Ihrem Code ...

at org.apache.commons.pool.impl.GenericKeyedObjectPool.borrowObject(GenericKeyedObjectPool.java:1151) 
    at org.apache.activemq.jms.pool.ConnectionPool.createSession(ConnectionPool.java:133) 
    at org.apache.activemq.jms.pool.PooledConnection.createSession(PooledConnection.java:167) 
    at com.ac.jms.senders.AbstractNoResponseSender.request(AbstractNoResponseSender.java:40) 
    at com.ac.mic.listener.AbstractMicQueueListener.onMessage(AbstractMicQueueListener.java:117) 
    at com.ac.mic.listener.MicQueueListener.onMessage(MicQueueListener.java:40) 

Der Container-Thread bleibt stecken, versucht, eine Sitzung von PooledConnection zu bekommen.

Vielleicht geben Sie keine Sitzungen an den Pool zurück?

Verwenden Sie eine JmsTemplate anstelle Ihres eigenen Codes, um mit JMS zu sprechen. Es vermeidet solche Probleme.

+1

Ich habe dieses Problem behoben. Ich habe Sitzung und Verbindung nach dem Senden der Nachricht nicht geschlossen. Ich habe gelernt, Thread-Dump zu verwenden. Danke @GaryRussell :) – Piyush

Verwandte Themen