2016-12-21 2 views
0

Es ist wie "Houston haben wir ein Problem hier", wo ich eine Nachricht für 5 Minuten planen/verzögern muss, nachdem es beim ersten Versuch, ein Ereignis zu verarbeiten fehlschlägt . Ich habe in diesem Szenario einen Dead Letter Exchange implementiert.Spring AMQP - Message Re-Warteschlange mit toten Buchstaben-Mechanismus mit TTL

Die Nachrichten auf Fehler, Weg zum DLX -> Retry Queue und kommt für einen weiteren Versuch nach einer TTL von 5 Minuten zur Arbeit Warteschlange zurück. Hier

ist die Konfiguration verwende ich:

public class RabbitMQConfig { 
    @Bean(name = "work") 
    @Primary 
    Queue workQueue() { 
     return new Queue(WORK_QUEUE, true, false, false, null); 
    } 

    @Bean(name = "workExchange") 
    @Primary 
    TopicExchange workExchange() { 
     return new TopicExchange(WORK_EXCHANGE, true, false); 
    } 

    @Bean 
    Binding workBinding(Queue queue, TopicExchange exchange) { 
     return BindingBuilder.bind(workQueue()).to(workExchange()).with("#"); 
    } 

    @Bean(name = "retryExchange") 
    FanoutExchange retryExchange() { 
     return new FanoutExchange(RETRY_EXCHANGE, true, false); 
    } 

    @Bean(name = "retry") 
    Queue retryQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     args.put("x-dead-letter-exchange", WORK_EXCHANGE); 
     args.put("x-message-ttl", RETRY_DELAY); //delay of 5 min 
     return new Queue(RETRY_QUEUE, true, false, false, args); 
    } 

    @Bean 
    Binding retryBinding(Queue queue,FanoutExchange exchange) { 
     return BindingBuilder.bind(retryQueue()).to(retryExchange()); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     return factory; 
    } 

    @Bean 
    Consumer receiver() { 
     return new Consumer(); 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Consumer receiver) { 
     return new MessageListenerAdapter(receiver, "receiveMessage"); 
    } 
} 

Producer.java:

@GetMapping(path = "/hello") 
public String sayHello() { 
    // Producer operation 

     String messages[]; 
     messages = new String[] {" hello "}; 

    for (int i = 0; i < 5; i++) { 
     String message = util.getMessage(messages)+i; 

     rabbitTemplate.convertAndSend("WorkExchange","", message); 
     System.out.println(" Sent '" + message + "'"); 
    } 
    return "hello"; 
} 

Consumer.java:

public class Consumer { 

    @RabbitListener(queues = "WorkQueue") 
    public void receiveMessage(String message, Channel channel, 
      @Header(AmqpHeaders.DELIVERY_TAG) Long tag) throws IOException, InterruptedException { 

     try { 

      System.out.println("message to be processed: " + message); 
      doWorkTwo(message); 
      channel.basicAck(tag, false); 

     } catch (Exception e) { 
      System.out.println("In the exception catch block"); 
      System.out.println("message in dead letter exchange: " + message); 
      channel.basicPublish("RetryExchange", "", null, message.getBytes()); 

     } 

    } 

    private void doWorkTwo(String task) throws InterruptedException { 

     int c = 0; 
     int b = 5; 
     int d = b/c; 

    } 

} 

Ist es der richtige Weg, um einen Toten zu verwenden Briefwechsel für mein Szenario und nach dem Warten einmal in der RETRY QUEUE für 5 min, auf der zweiten Mal versucht, sie und bewegt sich in die WORK QUEUE nicht für 5 Minuten in der Wiederholungswarteschlange warten sofort (TTL als 5 Minuten habe ich erwähnt).

ich diese Anwendung leite durch Anschlagen localhost: 8080/hallo url.

Hier ist meine aktualisierte Konfiguration.

RabbitMQConfig.java:

@EnableRabbit 
public class RabbitMQConfig { 

    final static String WORK_QUEUE = "WorkQueue"; 
    final static String RETRY_QUEUE = "RetryQueue"; 
    final static String WORK_EXCHANGE = "WorkExchange"; // Dead Letter Exchange 
    final static String RETRY_EXCHANGE = "RetryExchange"; 
    final static int RETRY_DELAY = 60000; // in ms (1 min) 

