2016-06-22 9 views
1

Ich habe Probleme mit async RabbitMQ Nachricht mit Callback-Nachricht.RabbitTemplate Async-Nachricht mit Rückruf

Hier sind ein Teil meines Produzenten:

@Autowired(required = false) 
@Qualifier("rabbitTemplate") 
private RabbitTemplate queueTemplate; 

@Override 
public Response createIncident(String incident) { 
    LOGGER.info("Sending incident into queue"); 
    queueTemplate.convertAndSend((Object)incident, new MessagePostProcessor() { 

     @Override 
     public Message postProcessMessage(Message message) throws AmqpException { 
      message.getMessageProperties().setReplyTo("replyQueue"); 
      return message; 
     } 
    }); 
    Message message = queueTemplate.receive(); 
... 

Frühling RabbitMQ Kontext

<rabbit:admin connection-factory="rabbitConnFactory" /> 

<rabbit:queue name="rabbitQueue" /> 
<rabbit:queue name="replyQueue" /> 

<rabbit:topic-exchange name="rabbitExchange"> 
    <rabbit:bindings> 
     <rabbit:binding queue="rabbitQueue" pattern="camel" /> 
     <rabbit:binding queue="replyQueue" pattern="camel" /> 
    </rabbit:bindings> 
</rabbit:topic-exchange> 


<rabbit:template id="rabbitTemplate" 
    connection-factory="rabbitConnFactory" exchange="rabbitExchange" routing-key="camel" 
    reply-address="replyQueue" reply-queue="replyQueue"> 
    <rabbit:reply-listener/> 
</rabbit:template> 

<rabbit:listener-container 
    connection-factory="rabbitConnFactory" concurrency="10"> 
    <rabbit:listener ref="rabbitMessageListener" method="createIncident" 
     queue-names="rabbitQueue" /> 
</rabbit:listener-container> 

<bean id="rabbitMessageListener" class="com.my.listener.QueueListener" /> 

und Hörer, die erhalten sollten und darauf zu reagieren

@Autowired(required = false) 
@Qualifier("rabbitTemplate") 
private RabbitTemplate queueTemplate; 

public void createIncident() { 
    queueTemplate.receiveAndReply("replyQueue", new ReceiveAndReplyMessageCallback() { 

     @Override 
     public Message handle(Message message) { 
      String incident = new String(message.getBody()); 
      LOGGER.debug("Queue incoming message: " + incident); 
      String result = "" + messageHandler.createIncident(incident); 
      return new Message(result.getBytes(), new MessageProperties()); 
     } 

    }); 

Es will nicht nach innen erhalten Diese Handle-Nachricht im Listener. Und Applikationsbehälter Display-Meldung

2016-06-22 14: 15: 30.457 ERROR [AbstractFaultChainInitiatorObserver: 115] Fehler bei den Fehlern Handhabung aufgetreten ist, aufgeben! org.apache.cxf.interceptor.Fault: Keine 'Queue' angegeben. Überprüfen Sie die Konfiguration von RabbitTemplate. bei org.apache.cxf.service.invoker.AbstractInvoker.createFault (AbstractInvoker.java:170) bei org.apache.cxf.service.invoker.AbstractInvoker.invoke (AbstractInvoker.java:136) bei org.apache .cxf.jaxrs.JAXRSInvoker.invoke (JAXRSInvoker.java:204) bei org.apache.cxf.jaxrs.JAXRSInvoker.invoke (JAXRSInvoker.java:101) bei org.apache.cxf.interceptor.ServiceInvokerInterceptor $ 1.run (ServiceInvokerInterceptor.java:58) bei org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage (ServiceInvokerInterceptor.java:94) bei org.apache.cxf.phase.PhaseInterceptorChain.doIntercept (PhaseInterceptorChain.java:272) um org.apache.cxf.transport.ChainInitiationObserver.onMessage (ChainInitiationObserver.java:121) bei org.apache.cxf.transport.http.AbstractHTTPDestination.invoke (AbstractHTTPDestination.java:249) bei org.apache.cxf .transport.servlet.ServletController.invokeDestination (ServletController.java:248) bei org.apache.cxf.transport.servlet.ServletController.invoke (ServletController.java:222) bei org.apache.cxf.transport.servlet .ServletController.invoke (ServletController.java:153) bei org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke (CXFNonSpringServlet.java:171) bei org.apache.cxf.transpo rt.servlet.AbstractHTTPServlet.handleRequest (AbstractHTTPServlet.java:289) bei org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost (AbstractHTTPServlet.java:209) bei javax.servlet.http.HttpServlet.service (HttpServlet .java-: 650) bei org.apache.cxf.transport.servlet.AbstractHTTPServlet.service (AbstractHTTPServlet.java:265) bei org.apache.catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:303) bei org.apache.catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain.java:208) bei org.apache.tomcat.websocket.server.WsFilter.doFilter (WsFilter.java:52) bei org.apache.c atalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:241) um org.apache.catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain.Java: 208) bei com.mobilitymedia.connected.http.core.LoggerFilter.doFilter (LoggerFilter.java:41) bei org.apache.catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:241) bei org.apache.catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain.java:208) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:330) bei org.springframework.security .web.access.intercept.FilterSecurityInterceptor.invoke (FilterSecurityInterceptor.java:118) unter org.springframework.security.web.access.intercept.FilterSecurityInterceptor.doFilter (Fil terSecurityInterceptor.java:84) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei org.springframework.security.web.access.ExceptionTranslationFilter.doFilter (ExceptionTranslationFilter.java: 113) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei org.springframework.security.web.session.SessionManagementFilter.doFilter (SessionManagementFilter.java:103) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) unter org.springframework.secu rity.web.authentication.AnonymousAuthenticationFilter.doFilter (AnonymousAuthenticationFilter.java:113) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei org.springframework.security.web. servletapi.SecurityContextHolderAwareRequestFilter.doFilter (SecurityContextHolderAwareRequestFilter.java:154) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei org.springframework.security.web.savedrequest.RequestCacheAwareFilter. doFilter (RequestCacheAwareFilter.java:45) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainPro xy.java:342) bei org.springframework.security.web.authentication.preauth.AbstractPreAuthenticatedProcessingFilter.doFilter (AbstractPreAuthenticatedProcessingFilter.java:94) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy. Java: 342) bei org.springframework.security.web.context.request.async.WebAsyncManagerIntegrationFilter.doFilterInternal (WebAsyncManagerIntegrationFilter.java:50) bei org.springframework.web.filter.OncePerRequestFilter.doFilter (OncePerRequestFilter.java: 107) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei 012.org.springframework.security.web.context.SecurityContextPersistenceFilter.doFilter (SecurityContextPersistenceFilter.java:87) bei org.springframework.security.web.FilterChainProxy $ VirtualFilterChain.doFilter (FilterChainProxy.java:342) bei org.springframework .security.web.FilterChainProxy.doFilterInternal (FilterChainProxy.java:192) bei org.springframework.security.web.FilterChainProxy.doFilter (FilterChainProxy.java:160) bei org.springframework.web.filter.DelegatingFilterProxy.invokeDelegate (DelegatingFilterProxy.java:344) bei org.springframework.web.filter.DelegatingFilterProxy.doFilter (DelegatingFilterProxy.Java: 261) bei org.apache.catalina.core.ApplicationFilterChain.internalDoFilter (ApplicationFilterChain.java:241) bei org.apache.catalina.core.ApplicationFilterChain.doFilter (ApplicationFilterChain.java:208) bei org .apache.catalina.core.StandardWrapperValve.invoke (StandardWrapperValve.java:220) bei org.apache.catalina.core.StandardContextValve.invoke (StandardContextValve.java:122) bei org.apache.catalina.authenticator.AuthenticatorBase .invoke (AuthenticatorBase.java:505) um org.apache.catalina.core.StandardHostValve.invoke (StandardHostValve.java:169) um org.apache.catalina.valves.ErrorReportValve.invoke (ErrorReportValve.java:103) bei org.apache.catalina.valves.AccessLogValve.invoke (AccessLogValve.java:956) bei org.apache.catalina.core .StandardEngineValve.invoke (StandardEngineValve.java:116) bei org.apache.catalina.connector.CoyoteAdapter.service (CoyoteAdapter.java:436) bei org.apache.coyote.http11.AbstractHttp11Processor.process (AbstractHttp11Processor.java : 1078) bei org.apache.coyote.AbstractProtocol $ AbstractConnectionHandler.process (AbstractProtocol.java:625) bei org.apache.tomcat.util.net.JIoEndpoint $ SocketProcessor.run (JIoEndpoint.java:316) bei java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1142) bei java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java:617) bei org.apache.tomcat.util .threads.TaskThread $ WrappingRunnable.run (TaskThread.java:61) bei java.lang.Thread.run (Thread.java:745) Verursacht von: org.springframework.amqp.AmqpIllegalStateException: Keine 'Warteschlange' angegeben. Überprüfen Sie die Konfiguration von RabbitTemplate. bei org.springframework.amqp.rabbit.core.RabbitTemplate.getRequiredQueue (RabbitTemplate.java:1514) bei org.springframework.amqp.rabbit.core.RabbitTemplate.receive (RabbitTemplate.java:802) bei com. myService.IncidentServiceDefaultImpl.createIncident (IncidentServiceDefaultImpl.java:36) bei sun.reflect.NativeMethodAccessorImpl.invoke0 (native Methode) bei sun.reflect.NativeMethodAccessorImpl.invoke (NativeMethodAccessorImpl.java:62) bei sun.reflect.DelegatingMethodAccessorImpl. aufrufen (DelegatingMethodAccessorImpl.java:43) bei java.lang.reflect.Method.invoke (Method.java:498) bei org.apache.cxf.service.invoker.AbstractInvoker.performInvocation (Abstract Invoker.java:188) bei org.apache.cxf.service.invoker.AbstractInvoker.invoke (AbstractInvoker.java:104) ... 65 mehr

+0

Geben Sie den vollständigen Stacktrace ein. –

Antwort

0

sollten Sie verwenden:

/** 
* The name of the default queue to receive messages from when none is specified explicitly. 
* 
* @param queue the default queue name to use for receive 
*/ 
public void setQueue(String queue) { 
    this.queue = queue; 
} 

statt reply-address.

Und überdenken Ihre

queueTemplate.receiveAndReply("replyQueue", 

Scheint mir Zuhörer einen Blick in eine andere Warteschlange an den routing-key="camel" gebunden nehmen.

Obwohl Sie immer noch ein Chaos mit <binding> haben.

Plus wäre es besser, einen Blick zu werfen, wenn Sie wirklich ListenerContainer auf dem Consumer-Teil statt receiveAndReply verwenden können.