2016-10-18 2 views
0

Ich möchte nicht die SimpleMessageListenerContainer konsumieren die Nachrichten, wenn meine Transaktion als Rollback markiert.SimpleMessageListenerContainer verbraucht kontinuierlich nach TransactionSystemException

Ich habe nicht listener-container in meiner Konfigurationsdatei hinzugefügt und versucht, die AmqpRejectAndDontRequeueException erneut zu lösen. Es funktioniert nicht. Hier ist mein Code-Basis:

@Transactional 
public class MySecondService { 

    @Resource 
    private MySecondRepository mySecondRepository; 

    @Transactional(propagation = Propagation.REQUIRED) 
    @ServiceActivator(inputChannel = "my-first-servie-output-channel", 
         outputChannel = "my-Second-servie-output-channel") 
    public String saveEntity(final MyTestEntity myTestEntity) 
     throws AmqpRejectAndDontRequeueException { 
     try { 
      mySecondRepository.save(myTestEntity); 
     } catch (Exception e) { 
      throw new AmqpRejectAndDontRequeueException("exception"); 
     } 
    } 
} 

federintegrations context.xml

<int:chain input-channel="transaction-inbound-channel" output-channel="my-first-servie-input-channel"> 
    </int:chain> 
<int-amqp:inbound-channel-adapter 
      channel="transaction-inbound-channel" 
      queue-names="sample.queue" 
      concurrent-consumers="5" 
      error-channel="failed-channel" 
      connection-factory="rabbitConnectionFactory" 
      mapped-request-headers="*" 
      transaction-manager="transactionManager" /> 

    <rabbit:connection-factory 
      id="rabbitConnectionFactory" 
      connection-factory="rcf" 
      host="${spring.rabbitmq.host}" 
      port="${spring.rabbitmq.port}" 
      username="${spring.rabbitmq.username}" 
      password="${spring.rabbitmq.password}" 
      /> 
    <bean id="rcf" class="com.rabbitmq.client.ConnectionFactory"> 
     <property name="host" value="${spring.rabbitmq.host}"/> 
     <property name="requestedHeartbeat" value="10" /> 
    </bean> 

Dies ist der Stack-Trace:

WARN [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Consumer raised exception, processing can restart if the connection factory supports it 
org.springframework.transaction.TransactionSystemException: Could not commit JPA transaction; nested exception is javax.persistence.RollbackException: Transaction marked as rollbackOnly 
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:526) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.processCommit(AbstractPlatformTransactionManager.java:757) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.transaction.support.AbstractPlatformTransactionManager.commit(AbstractPlatformTransactionManager.java:726) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.transaction.support.TransactionTemplate.execute(TransactionTemplate.java:150) ~[spring-tx-4.1.9.RELEASE.jar:4.1.9.RELEASE] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1062) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:93) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1203) ~[spring-rabbit-1.5.3.RELEASE.jar:na] 
    at java.lang.Thread.run(Thread.java:745) [na:1.7.0_55] 
Caused by: javax.persistence.RollbackException: Transaction marked as rollbackOnly 
    at org.hibernate.jpa.internal.TransactionImpl.commit(TransactionImpl.java:74) ~[hibernate-entitymanager-4.3.11.Final.jar:4.3.11.Final] 
    at org.springframework.orm.jpa.JpaTransactionManager.doCommit(JpaTransactionManager.java:517) ~[spring-orm-4.2.4.RELEASE.jar:4.2.4.RELEASE] 
    ... 7 common frames omitted 
2016-10-19 00:52:50,673 [SimpleAsyncTaskExecutor-1] INFO [] org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer: tags=[{amq.ctag-QBWGIY7co5vUpXwGDXDDBg=sample.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://[email protected]:9999/,2), acknowledgeMode=AUTO local queue size=0 

ich dies nicht wollen passieren org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer - Restarting Consumer

Kann mir jemand helfen, was ich noch hinzufügen muss, um die AmqpRejectAndDontRequeueException. Danke im Voraus.

Antwort

0

Sie zeigen den Durchfluss nicht an.

Wenn Sie nur einen einzelnen Dienst aufrufen, fügen Sie den Transaktionsmanager nicht dem eingehenden Kanaladapter hinzu.

Die Transaktion wird gestartet, wenn der Dienst aufgerufen wird; Solange der Fehlerfluss auf failed-channel keine Ausnahme auslöst (oder AmqpRejectAndDontRequeueException auslöst), wird die Nachricht nicht erneut gesendet.

Wenn Sie mehrere Services haben, die Sie an einer einzelnen Transaktion teilnehmen möchten, können Sie einen AOP-Hinweis verwenden, um transaction-inbound-channel transaktional zu machen, und der Downstream-Flow wird innerhalb dieser Transaktion ausgeführt.

Oder, wenn Sie nie Nachrichten requeued möchten, setzen Sie defaultRequeueRejected-false auf der Behälter Inbound-Adapter (die Sie als <bean/> verkabeln müssen werden).

+0

Behoben - 'defaultRequeueRejected' ist standardmäßig true; muss das 'falsche' immer ablehnen. –

+0

Sie könnten versuchen, eine "AmqpRejectAndDontRequeueException" aus dem Fehlerfluss zu werfen; Das sollte den Commit-Versuch vermeiden. –

+0

Danke Gary, als ich das zu meinem 'Fehler-Kanal' hinzugefügt habe, hat es funktioniert. '@ServiceActivator (inputChannel =" failed-channel ") public void errorHandler (letzte Ausnahme e) löst AmqpRejectAndDontRequeueException { throw new AmqpRejectAndDontRequeueException (" Exception. "); } '. –

Verwandte Themen