2017-03-24 4 views
14

Kürzlich habe ich eine schnelle Implementierung auf Erzeuger/Verbraucher-Queue-System.Implementierung der verzögerten Warteschlange für PHP AMQP

<?php 
namespace Queue; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable;  

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     // First, we need to make sure that RabbitMQ will never lose our queue. 
     // In order to do so, we need to declare it as durable. To do so we pass 
     // the third parameter to queue_declare as true. 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    // Just in case : http://stackoverflow.com/questions/151660/can-i-trust-php-destruct-method-to-be-called 
    // We should call close explicitly if possible. 
    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      // First, we need to make sure that RabbitMQ will never lose our queue. 
      // In order to do so, we need to declare it as durable. To do so we pass 
      // the third parameter to queue_declare as true. 
      $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     // This tells RabbitMQ not to give more than one message to a worker at 
     // a time. 
     $this->channel->basic_qos(null, 1, null); 

     // Requires ack. 
     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'consumeCallback')); 

     while(count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function consumeCallback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     // Very important to ack, in order to remove msg from queue. Ack after 
     // callback, as exception might happen in callback. 
     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 

    public function getQueueSize() 
    { 
     // three tuple containing (<queue name>, <message count>, <consumer count>) 
     $tuple = $this->channel->queue_declare($this->queueName, false, true, false, false); 
     if ($tuple != null && isset($tuple[1])) { 
      return $tuple[1]; 
     } 
     return -1; 
    } 
} 

public function produce und public function consume Paar arbeitet wie erwartet.

Wenn es jedoch mit einem verzögerten Warteschlangensystem kommt

public function produceWithDelay und public function consume Paar funktioniert nicht wie erwartet. Der Verbraucher, der consume anruft, nicht in der Lage, irgendeinen Gegenstand zu empfangen, sogar auf einen Zeitraum wartend.

Ich glaube etwas nicht richtig mit meiner produceWithDelay Implementierung. Darf ich wissen, was los ist?

+0

Versuchen Sie, Ihre Warteschlange als' $ Channel-> queue_declare ("name", falsch, falsch, falsch, wahr, wahr, array()) zu erklären, ] (https://gist.github.com/tairov/11289983) – Vardius

+0

Es ist nicht notwendig, es von Grund auf neu zu implementieren. So sollten Sie es tun https://stackoverflow.com/a/45549182/579025 –

Antwort

1

Für Randnotiz.

Ich entdeckte, dass dies von meinem eigenen Fehler verursacht wird.

