2017-05-09 4 views
3

Ich versuche Header Austausch auf RabbitMQ zu verwenden, mit gemischten Java und Python-Komponenten, und ich brauche eine bestätigte Lieferung.Rabbitmq Header Austausch und bestätigte Lieferung

scheine ich anders ein Verhalten aus dem Python (pika) und Java-Clients zu erhalten.

In Python:

channel.exchange_declare(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ ¦type='headers', 
¦ ¦ ¦ ¦ ¦ ¦ ¦durable=True) 
channel.confirm_delivery() 
result = channel.basic_publish(exchange='headers_test', 
¦ ¦ ¦ ¦ ¦ ¦ routing_key='', 
¦ ¦ ¦ ¦ ¦ ¦ mandatory=True, 
¦ ¦ ¦ ¦ ¦ ¦ body=message, 
¦ ¦ ¦ ¦ ¦ ¦ properties=pika.BasicProperties(
¦ ¦ ¦ ¦ ¦ ¦ ¦ delivery_mode=2, 
¦ ¦ ¦ ¦ ¦ ¦ ¦ headers=message_headers)) 

Wenn die Header übereinstimmen keine gebundenen Verbraucher und die Nachricht nicht weitergeleitet werden, Ergebnis ist falsch

Aber in java/scala:

channel.exchangeDeclare("headers_test", "headers", true, false, null) 
channel.confirmSelect 

val props = MessageProperties.PERSISTENT_BASIC.builder 
¦ ¦ ¦ ¦ .headers(messageHeaders).build 
channel.basicPublish("headers_test", 
¦ ¦ ¦ ¦ ¦ ¦"", //routingKey 
¦ ¦ ¦ ¦ ¦ ¦true, //mandatory 
¦ ¦ ¦ ¦ ¦ ¦props, 
¦ ¦ ¦ ¦ ¦ ¦"data".getBytes) 
channel.waitForConfirmsOrDie() 
Hier

, wenn Messageheaders keine Übereinstimmung finden, die Nachricht scheint nur ohne einen Fehler fallen gelassen werden.

Fehle ich etwas oder das Verhalten beider Clients ist wirklich anders? Und wie kann ich die Lieferung per Header-Austausch in Java bestätigt bekommen?

Hinweis: Ich habe bereits einen „Komplex“ Austausch Warteschlangen Setup-Routing, würde ich lieber vermeiden tote Buchstaben Hinzufügen Routing, um das Spiel, und nur nicht-on-send.

Antwort

1

Das Problem, dass eine Nachricht betrachtet wird bestätigt, auch wenn es keine Warteschlange ist Ihre Header entspricht. Aus der Dokumentation (https://www.rabbitmq.com/confirms.html):

Für unroutable Nachrichten, gibt die Broker eine bestätigen, sobald der Austausch eine Nachricht an jede Warteschlange wird nicht Route überprüft (gibt eine leere Liste von Warteschlangen). Wenn die Nachricht auch als verpflichtend veröffentlicht wird, wird die basic.return vor basic.ack an den Client gesendet. Dasselbe gilt für für negative Bestätigungen (basic.nack).

Stattdessen sollten Sie nach einer basic.return-Nachricht suchen, um festzustellen, ob eine Nachricht weitergeleitet wurde oder nicht.

Ich habe mit wireshark geprüft, und in der Tat kann ich sehen, dass, wenn eine Nachricht dort nicht geroutet ist eine AMQP basic.return Nachricht.

I supppose Sie sollten mit

channel.addReturnListener(new ReturnListener() { 
    @Override 
    public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
    System.out.println("App.handleReturn"); 
    System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
    } 
}); 

Und in der Tat beginnen, wenn eine Nachricht, die ich diese bekommen geroutet wurde nicht:

replyCode = [312], replyText = [NO_ROUTE], Austausch = [headers_logs], routingKey = [], pro ....

Außerdem, wenn Sie Pika das synchrone Verhalten in Java emulieren wollen es scheint, man kann d o indem Sie vor dem Veröffentlichen einer Nachricht eine aktuelle Veröffentlichungsfolgenummer eingeben und einen Bestätigungslistener registrieren, anstatt sich auf .waitForConfirmsOrDie() zu verlassen.

So ein vollständiges Codebeispiel wäre:

channel.addReturnListener(new ReturnListener() { 
     @Override 
     public void handleReturn(int replyCode, String replyText, String exchange, String routingKey, AMQP.BasicProperties properties, byte[] body) throws IOException { 
     System.out.println("App.handleReturn"); 
     System.out.println("replyCode = [" + replyCode + "], replyText = [" + replyText + "], exchange = [" + exchange + "], routingKey = [" + routingKey + "], properties = [" + properties + "], body = [" + body + "]"); 
     } 
    }); 

    channel.addConfirmListener(new ConfirmListener() { 
     @Override 
     public void handleAck(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleAck"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 

     @Override 
     public void handleNack(long deliveryTag, boolean multiple) throws IOException { 
     System.out.println("App.handleNack"); 
     System.out.println("deliveryTag = [" + deliveryTag + "], multiple = [" + multiple + "]"); 
     } 
}); 

long nextPublishSeqNo = channel.getNextPublishSeqNo(); 
System.out.println("nextPublishSeqNo = " + nextPublishSeqNo); 

channel.basicPublish("headers_logs", 
    "", 
    true, 
    props, 
    "data".getBytes()); 

Und innerhalb der Rückkehr/bestätigt Rückruf Sie für einen Kanal suchen muß Nummer veröffentlicht die Sequenz, die Sie, bevor Sie eine Nachricht erhalten zu veröffentlichen.

Wenn Sie sich ansehen, was auf der Leitung geschieht, sendet RabbitMq eine einzelne basic.return-Nachricht zurück, die auch eine Bestätigung (Lieferungs-Tag) enthält, falls eine Nachricht nicht an eine Warteschlange weitergeleitet wurde. Wenn eine Nachricht weitergeleitet wurde, sendet RabbitMq eine einzelne Nachricht zurück, die auch eine Bestätigung enthält.

Es scheint, dass RabbitMQ Java-Client immer basicReturn() Rückruf vor einem basicConfirm() aufruft, so eine Logik, um zu bestimmen, ob eine Nachricht oder nicht weitergeleitet wurde, kann dies sein:

Register Rückkehr und bestätigen Zuhörer auf einem Kanal; Speichern Sie die nächste Veröffentlichungssequenznummer eines Kanals. Warten Sie auf einen Rückruf oder bestätigen Sie den Rückruf. Wenn es sich um einen Rückruf handelt - eine Nachricht wurde nicht weitergeleitet, und Sie sollten eine weitere Bestätigung für denselben Übermittlungstag ignorieren. Wenn Sie einen handleAck() -Aufruf erhalten, bevor Sie einen handleReturn() empfangen, bedeutet dies, dass eine Nachricht an eine Warteschlange weitergeleitet wurde.

Ich bin mir nicht sicher, in welchem ​​Fall .handleNack() aufgerufen werden kann.

Verwandte Themen