2016-05-19 2 views
0

Ich habe Message-Driven-Channel-Adapter konfiguriert, der die Nachricht aus der Warteschlange abruft und Downstream zur Verarbeitung sendet. service-activator downstream verarbeitet diese Nachricht mit einigen Datenbankeinfügungen, die spring-jpa verwenden, und fügt Nachrichten in die Warteschlange ein. Da Atomikos als Transaktionsmanager verwendet wird, wenn eine Ausnahme in service-activator ausgelöst wird, wird die Ausnahmebedingungsnachricht an den messageError-Kanal gesendet, der auf dem Message-Driven-Channel-Adapter konfiguriert ist. Ich bin nicht in der Lage, diese Nachricht in der Datenbank zu protokollieren, wie ich bekomme com.atomikos.jdbc.AtomikosSQLException: Die Transaktion ist nur für Rollback markiert oder hat das Zeitlimit. Wie behandeln Sie die Ausnahmebedingungsnachricht. Ich verwende eine Federintegration 4.2.4.RELEASE. Im Folgenden finden Sie meine KonfigurationNachrichtengesteuerter Adapter und externer Transaktionsmanager (Atomikos)

<context:annotation-config /> 

<context:component-scan 
    base-package="com.home.sendermain,com.home.queueapi,com.home.processauth" /> 





<int-jms:message-driven-channel-adapter 
    id="pollQueuetoProcess" channel="queueChannel" destination="senderQueue" 
    concurrent-consumers="5" max-concurrent-consumers="10" acknowledge="transacted" 
    auto-startup="false" error-channel="messageError" transaction-manager="JtaTransactionManager" /> 



<int:recipient-list-router id="status1router" 
    input-channel="queueChannel" default-output-channel="nullChannel"> 
    <int:recipient channel="messageSuccessChannel" 
     selector-expression="payload.getStatus1()==1" /> 

</int:recipient-list-router> 

<int:channel id="processChannel"></int:channel> 

<int:chain id="messageSuccessChain" input-channel="messageSuccessChannel"> 

    <int:service-activator ref="senderMessageProcess" id="check1" 
     method="check1" requires-reply="true" /> 

    <int:service-activator ref="senderMessageProcess" 
     id="processMessageStatus" method="processMessageStatus" 
     requires-reply="true" /> 

    <int:recipient-list-router id="status2router" 
     default-output-channel="nullChannel"> 
     <int:recipient channel="nullChannel" 
      selector-expression="payload.getStatus2()==8" /> 
     <int:recipient channel="processChannel" 
      selector-expression="payload.getStatus2()==5 || payload.getStatus2()==10" /> 

    </int:recipient-list-router> 

</int:chain> 

<int:service-activator input-channel="processChannel" 
    output-channel="nullChannel" ref="senderMessageProcess" id="processMessage" 
    method="processMessage" requires-reply="true" /> 


<int:service-activator ref="senderErrorProcess" 
    id="processErrorMessage" method="processErrorMessage" input-channel="messageError" 
    output-channel="nullChannel" requires-reply="true" /> 



<bean id="senderQueue" class="com.ibm.mq.jms.MQQueue"> 
    <constructor-arg value="SENDERQUEUE" /> 
</bean> 

<bean id="testQ" class="com.ibm.mq.jms.MQQueue"> 
    <constructor-arg value="TESTQUEUE" /> 
</bean> 

<!-- declaring MQ XA connection with max pool size as 10 --> 
<bean id="mqConnectionFactory" class="com.ibm.mq.jms.MQXAQueueConnectionFactory"> 
    <property name="queueManager"> 
     <value>TESTQMGR</value> 
    </property> 
</bean> 

<bean id="connectionFactory" class="com.atomikos.jms.AtomikosConnectionFactoryBean" 
    init-method="init" destroy-method="close"> 
    <property name="uniqueResourceName" value="ALL_MQSeries_XA_RMI" /> 
    <property name="xaConnectionFactory" ref="mqConnectionFactory" /> 
    <property name="maxPoolSize"> 
     <value>10</value> 
    </property> 
