0

Ich habe eine Situation mit meiner Feder Integration mit Rabbitmq. Die Nachrichten werden an die Warteschlangen gesendet, enden jedoch im Bereitschaftszustand und einer als unbestätigt, aber der Verbraucher erhält sie nicht.Federintegration Rabbitmq Nachrichten auf Bereitzustand

Thx XML-Konfiguration ist wie folgt:

<beans xmlns="http://www.springframework.org/schema/beans" 
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xmlns:int="http://www.springframework.org/schema/integration" 
    xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp" 
    xmlns:rabbit="http://www.springframework.org/schema/rabbit" 
    xmlns:task="http://www.springframework.org/schema/task" 
    xmlns:context="http://www.springframework.org/schema/context" 
    xsi:schemaLocation=" 
    http://www.springframework.org/schema/beans 
    http://www.springframework.org/schema/beans/spring-beans.xsd 
    http://www.springframework.org/schema/integration 
    http://www.springframework.org/schema/integration/spring-integration.xsd 
    http://www.springframework.org/schema/integration/amqp 
    http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd 
    http://www.springframework.org/schema/rabbit 
    http://www.springframework.org/schema/rabbit/spring-rabbit.xsd 
    http://www.springframework.org/schema/task 
    http://www.springframework.org/schema/task/spring-task.xsd 
    http://www.springframework.org/schema/context 
    http://www.springframework.org/schema/context/spring-context.xsd"> 

<context:property-placeholder location="classpath:spring/prj-rabbitmq-context-thirdparty.properties" ignore-unresolvable="true" order="3"/> 

<rabbit:connection-factory 
     id="prjRabbitmqConnectionFactory" 
     addresses="${rabbitmq.addresses}" 
     username="${rabbitmq.username}" 
     password="${rabbitmq.password}" 
     connection-timeout="5000" /> 

<bean id="rabbitTxManager" 
     class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager"> 
    <property name="connectionFactory" ref="prjRabbitmqConnectionFactory"/> 
</bean> 

<rabbit:template 
     id="prjRabbitmqTemplate" 
     connection-factory="prjRabbitmqConnectionFactory" 
     message-converter="serializerMessageConverter" 
     retry-template="retryTemplate" /> 

<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate"> 
    <property name="backOffPolicy"> 
     <bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy"> 
      <property name="initialInterval" value="1000" /> 
      <property name="multiplier" value="3" /> 
      <property name="maxInterval" value="10000" /> 
     </bean> 
    </property> 
</bean> 

<rabbit:admin 
     id="prjRabbitmqAdmin" 
     auto-startup="true" 
     connection-factory="prjRabbitmqConnectionFactory" /> 

<rabbit:queue 
     id="prjSyncQueue" 
     name="${prj.sync.queue}" 
     durable="true"> 
    <rabbit:queue-arguments> 
     <entry key="x-ha-policy" value="all" /> 
    </rabbit:queue-arguments> 
</rabbit:queue> 


<rabbit:listener-container 
     connection-factory="prjRabbitmqConnectionFactory" 
     acknowledge="auto" 
     channel-transacted="true" 
     transaction-manager="rabbitTxManager" 
     task-executor="prjSyncExecutor" 
     concurrency="1" 
     max-concurrency="2" 
     requeue-rejected="true" 
     message-converter="serializerMessageConverter"> 
    <rabbit:listener 
      ref="prjProcessorService" 
      queue-names="${prj.sync.queue}" method="processMessage" /> 
</rabbit:listener-container> 

<task:executor id="prjSyncExecutor" 
       pool-size="${prj.sync.concurrency.min}-${prj.sync.concurrency.max}" 
       keep-alive="${prj.sync.concurrency.keep-alive}" 
       queue-capacity="${prj.sync.concurrency.queue}" 
       rejection-policy="CALLER_RUNS"/> 
<int:channel 
     id="prjChannel" /> 

<int-amqp:outbound-channel-adapter 
     channel="prjChannel" 
     amqp-template="prjRabbitmqTemplate" 
     exchange-name="prjSyncExchange" 
     routing-key="prj-event" 
     default-delivery-mode="PERSISTENT" /> 


<rabbit:direct-exchange 
     name="prjSyncExchange"> 
    <rabbit:bindings> 
     <rabbit:binding 
       queue="prjSyncQueue" 
       key="prj-event" /> 
    </rabbit:bindings> 
</rabbit:direct-exchange> 

<int:gateway 
     id="prjGateway" 
     service-interface="ro.oss.niinoo.thirdparty.prj.gateway.prjEnrichmentGateway"> 
    <int:method 
      name="send" 
      request-channel="prjChannel"/> 
</int:gateway> 

<bean id="prjProcessorService" class="ro.oss.niinoo.thirdparty.prj.processor.impl.prjEnrichmentProcessorImpl" /> 
<bean id="serializerMessageConverter" class="ro.oss.niinoo.thirdparty.prj.serializer.prjSerializer"/> 

auf dem Server neu starten, die erste wird abgeholt, aber beim nächsten Aufruf der Meldungen Pfähle in der Warteschlange. Hast du eine Idee, warum das passieren könnte?

Dank Daniel

EDIT:

Der Verbraucher Code:

public class JsonEnrichmentService implements EnrichmentService { 

@Resource 
private UserQueryService userQueryService; 

@Resource 
private SecurityContextService securityContextService; 

@Override 
public void processMessage(POJO record) { 
    System.out.println(record); 
} 

Dies wird einen neuen Dienst aufrufen, die eine Transactional Anmerkung zu ihm hat.

Antwort

0

Meiner Erfahrung nach wird dies im Allgemeinen dadurch verursacht, dass der Listener-Thread irgendwo im Benutzercode "hängt". Nehmen Sie einen Thread-Dump, um zu sehen, was der Listener-Thread gerade macht.

+0

Ich habe das angenommen und eine einzelne System.out.println in den Consumer-Code platziert, aber es hat nichts geändert. –

+0

Wie gesagt, Sie müssen sich einen Thread Dump anschauen. –

Verwandte Themen