2017-10-30 1 views
0

Problemstellung:Feder AMQP-Ausgangs-Gateway der Antwort von einem anderen Thead zu produzieren (wie JMS-Outbound-Gateway)

Feder AMQP-Outbound-Gateway-Antwort von einem anderen Thread (Like JMS-Outbound-Gateway herzustellen, unterschiedliche Warteschlangen haben, korreliere die Anfrage/Antwort unter Verwendung des Korrelationsschlüssels).

Konnte die Nachricht mit diesem Beispiel nicht korrelieren.

Frühling Integration

<int:gateway id="outboundGateway" service-interface="com.amqp.outbound.gateway.OutboundGateway"  
         default-reply-channel="defaultReplyChannel" > 
     <int:method name="process" request-channel="inboundRequestChannel"/> 
    </int:gateway> 

    <int:channel id="defaultReplyChannel"/> 
    <int:channel id="inboundRequestChannel"/> 
    <int:channel id="enrichedInboundRequestChannel"/> 
    <int:channel id="processAuthRequestChannel"/> 
    <int:channel id="postProcessorChannel"/> 

    <int:chain input-channel="inboundRequestChannel" output-channel="enrichedInboundRequestChannel"> 
     <int:service-activator id="serviceActivator" 
         ref="ouboundService" method="createRequest"/> 
    </int:chain> 

    <int-amqp:outbound-gateway id="outboundGtwyId" header-mapper="headerMapper" 
         request-channel="enrichedInboundRequestChannel" 
         reply-channel="defaultReplyChannel" 
         amqp-template="template" 
         reply-timeout="30000" 
         exchange-name="request_exchange" 
         routing-key="request_exchange_queue"/> 

    <int-amqp:inbound-channel-adapter id="amqpMessageDriven" queue-names="request_queue" 
           connection-factory="rabbitConnectionFactory" channel="processAuthRequestChannel"/> 

    <int:service-activator id="serviceActivator" 
         ref="ouboundService" input-channel="processAuthRequestChannel" output-channel="postProcessorChannel" 
         method="processRequest"/> 

    <int-amqp:outbound-channel-adapter amqp-template="template" channel="postProcessorChannel" 
      header-mapper="headerMapper" exchange-name="reply_exchange" routing-key="reply_exchange_queue"/> 

    <bean id="headerMapper" class="org.springframework.integration.amqp.support.DefaultAmqpHeaderMapper"/> 

Config

@Bean 
public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ 
    final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); 
    template.setQueue("reply_queue"); 
    return template; 
} 



@Bean 
public Binding binding(){ 
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("request_exchange_queue"); 
} 

@Bean 
public DirectExchange exchange(){ 
    return new DirectExchange("request_exchange"); 
} 

@Bean 
public Queue queue(){ 
    return new Queue("request_queue", true, false, true); 
} 

@Bean 
public Binding bindingReply(){ 
    return BindingBuilder.bind(this.queue()).to(this.exchange()).with("reply_exchange_queue"); 
} 

@Bean 
public DirectExchange exchangeReply(){ 
    return new DirectExchange("reply_exchange"); 
} 


@Bean 
public Queue replyQueue(){ 
    return new Queue("reply_queue", true, false, true); 
} 

Dienst

@Service 
public final class OuboundService { 


    public Message createRequest(String message){ 
     System.out.println("Inside createRequest : "+ message); 
     final String transactionId = UUID.randomUUID().toString(); 
     final Message builtMessage = MessageBuilder.withBody(message.getBytes()) 
       .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN) 
       .setHeader(AmqpHeaders.CORRELATION_ID, transactionId) 
       .build(); 
     return builtMessage; 
    } 


    public Message processRequest(Message message){ 
     System.out.println("Inside process Request : "+ new String(message.getBody())); 
     System.out.println("Header values : "+message.getMessageProperties().getHeaders()); 
     final Message result = MessageBuilder.withBody("Successful".getBytes()).copyProperties(message.getMessageProperties()) 
           .copyHeaders(message.getMessageProperties().getHeaders()).build(); 
     return result; 
    } 

} 

