Ich habe eine Situation mit meiner Feder Integration mit Rabbitmq. Die Nachrichten werden an die Warteschlangen gesendet, enden jedoch im Bereitschaftszustand und einer als unbestätigt, aber der Verbraucher erhält sie nicht.Federintegration Rabbitmq Nachrichten auf Bereitzustand
Thx XML-Konfiguration ist wie folgt:
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:int="http://www.springframework.org/schema/integration"
xmlns:int-amqp="http://www.springframework.org/schema/integration/amqp"
xmlns:rabbit="http://www.springframework.org/schema/rabbit"
xmlns:task="http://www.springframework.org/schema/task"
xmlns:context="http://www.springframework.org/schema/context"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/integration
http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/amqp
http://www.springframework.org/schema/integration/amqp/spring-integration-amqp.xsd
http://www.springframework.org/schema/rabbit
http://www.springframework.org/schema/rabbit/spring-rabbit.xsd
http://www.springframework.org/schema/task
http://www.springframework.org/schema/task/spring-task.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd">
<context:property-placeholder location="classpath:spring/prj-rabbitmq-context-thirdparty.properties" ignore-unresolvable="true" order="3"/>
<rabbit:connection-factory
id="prjRabbitmqConnectionFactory"
addresses="${rabbitmq.addresses}"
username="${rabbitmq.username}"
password="${rabbitmq.password}"
connection-timeout="5000" />
<bean id="rabbitTxManager"
class="org.springframework.amqp.rabbit.transaction.RabbitTransactionManager">
<property name="connectionFactory" ref="prjRabbitmqConnectionFactory"/>
</bean>
<rabbit:template
id="prjRabbitmqTemplate"
connection-factory="prjRabbitmqConnectionFactory"
message-converter="serializerMessageConverter"
retry-template="retryTemplate" />
<bean id="retryTemplate" class="org.springframework.retry.support.RetryTemplate">
<property name="backOffPolicy">
<bean class="org.springframework.retry.backoff.ExponentialBackOffPolicy">
<property name="initialInterval" value="1000" />
<property name="multiplier" value="3" />
<property name="maxInterval" value="10000" />
</bean>
</property>
</bean>
<rabbit:admin
id="prjRabbitmqAdmin"
auto-startup="true"
connection-factory="prjRabbitmqConnectionFactory" />
<rabbit:queue
id="prjSyncQueue"
name="${prj.sync.queue}"
durable="true">
<rabbit:queue-arguments>
<entry key="x-ha-policy" value="all" />
</rabbit:queue-arguments>
</rabbit:queue>
<rabbit:listener-container
connection-factory="prjRabbitmqConnectionFactory"
acknowledge="auto"
channel-transacted="true"
transaction-manager="rabbitTxManager"
task-executor="prjSyncExecutor"
concurrency="1"
max-concurrency="2"
requeue-rejected="true"
message-converter="serializerMessageConverter">
<rabbit:listener
ref="prjProcessorService"
queue-names="${prj.sync.queue}" method="processMessage" />
</rabbit:listener-container>
<task:executor id="prjSyncExecutor"
pool-size="${prj.sync.concurrency.min}-${prj.sync.concurrency.max}"
keep-alive="${prj.sync.concurrency.keep-alive}"
queue-capacity="${prj.sync.concurrency.queue}"
rejection-policy="CALLER_RUNS"/>
<int:channel
id="prjChannel" />
<int-amqp:outbound-channel-adapter
channel="prjChannel"
amqp-template="prjRabbitmqTemplate"
exchange-name="prjSyncExchange"
routing-key="prj-event"
default-delivery-mode="PERSISTENT" />
<rabbit:direct-exchange
name="prjSyncExchange">
<rabbit:bindings>
<rabbit:binding
queue="prjSyncQueue"
key="prj-event" />
</rabbit:bindings>
</rabbit:direct-exchange>
<int:gateway
id="prjGateway"
service-interface="ro.oss.niinoo.thirdparty.prj.gateway.prjEnrichmentGateway">
<int:method
name="send"
request-channel="prjChannel"/>
</int:gateway>
<bean id="prjProcessorService" class="ro.oss.niinoo.thirdparty.prj.processor.impl.prjEnrichmentProcessorImpl" />
<bean id="serializerMessageConverter" class="ro.oss.niinoo.thirdparty.prj.serializer.prjSerializer"/>
auf dem Server neu starten, die erste wird abgeholt, aber beim nächsten Aufruf der Meldungen Pfähle in der Warteschlange. Hast du eine Idee, warum das passieren könnte?
Dank Daniel
EDIT:
Der Verbraucher Code:
public class JsonEnrichmentService implements EnrichmentService {
@Resource
private UserQueryService userQueryService;
@Resource
private SecurityContextService securityContextService;
@Override
public void processMessage(POJO record) {
System.out.println(record);
}
Dies wird einen neuen Dienst aufrufen, die eine Transactional Anmerkung zu ihm hat.
Ich habe das angenommen und eine einzelne System.out.println in den Consumer-Code platziert, aber es hat nichts geändert. –
Wie gesagt, Sie müssen sich einen Thread Dump anschauen. –