    @Bean(name = "work") 
    @Primary 
    Queue workQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     args.put("x-dead-letter-exchange", RETRY_EXCHANGE); 
     return new Queue(WORK_QUEUE, true, false, false, args); 
    } 

    @Bean(name = "workExchange") 
    @Primary 
    DirectExchange workExchange() { 
     return new DirectExchange(WORK_EXCHANGE, true, false); 
    } 

    @Bean 
    Binding workBinding(Queue queue, DirectExchange exchange) { 
     return BindingBuilder.bind(workQueue()).to(workExchange()).with(""); 
    } 

    @Bean(name = "retryExchange") 
    DirectExchange retryExchange() { 
     return new DirectExchange(RETRY_EXCHANGE, true, false); 
    } 

    // Messages will drop off RetryQueue into WorkExchange for re-processing 
    // All messages in queue will expire at same rate 
    @Bean(name = "retry") 
    Queue retryQueue() { 
     Map<String, Object> args = new HashMap<String, Object>(); 
     //args.put("x-dead-letter-exchange", WORK_EXCHANGE); 
     //args.put("x-message-ttl", RETRY_DELAY); 
     return new Queue(RETRY_QUEUE, true, false, false, null); 
    } 

    @Bean 
    Binding retryBinding(Queue queue, DirectExchange exchange) { 
     return BindingBuilder.bind(retryQueue()).to(retryExchange()).with(""); 
    } 

    @Bean 
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) { 
     SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory(); 
     factory.setConnectionFactory(connectionFactory); 
     factory.setDefaultRequeueRejected(false); 
     /*factory.setAdviceChain(new Advice[] { 
       org.springframework.amqp.rabbit.config.RetryInterceptorBuilder 
         .stateless() 
         .maxAttempts(2).recoverer(new RejectAndDontRequeueRecoverer()) 
         .backOffOptions(1000, 2, 5000) 
         .build() 
     });*/ 
     return factory; 
    } 

    @Bean 
    Consumer receiver() { 
     return new Consumer(); 
    } 

    @Bean 
    MessageListenerAdapter listenerAdapter(Consumer receiver) { 
     return new MessageListenerAdapter(receiver, "receiveMessage"); 
    } 
} 

Consumer.java:

public class Consumer { 

    @RabbitListener(queues = "WorkQueue") 
    public void receiveMessage(String message, Channel channel, 
      @Header(AmqpHeaders.DELIVERY_TAG) Long tag, 
      @Header(required = false, name = "x-death") HashMap<String, String> xDeath) 
      throws IOException, InterruptedException { 

     doWorkTwo(message); 
     channel.basicAck(tag, false); 
    } 

    private void doWorkTwo(String task) { 
     int c = 0; 
     int b = 5; 
     if (c < b) { 
      throw new AmqpRejectAndDontRequeueException(task); 
     } 
    } 
} 
+0

Was Sie vorschlagen macht keinen Sinn - da Sie die Wiederholungs veröffentlichen selbst Warteschlange, hat der Broker keine Kenntnis, dass dies eine zweite Wiederholung ist - es ist nur eine neue Nachricht aus seiner Sicht - so wird jede andere Aktion nicht nehmen. –

+0

Danke @GaryRussell. Wie kann ich das Szenario erreichen, in dem der Broker bestätigen kann, dass es sich um eine zweite Wiederholung handelt? Ich habe mehrere Dinge ausprobiert, aber immer die Anzahl als 1 erhalten. Wie kann die Veröffentlichung in der retryQueue automatisch geschehen? – Diva04

+0

Wenn Sie ablehnen und Requeue direkt, der Makler hat nur einen Hinweis darauf, dass es erneut zugestellt wird, nicht eine Zählung; Wenn Sie einen DLQ ablehnen, zählt der Broker über den 'x-death'-Header - siehe meine Antwort. Wenn Sie es selbst in der DLQ veröffentlichen, müssen Sie über Ihre eigene Kopfzeile zählen. –

Antwort

0

Wenn Sie die Nachricht so die Broker leitet sie an einer DLQ ablehnen, können Sie den x-death Header untersuchen. In diesem Szenario habe ich eine DLQ mit einer TTL von 5 Sekunden und der Consumer der Nachricht aus der Hauptwarteschlange lehnt es ab; die Broker leitet sie an die DLQ, dann verfällt es und wird zurück zum Haupt Warteschlange geleitet - die x-death Header die Anzahl der Re-Routing-Operationen zeigt:

x-death header

+0

Wie lehne ich die Nachricht ab, um zur DLQ zu gehen? Mit channel.reject wird die Nachricht einfach aus der workQueue selbst entfernt. Muss ich im Catch-Block anders umgehen? oder hat meine Konfigurationsdatei etwas fehlt? – Diva04

+0

Sie müssen die Warteschlange mit einem 'x-dead-letter-letterchange'-Argument konfigurieren und Ihre DLQ mit dem richtigen Routingschlüssel (falls der Austauschtyp dies erfordert) an diesen Austausch binden oder einen Fanout-Austausch verwenden und die Warteschlange daran binden). Siehe [hier] (https://www.rabbitmq.com/dlx.html). –

+0

@GaryRusselI, ich habe factory.setDefaultRequeueRejected (false) hinzugefügt; In der Bean SimpleRabbitListenerContainerFactory kommen immer noch die fehlgeschlagenen Nachrichten nicht zur DLQ (retryQueue). Was vermisse ich, um eine Nachricht abzulehnen, so dass sie direkt an die DLQ geht? – Diva04

Verwandte Themen