</bean> 

<bean id="jmsTemplate" class="org.springframework.jms.core.JmsTemplate"> 
    <property name="connectionFactory" ref="connectionFactory" /> 
    <property name="explicitQosEnabled" value="true" /> 
    <property name="sessionTransacted" value="true" /> 
</bean> 


<bean id="entityManagerFactory" 
    class="org.springframework.orm.jpa.LocalContainerEntityManagerFactoryBean" 
    depends-on="JtaTransactionManager, springJtaPlatformAdapter"> 
    <property name="packagesToScan" 
     value="com.home.domain.entity.processentities" /> 
    <property name="dataSource" ref="datasource" /> 
    <property name="jpaVendorAdapter"> 
     <bean class="org.springframework.orm.jpa.vendor.HibernateJpaVendorAdapter" /> 
    </property> 
    <property name="mappingResources"> 
     <value>META-INF/orm.xml</value> 
    </property> 
    <property name="jpaProperties"> 
     <props> 
      <prop key="hibernate.dialect"> 
       org.hibernate.dialect.Oracle10gDialect 
      </prop> 
      <prop key="hibernate.max_fetch_depth">3</prop> 
      <prop key="hibernate.jdbc.fetch_size">50</prop> 
      <prop key="hibernate.jdbc.batch_size">10</prop> 
      <prop key="hibernate.show_sql">true</prop> 
      <prop key="jadira.usertype.autoRegisterUserTypes">true</prop> 
      <prop key="jadira.usertype.databaseZone">jvm</prop> 
      <prop key="jadira.usertype.javaZone">jvm</prop> 
      <prop key="hibernate.transaction.jta.platform">com.home.AtomikosJtaPlatform</prop> 
      <prop key="javax.persistence.transactionType">JTA</prop> 
     </props> 
    </property> 
</bean> 


<jpa:repositories base-package="com.home.domain.repository.processrepository" 
    entity-manager-factory-ref="entityManagerFactory" 
    transaction-manager-ref="JtaTransactionManager" /> 



<jpa:auditing /> 



<aop:config> 
    <aop:pointcut id="serviceOperation" 
     expression="execution(* com.home.sendermain.service.*.*(..)) || execution(* com.home.sendermain.error.*.*(..))" /> 
    <aop:advisor pointcut-ref="serviceOperation" advice-ref="txAdvice" /> 
</aop:config> 

<tx:advice id="txAdvice" transaction-manager="JtaTransactionManager"> 
    <tx:attributes> 
     <tx:method name="find*" read-only="true" rollback-for="java.lang.Throwable" /> 
     <tx:method name="count*" propagation="NEVER" rollback-for="java.lang.Throwable" /> 
     <tx:method name="*" rollback-for="java.lang.Throwable" /> 
    </tx:attributes> 
</tx:advice> 

<aop:config> 
    <aop:pointcut id="serviceOperationQueue" 
     expression="execution(* com.home.queueapi.*.*(..))" /> 
    <aop:advisor pointcut-ref="serviceOperationQueue" 
     advice-ref="txAdviceQueue" /> 
</aop:config> 

<tx:advice id="txAdviceQueue" transaction-manager="JtaTransactionManager"> 
    <tx:attributes> 
     <tx:method name="find*" read-only="true" rollback-for="java.lang.Throwable" /> 
     <tx:method name="count*" propagation="NEVER" rollback-for="java.lang.Throwable" /> 
     <tx:method name="*" rollback-for="java.lang.Throwable" /> 
    </tx:attributes> 
</tx:advice> 