Fehler:

org.springframework.integration.handler.ReplyRequiredException: Keine Antwort von Handler 'OutboundGtwyId' und seine Eigenschaft 'RequiresReply' wird auf True festgelegt.

GitHub Quellcode (Gelöst Lösung)

https://github.com/kingkongprab/spring-amqp-outbound-gateway

Antwort

0

Die Korrelation als auch im Frühjahr AMQP erfolgt. Weitere Informationen finden Sie unter RabbitTemplate#sendAndRecevie(). Außerdem gibt es eine gute Dokumentation zu diesem Thema in der Reference Manual.

Die Spring-Integration mit den Implementierungen AbstractAmqpOutboundEndpoint und AmqpInboundGateway bietet eine sofort einsatzbereite Korrelationslösung für Anfragen und Antworten. Wenn Sie AmqpInboundGateway auf der Serverseite nicht verwenden können, sollten Sie sicherstellen, dass die correlationId Übertragung von der empfangenen Anforderung an die Antwort zurückgesendet wird. Ja, Sie können dedizierten Austausch für Antworten verwenden, und das wird von der RabbitTemplate#setQueue() unterstützt, auf Antworten auf der Client-Ausgangsseite zu warten. Aber das geht immer noch nicht ohne die richtige correlation Übertragung. Informationen darüber, wie Header (einschließlich correlationId) in der Spring-Integration zugeordnet werden, finden Sie auch unter https://docs.spring.io/spring-integration/docs/4.3.12.RELEASE/reference/html/amqp.html#amqp-message-headers.

UPDATE

Vielen Dank für Ihre Anwendung zu teilen.

Nun, jetzt sehe ich mehrere Probleme:

  1. Sie fehlen auf jeden Fall die replyQueue Bindung:

    @Bean 
    public Binding bindingReply(){ 
        return BindingBuilder.bind(this.replyQueue()).to(this.exchangeReply()).with("reply_exchange_queue"); 
    } 
    
  2. RabbitTemplate muss setReplyAddress() verwenden.Sie haben MessageListenerContainer für die reply_queue konfigurieren und haben RabbitTemplate als Zuhörer:

    @Bean 
    public RabbitTemplate template(ConnectionFactory rabbitConnectionFactory){ 
        final RabbitTemplate template = new RabbitTemplate(rabbitConnectionFactory); 
        template.setReplyAddress(replyQueue().getName()); 
        return template; 
    } 
    
    @Bean 
    public MessageListenerContainer replyContainer(RabbitTemplate template) { 
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
        container.setConnectionFactory(template.getConnectionFactory()); 
        container.setQueues(replyQueue()); 
        container.setMessageListener(template); 
        return container; 
    } 
    
  3. Ihre OuboundService mit org.springframework.amqp.core.Message Manipulation nutzlos ist. Die Kanaladapter wissen nicht über diese Art von payload und Ihre benutzerdefinierte Message wird nur als serialisiert body von einem anderen org.springframework.amqp.core.Message. Ich habe es so weit verändert, und alles funktioniert gut:

    public String createRequest(String message){ 
        System.out.println("Inside createRequest : "+ message); 
        return message; 
    } 
    
    
    public Message processRequest(Message message){ 
        System.out.println("Inside process Request : " + message); 
        return message; 
    } 
    

Auf jeden Fall empfehle ich Ihnen, Ihr Design zu überdenken und kommen zurück in die AmqpInboundGateway.

BTW in der endgültigen Lösung müssen Sie sich um keine correlation kümmern. Das Framework erledigt das automatisch für Sie.

+0

Kommentare sind nicht für längere Diskussion; Diese Konversation wurde [in den Chat verschoben] (http://chat.stackoverflow.com/rooms/157947/discussion-on-answer-by-artem-bilan-spring-amqp-outbound-gateway-to-produce-repl) . – Andy

Verwandte Themen