2016-09-01 5 views
0

Ich verwende Federn SimpleMessageListenerContainer, um Nachrichten aus einer RabbitMQ-Warteschlange zu konsumieren. Alles funktioniert gut, aber wenn eine ungültige Nachricht an die Warteschlange gesendet wird (z. B. ungültiger JSON), bricht der Listener einfach ab, fährt den Worker herunter und akzeptiert keine weiteren Nachrichten.Spring SimpleMessageListenerContainer für RabbitMQ wird bei ungültiger Nachricht abgebrochen

Ist es möglich, es so zu konfigurieren, dass es die unterbrochene Nachricht verwirft und weiter Nachrichten abgehört?

Ich bin mit Sprint-Kaninchen-1.6.1.RELEASE.jar

Meine Konfiguration sieht wie folgt aus:

@Bean 
public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
               MessageListenerAdapter listenerAdapter, 
               MessageConverter messageConverter) { 
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
    container.setConnectionFactory(connectionFactory); 
    container.setQueueNames("my.queue"); 
    container.setMessageListener(listenerAdapter); 
    container.setMessageConverter(messageConverter); 
    return container; 
} 

@Bean 
public MessageConverter messageConverter() { 
    return new Jackson2JsonMessageConverter(); 
} 

@Bean 
MessageListenerAdapter listenerAdapter(Worker worker) { 
    MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage"); 
    messageListenerAdapter.setMessageConverter(new Jackson2JsonMessageConverter()); 
    return messageListenerAdapter; 
} 

Die Erklärung meiner Zuhörer Methode:

public void processMessage(Map<String, String> message) { 

Wenn ich eine Nachricht wie '"routeId":"7"}' (defekte JSON) sende, dann bekomme ich die Ausnahme:

2016-09-02 08:10:35.821 WARN 35841 --- [ container-29] s.a.r.l.ConditionalRejectingErrorHandler : Execution of Rabbit message listener failed. 

org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) [spring-rabbit-1.6.1.RELEASE.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) 
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] 
at  org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 12 common frames omitted 

2016-09-02 08:10:35.828 ERROR 35841 --- [ container-29]  o.s.a.r.l.SimpleMessageListenerContainer : Consumer received fatal exception during processing 

org.springframework.amqp.rabbit.listener.exception.FatalListenerExecutionException: Invalid listener 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1351) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at java.lang.Thread.run(Thread.java:745) [na:1.8.0_101] 
Caused by: org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Failed to invoke target method 'processMessage' with argument type = [class java.lang.String], value = [{routeId}] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:408) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:298) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:777) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:700) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$001(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$1.invokeListener(SimpleMessageListenerContainer.java:187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.invokeListener(SimpleMessageListenerContainer.java:1187) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:681) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:1165) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:1149) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1100(SimpleMessageListenerContainer.java:95) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1312) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 1 common frames omitted 
Caused by: java.lang.NoSuchMethodException: nz.co.qrious.transport.batchstarter.service.Worker.processMessage(java.lang.String) 
at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_101] 
at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:174) ~[spring-core-4.3.2.RELEASE.jar:4.3.2.RELEASE] 
at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:386) ~[spring-rabbit-1.6.1.RELEASE.jar:na] 
... 12 common frames omitted 

2016-09-02 08:10:35.833 ERROR 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Stopping container from aborted consumer 
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Waiting for workers to finish. 
2016-09-02 08:10:35.833 INFO 35841 --- [ container-29] o.s.a.r.l.SimpleMessageListenerContainer : Successfully waited for workers to finish. 

Die fatale Ausnahme in SimpleMessageListenerContainer wird hier geworfen:

catch (ListenerExecutionFailedException ex) { 
        // Continue to process, otherwise re-throw 
        if (ex.getCause() instanceof NoSuchMethodException) { 
         throw new FatalListenerExecutionException("Invalid listener", ex); 
        } 
       } 

So scheint es, es soll heruntergefahren werden, wenn der Behälter mit einem nicht existierenden Verfahren konfiguriert ist. Aber im Falle einer fehlerhaften Nachricht wird versucht, die Methode mit einem falschen Parametertyp aufzurufen, was ebenfalls eine NoSuchMethodException verursacht. Das bedeutet, dass jeder Produzent meinen Kunden mit einer kaputten Nachricht töten kann.

Vielen Dank für Anregungen!

+0

Ich bin nicht sicher, was Sie unter 'Aber im Fall einer gebrochenen Nachricht, es versucht, das Verfahren mit einem falschen Parameter-Typ zu nennen, die auch eine NoSuchMethodException.' verursacht. Sie müssen den kompletten Stack-Trace anzeigen, keinen bearbeiteten Stack und welche Version verwenden Sie? Normalerweise verursacht ein fehlerhafter JSON 'MessageConversionException', die speziell behandelt werden und solche Nachrichten zurückgewiesen werden. –