Statt

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare($this->delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

Ich sollte es schreiben in

if (is_null($this->delayedQueueName)) 
    { 
     $delayedQueueName = $this->queueName . '.delayed'; 

     $this->channel->queue_declare(delayedQueueName, false, true, false, false, false, 
     ... 

     $this->delayedQueueName = $delayedQueueName; 
    } 

Mein Membervariable noch nicht richtig initialisiert.

Ein voll funktionsfähiger Code ist wie folgt für Ihren Referenzzweck.

<?php 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

class Amqp 
{ 
    private $connection; 
    private $queueName; 
    private $delayedQueueName; 
    private $channel; 
    private $callback; 

    public function __construct($host, $port, $login, $password, $queueName) 
    { 
     $this->connection = new AMQPStreamConnection($host, $port, $login, $password); 
     $this->queueName = $queueName; 
     $this->delayedQueueName = null; 
     $this->channel = $this->connection->channel(); 
     $this->channel->queue_declare($queueName, false, true, false, false); 
    } 

    public function __destruct() 
    { 
     $this->close(); 
    } 

    public function close() 
    { 
     if (!is_null($this->channel)) { 
      $this->channel->close(); 
      $this->channel = null; 
     } 

     if (!is_null($this->connection)) { 
      $this->connection->close(); 
      $this->connection = null; 
     } 
    } 

    public function produceWithDelay($data, $delay) 
    { 
     if (is_null($this->delayedQueueName)) 
     { 
      $delayedQueueName = $this->queueName . '.delayed'; 

      $this->channel->queue_declare($delayedQueueName, false, true, false, false, false, 
       new AMQPTable(array(
        'x-dead-letter-exchange' => '', 
        'x-dead-letter-routing-key' => $this->queueName 
       )) 
      ); 

      $this->delayedQueueName = $delayedQueueName; 
     } 

     $msg = new AMQPMessage(
      $data, 
      array(
       'delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT, 
       'expiration' => $delay 
      ) 
     ); 

     $this->channel->basic_publish($msg, '', $this->delayedQueueName); 
    } 

    public function produce($data) 
    { 
     $msg = new AMQPMessage(
      $data, 
      array('delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT) 
     ); 

     $this->channel->basic_publish($msg, '', $this->queueName); 
    } 

    public function consume($callback) 
    { 
     $this->callback = $callback; 

     $this->channel->basic_qos(null, 1, null); 

     $this->channel->basic_consume($this->queueName, '', false, false, false, false, array($this, 'callback')); 

     while (count($this->channel->callbacks)) { 
      $this->channel->wait(); 
     } 
    } 

    public function callback($msg) 
    { 
     call_user_func_array(
      $this->callback, 
      array($msg) 
     ); 

     $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
    } 
} 
3

Zuerst überprüfen Sie, ob Ihr Plugin rabbitmq_delayed_message_exchange durch den Befehl aktiviert: rabbitmq-plugins list, wenn nicht - lesen Sie mehr Informationen here.

Und Sie müssen Ihre __construct Methode aktualisieren, weil Sie Queue auf eine andere Weise deklarieren müssen. Ich behaupte nicht, Ihr Konstrukt zu aktualisieren, aber möchte mein einfaches Beispiel bieten:

Declare Warteschlange:

<?php 

require_once __DIR__ . '/../vendor/autoload.php'; 

use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
use PhpAmqpLib\Wire\AMQPTable; 

$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); 
$channel = $connection->channel(); 
$args = new AMQPTable(['x-delayed-type' => 'fanout']); 
$channel->exchange_declare('delayed_exchange', 'x-delayed-message', false, true, false, false, false, $args); 
$args = new AMQPTable(['x-dead-letter-exchange' => 'delayed']); 
$channel->queue_declare('delayed_queue', false, true, false, false, false, $args); 
$channel->queue_bind('delayed_queue', 'delayed_exchange'); 

Nachricht:

$data = 'Hello World at ' . date('Y-m-d H:i:s'); 
$delay = 7000; 
$message = new AMQPMessage($data, ['delivery_mode' => AMQPMessage::DELIVERY_MODE_PERSISTENT]); 
$headers = new AMQPTable(['x-delay' => $delay]); 
$message->set('application_headers', $headers); 
$channel->basic_publish($message, 'delayed_exchange'); 
printf(' [x] Message sent: %s %s', $data, PHP_EOL); 
$channel->close(); 
$connection->close(); 

Empfangen Nachricht:

$callback = function (AMQPMessage $message) { 
    printf(' [x] Message received: %s %s', $message->body, PHP_EOL); 
    $message->delivery_info['channel']->basic_ack($message->delivery_info['delivery_tag']); 
}; 
$channel->basic_consume('delayed_queue', '', false, false, false, false, $callback); 
while(count($channel->callbacks)) { 
    $channel->wait(); 
} 
$channel->close(); 
$connection->close(); 

Sie können auch Quelldateien here finden.
Hoffe es wird dir helfen!

0

Es dauert nur ein paar Zeilen, um verzögerte Messaging-Arbeit zu erhalten, wenn Sie Queue Interop wählen. Es gibt eine Lösung, die auf ttl plus Dead Letter Exchange Ansatz sowie ein verzögertes Plugin basiert. `Und der Austausch vielleicht beim nächsten Anschluss an diesen [Kern,

https://blog.forma-pro.com/rabbitmq-delayed-messaging-da802e3a0aa9

Verwandte Themen