<bean id="userTransactionService" class="com.atomikos.icatch.config.UserTransactionServiceImp" 
    init-method="init" destroy-method="shutdownForce"> 
    <constructor-arg> 
     <!-- IMPORTANT: specify all Atomikos properties here --> 
     <props> 
      <prop key="com.atomikos.icatch.service"> 
       com.atomikos.icatch.standalone.UserTransactionServiceFactory 
      </prop> 
     </props> 
    </constructor-arg> 
</bean> 

<!-- Construct Atomikos UserTransactionManager, needed to configure Spring --> 
<bean id="AtomikosTransactionManager" class="com.atomikos.icatch.jta.UserTransactionManager" 
    init-method="init" destroy-method="close" depends-on="userTransactionService"> 

    <!-- IMPORTANT: disable startup because the userTransactionService above 
     does this --> 
    <property name="startupTransactionService" value="false" /> 

    <!-- when close is called, should we force transactions to terminate or 
     not? --> 
    <property name="forceShutdown" value="false" /> 
</bean> 

<!-- Also use Atomikos UserTransactionImp, needed to configure Spring --> 
<bean id="AtomikosUserTransaction" class="com.atomikos.icatch.jta.UserTransactionImp" 
    depends-on="userTransactionService"> 
    <property name="transactionTimeout" value="300" /> 
</bean> 

<!-- Configure the Spring framework to use JTA transactions from Atomikos --> 
<bean id="JtaTransactionManager" 
    class="org.springframework.transaction.jta.JtaTransactionManager" 
    depends-on="userTransactionService"> 
    <property name="transactionManager" ref="AtomikosTransactionManager" /> 
    <property name="userTransaction" ref="AtomikosUserTransaction" /> 
</bean> 

<!-- declaring database connection for xaresource with poolsize is set to 
    10 --> 
<bean id="datasource" class="com.atomikos.jdbc.AtomikosDataSourceBean" 
    init-method="init" destroy-method="close"> 
    <property name="uniqueResourceName"> 
     <value>XAORADBMS</value> 
    </property> 
    <property name="xaDataSourceClassName"> 
     <value>${jdbc.driverClassName}</value> 
    </property> 
    <property name="xaProperties"> 
     <props> 
      <prop key="user">${jdbc.username}</prop> 
      <prop key="password">${jdbc.password}</prop> 
      <prop key="URL">${jdbc.url}</prop> 
     </props> 
    </property> 
    <property name="poolSize"> 
     <value>${sender.maxPoolSize}</value> 
    </property> 
</bean> 

<bean id="springJtaPlatformAdapter" class="com.home.AtomikosJtaPlatform"> 
    <property name="jtaTransactionManager" ref="JtaTransactionManager" /> 
</bean> 

vorschlagen, wie Ausnahmemeldung zu behandeln.

Antwort

0

Sie müssen eine neue Transaktion für den Protokolliervorgang starten - möglicherweise mit einem einfachen DB Tx Manager - nicht Atomikos - siehe Propagation.REQUIRES_NEW.

+0

Ich bin in der Lage, die Ausnahmemeldung in den Warteschlangenkanal zu bekommen und zu verarbeiten. Die eigentliche Nachricht wird jedoch nicht von einem Message Driven Adapter aus der Warteschlange entfernt. Ich verstehe, dass das Speichern der Ausnahmebedingungsnachricht im Warteschlangenkanal eine andere Transaktion hat. Ich werde es mit einfachen db tx manager, d. H jtransaction manager tun und wird Ihnen das Ergebnis wissen lassen. – Vaidya

+0

'Aber die eigentliche Nachricht wird nicht von der Message-gesteuerten Adapter aus der Warteschlange entfernt. Natürlich nicht - da Sie Atomikos verwenden, wird die JMS-Transaktion auch zurückgesetzt. –

+0

OK hat den Punkt, muss die eigentliche Nachricht behandeln, wenn es bereits fehlgeschlagen ist. d. h. nicht erlauben, die Nachricht stromabwärts weiterzuleiten. – Vaidya

Verwandte Themen