+0

@GaryRussell Danke für Ihren Kommentar. Ich habe den kompletten Stacktrace und auch mehr Konfiguration hinzugefügt. Es scheint, dass es versucht, meine Listener-Methode ('processMessage') mit einem String als Argument aufzurufen, anstatt nur früher zu versagen, wenn versucht wird, den JSON zu parsen. – johannesv

+0

Siehe meine Antwort für eine Arbeit - ich öffnete auch eine [JIRA Problem] (https://jira.spring.io/browse/AMQP-644). –

Antwort

1

Interessant; Ich konnte dein Problem reproduzieren; es stellt sich heraus, dass, wenn die Nachricht keinen __TypeID__ Header (Konvertierungshinweis) enthält, es einfach den "schlechten" JSON als String zurückgibt.

Ich konnte es lösen, indem Sie einen benutzerdefinierten Klassen-Mapper in den Konverter injizieren.

Sie können auch festlegen, dass das sendende System den Typheader setzt.

Dann wird die Nachricht abgelehnt, weil wir eine MessageConversionException erhalten.

package com.example; 

import java.util.concurrent.CountDownLatch; 
import java.util.concurrent.TimeUnit; 

import org.springframework.amqp.core.MessageProperties; 
import org.springframework.amqp.core.Queue; 
import org.springframework.amqp.rabbit.connection.ConnectionFactory; 
import org.springframework.amqp.rabbit.core.RabbitTemplate; 
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer; 
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter; 
import org.springframework.amqp.support.converter.ClassMapper; 
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter; 
import org.springframework.amqp.support.converter.MessageConverter; 
import org.springframework.amqp.support.converter.SimpleMessageConverter; 
import org.springframework.boot.SpringApplication; 
import org.springframework.boot.autoconfigure.SpringBootApplication; 
import org.springframework.context.ConfigurableApplicationContext; 
import org.springframework.context.annotation.Bean; 

@SpringBootApplication 
public class So39264965Application { 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39264965Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend("my.queue", new Foo()); 
     context.getBean(Worker.class).latch.await(60, TimeUnit.SECONDS); 

     // bad json 
     template.setMessageConverter(new SimpleMessageConverter()); 
     template.convertAndSend("", "my.queue", "\"routeId\":\"7\"}", m -> { 
      m.getMessageProperties().setContentType("application/json"); 
      return m; 
     }); 


     Thread.sleep(60000); 
     context.close(); 
    } 

    @Bean 
    public SimpleMessageListenerContainer container(ConnectionFactory connectionFactory, 
      MessageListenerAdapter listenerAdapter, MessageConverter messageConverter) { 
     SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(); 
     container.setConnectionFactory(connectionFactory); 
     container.setQueueNames("my.queue"); 
     container.setMessageListener(listenerAdapter); 
     container.setMessageConverter(messageConverter); 
     return container; 
    } 

    @Bean 
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) { 
     RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory); 
     rabbitTemplate.setMessageConverter(messageConverter()); 
     return rabbitTemplate; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue("my.queue"); 
    } 

    @Bean 
    public MessageConverter messageConverter() { 
     Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter(); 
     jackson2JsonMessageConverter.setClassMapper(new ClassMapper() { 

      @Override 
      public Class<?> toClass(MessageProperties properties) { 
       return Foo.class; 
      } 

      @Override 
      public void fromClass(Class<?> clazz, MessageProperties properties) { 

      } 

     }); 
     return jackson2JsonMessageConverter; 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Worker worker) { 
     MessageListenerAdapter messageListenerAdapter = new MessageListenerAdapter(worker, "processMessage"); 
     messageListenerAdapter.setMessageConverter(messageConverter()); 
     return messageListenerAdapter; 
    } 

    @Bean 
    public Worker worker() { 
     return new Worker(); 
    } 

    public static class Worker { 

     private final CountDownLatch latch = new CountDownLatch(1); 

     public void processMessage(Foo foo) { 
      System.out.println(foo); 
      this.latch.countDown(); 
     } 

    } 

    public static class Foo { 

     private String bar = "bar"; 

     public String getBar() { 
      return this.bar; 
     } 

     public void setBar(String bar) { 
      this.bar = bar; 
     } 

     @Override 
     public String toString() { 
      return "Foo [bar=" + this.bar + "]"; 
     } 

    } 

} 
+0

Danke, das funktioniert :-) – johannesv

Verwandte Themen