2016-09-16 1 views
5

Ich bin neu bei Spring AMQP. Ich habe eine Anwendung, die ein Produzent ist, der Nachrichten an die andere Anwendung sendet, die ein Verbraucher ist.So verwenden Sie Ack oder Nack im Frühjahr AMQP

Sobald der Verbraucher die Nachricht erhält, werden wir die Validierung der Daten durchführen.

Wenn die Daten korrekt sind, müssen wir ACK und Nachricht aus der Warteschlange entfernt werden. Wenn die Daten nicht korrekt sind, müssen wir die Daten NACK (Negative Acknowledge) senden, damit sie in RabbitMQ erneut eingereiht werden.

Ich kam in

**factory.setDefaultRequeueRejected(false);** (Es wird die Nachricht überhaupt nicht requeue)

**factory.setDefaultRequeueRejected(true);** (Es wird die Meldung requeue wenn Ausnahme auftritt)

Aber mein Fall werde ich die Meldung bestätigen basierend auf Validierung. Dann sollte es die Nachricht entfernen. Wenn NACK dann die Nachricht erneut anfordert.

ich in RabbitMQ Website

Die AMQP-Spezifikation definiert die basic.reject Methode, die Kunden ermöglicht individuelle abzulehnen, Nachrichten übermittelt, Anweisen der Broker entweder zu verwerfen oder requeue sie

gelesen haben How um das obige Szenario zu erreichen? Bitte gib mir ein paar Beispiele.

habe ich versucht, ein kleines Programm

 logger.info("Job Queue Handler::::::::::" + new Date()); 
     try { 

     }catch(Exception e){ 

      logger.info("Activity Object Not Found Exception so message should be Re-queued the Message::::::::::::::"); 

     } 

     factory.setErrorHandler(new ConditionalRejectingErrorHandler(cause ->{ 
      return cause instanceof XMLException; 
     })); 

Nachricht nicht für andere Ausnahme Warteschlangen erneut factory.setDefaultRequeueRejected (true)

09: 46: 38.854 ERROR [Stderr] (SimpleAsyncTaskExecutor -1) org.activiti.engine.ActivitiObjectNotFoundException: Keine Prozesse mit Schlüssel 'WF89012'

09: 46: 39.102 INFO [com.example.bip.rabbitmq.handler.ErrorQueueHandler] (SimpleAsyncTaskExecutor-1), empfangen von Error Queue: { ERROR = Konnte nicht begangen JPA Transaktion; verschachtelte Ausnahme ist javax.persistence.RollbackException: Transaktion als rollbackonly}

Antwort

4

Siehe the documentation markiert.

Standardmäßig (mit defaultRequeueRejected=true) wird der Container die Nachricht quittieren (verursacht, dass sie entfernt wird), wenn der Listener normal beendet wird oder abgelehnt wird (und ihn erneut anfordert), wenn der Listener eine Ausnahme auslöst.

Wenn der Hörer (oder Fehlerbehandlungsroutine) löst ein AmqpRejectAndDontRequeueException wird das Standardverhalten außer Kraft gesetzt und die Nachricht wird verworfen (oder geroutet einen DLX/DLQ falls so konfiguriert) - der Behälter aufruft basicReject(true)basicReject(false) statt.

Also, wenn Ihre Validierung fehlschlägt, werfen Sie eine AmqpRejectAndDontRequeueException. Oder konfigurieren Sie Ihren Listener mit einem benutzerdefinierten Fehlerhandler, um Ihre Ausnahme in eine AmqpRejectAndDontRequeueException zu konvertieren.

Das ist in this answer beschrieben.

Wenn Sie wirklich Verantwortung wollen sich nehmen für acking, stellen Sie den Bestätigungsmodus MANUAL und ChannelAwareMessageListener oder this technique verwenden, wenn Sie einen @RabbitListener verwenden.

Aber die meisten Leute lassen nur den Container kümmern (sobald sie verstehen, was los ist). Im Allgemeinen ist die Verwendung von manuellen Acks für spezielle Anwendungsfälle, wie das Zurückstellen von Acks oder das frühe Acking.

EDIT

Es war ein Fehler in der Antwort, die ich darauf Sie (behoben); Sie müssen sich die Ursache der ListenerExecutionFailedException ansehen. Ich dies nur getestet und es funktioniert wie erwartet ...

@SpringBootApplication 
public class So39530787Application { 

    private static final String QUEUE = "So39530787"; 

    public static void main(String[] args) throws Exception { 
     ConfigurableApplicationContext context = SpringApplication.run(So39530787Application.class, args); 
     RabbitTemplate template = context.getBean(RabbitTemplate.class); 
     template.convertAndSend(QUEUE, "foo"); 
     template.convertAndSend(QUEUE, "bar"); 
     template.convertAndSend(QUEUE, "baz"); 
     So39530787Application bean = context.getBean(So39530787Application.class); 
     bean.latch.await(10, TimeUnit.SECONDS); 
     System.out.println("Expect 1 foo:" + bean.fooCount); 
     System.out.println("Expect 3 bar:" + bean.barCount); 
     System.out.println("Expect 1 baz:" + bean.bazCount); 
     context.close(); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setErrorHandler(new ConditionalRejectingErrorHandler(
       t -> t instanceof ListenerExecutionFailedException && t.getCause() instanceof FooException)); 
     return factory; 
    } 

    @Bean 
    public Queue queue() { 
     return new Queue(QUEUE, false, false, true); 
    } 
    private int fooCount; 

    private int barCount; 

    private int bazCount; 

    private final CountDownLatch latch = new CountDownLatch(5); 

    @RabbitListener(queues = QUEUE) 
    public void handle(String in) throws Exception { 
     System.out.println(in); 
     latch.countDown(); 
     if ("foo".equals(in) && ++this.fooCount < 3) { 
      throw new FooException(); 
     } 
     else if ("bar".equals(in) && ++this.barCount < 3) { 
      throw new BarException(); 
     } 
     else if ("baz".equals(in)) { 
      this.bazCount++; 
     } 
    } 

    @SuppressWarnings("serial") 
    public static class FooException extends Exception { } 

    @SuppressWarnings("serial") 
    public static class BarException extends Exception { } 

} 

Ergebnis:

Expect 1 foo:1 
Expect 3 bar:3 
Expect 1 baz:1 
+0

Dank für Erklärung. Ich habe ein Programm ausprobiert. Könnten Sie bitte den Fehler korrigieren? – Chandan

+0

Es ist nicht sinnvoll, immer eine 'AmqpRejectAndDontRequeueException' aus dem Error-Handler zu werfen; Setzen Sie dafür einfach 'default RequeueRejected = false '. Verwenden Sie die Technik in [diese Antwort] (http://stackoverflow.com/questions/39509745/negative-acknowledgement-to-rabbitmq-queue-to-re-queue-the-message-using-spring/39511357#39511357) zu Wirf es nur für die 'XMLException' - dein Hörer muss es erneut werfen. –

+0

Siehe die Bearbeitung meiner Antwort. –

Verwandte Themen