2016-07-07 10 views
1

Ich habe Probleme, eine Möglichkeit zu finden, die Nachrichtenebene in SpringAMQP zu verzögern. Ich rufe einen Webservice auf, wenn der Dienst nicht verfügbar ist oder wenn er eine Ausnahme auslöst. Ich speichere alle Anfragen in der RabbitMQ-Warteschlange und versuche den Serviceaufruf solange, bis er erfolgreich ausgeführt wird. Wenn der Dienst einen Fehler anzeigt oder nicht verfügbar ist, führt der RabbitMQ-Listener eine Schleifenbildung durch (bedeutet, dass der Listener die Nachricht abruft und einen Dienst aufruft, falls ein Fehler auftritt)SpringAMQP-Verzögerung

Ich beschränkte die Schleifenbildung auf X Stunden mit MessagePostProcessor Ich wollte jedoch die Verzögerung auf Nachrichtenebene und bei jedem Zugriff auf den Dienst aktivieren. Zum Beispiel 1. versuchen Sie 3000ms Verzögerung und zweiten Mal 6000ms so weiter, bis ich x Zeit versuchen.

Es wäre großartig, wenn Sie ein paar Beispiele bereitstellen.

Könnten Sie mir bitte eine Idee dazu geben?

Antwort

1
I added CustomeMessage delay exchange 

    @Bean 
    CustomExchange delayExchange() { 
     Map<String, Object> args = new HashMap<>(); 
     args.put("x-delayed-type", "direct"); 
     return new CustomExchange("delayed-exchange", "x-delayed-message", true, false, args); 
    } 

Added MessagePostProcessor 

    if (message.getMessageProperties().getHeaders().get("x-delay") == null) { 
      message.getMessageProperties().setHeader("x-delay", 10000); 
     } else { 

      Integer integer = (Integer) message.getMessageProperties().getHeaders().get("x-delay"); 
      if (integer < 60000) { 
       integer = integer + 10000; 
       message.getMessageProperties().setHeader("x-delay", integer); 
      } 
     } 

First time it delays 30 seconds and adds 10seconds each time till it reaches 600 seconds.This should be configurable. 

And finally send the message to 

    rabbitTemplate.convertAndSend("delayed-exchange", queueName,message, rabbitMQMessagePostProcessor); 
+0

Ok. Sieht interessant aus, aber ich finde es als Overhead, die volle Reise zum Broker zu machen, nur um es erneut zu versuchen ... –

+0

Ja, aber auf diese Weise verlieren wir keine Nachrichten und die Wiederholung wird fortgesetzt bis x Minuten, dann verschiebe die Nachricht nach a andere Warteschlange – sea

+1

Wir verlieren sie auch nicht mit Spring Retry, nur weil wir es nicht auf Broker anerkennen. Und ja, wir können es auch an verschiedene Warteschlangen senden. Siehe Dokumentation, auf die ich in meiner Antwort hingewiesen habe. –

1

Nun, es ist nicht möglich, wie Sie das tun.

Message-Re-Queuing ist vollständig der Transaktion Rallback ähnlich, bei der das System in den Zustand vor einer Ausnahme zurückkehrt. Sie können also definitiv keine Nachricht ändern, um zur Warteschlange zurückzukehren.

Wahrscheinlich müssen Sie einen Blick in Spring Retry Projekt aus dem gleichen Grund und Poll-Nachricht aus der Warteschlange nur einmal und Versuche im Speicher, bis erfolgreiche Antwort oder Wiederholung der Politik anstrengend. Am Ende können Sie die Nachricht einfach aus der Warteschlange löschen oder in DLQ verschieben.

Weitere Informationen finden Sie in der Reference Manual.

+0

Hallo Artem, Danke für die Antwort. Ich habe dies mit MessagePostProcessor und CustomExchange erreicht. @Bean CustomExchange delayExchange() { Zuordnung args = new HashMap <>(); args.put ("x-verzögerter Typ", "direkt"); return new CustomExchange ("cos-verzögerter Austausch", "x-verzögerte Nachricht", wahr, falsch, args); } – sea

+0

Ok. Wenn es als gut gefunden wird, können Sie es hier als Antwort auf Ihre eigene Frage teilen. Nur ping mich hier im Kommentar, um darauf zu achten, –

+0

if (message.getMessageProperties(). GetHeaders(). Get ("x-Verzögerung") == null) { message.getMessageProperties(). SetHeader ("x- Verzögerung ", 10000); } else { Ganzzahl Ganzzahl = (Ganzzahl) message.getMessageProperties(). GetHeaders(). Get ("x-delay"); if (Ganzzahl <60000) { integer = ganze Zahl + 10000; message.getMessageProperties(). SetHeader ("x-delay", ganze Zahl); } } – sea