2016-07-07 13 views
1

Ich habe eine PHP-App, die RabbitMQ verwendet. Aus Gründen der Redundanz habe ich ein Paar RabbitMQ-Server erstellt und sie in einem Cluster zusammengefasst. Ich habe auch einen VyOS-Failover-Cluster, der HAProxy zum Laden von Balance-Verbindungen und zum Umleiten im Falle eines Failovers ausführt.php rabbitmq Verbraucher wiederverbinden

Gestern entschied sich unser VyOS Cluster ein Failover (wahrscheinlich eine kurze Netzwerkunterbrechung) benötigt wurde. HAproxy wurde auf einem VyOS gestoppt, die virtuellen IPs verschoben und HAproxy auf dem anderen Knoten neu gestartet.

Danach betrachtete ich die Warteschlangen in Kaninchen und dort sah, war Null Verbraucher für jede Warteschlange. Ich überprüfte die Maschinen, auf denen die Verbraucher noch PHP laufen hatten. Ich verließ sie für eine Weile, um zu sehen, ob sie sich wieder verbanden und sie taten es nicht. Ich musste das PHP-Skript töten und neu starten, und sie schlossen sich wieder an und begannen sofort zu konsumieren.

Ich denke, dass RabbitMQ und HAproxy wie erwartet funktionieren ... Jetzt brauche ich die PHP-Consumer, um ein Failover-Ereignis zu unterstützen ... in anderen Worten, anstatt nur aufzuhängen, muss eine Trennung erkennen und automatisch wieder verbinden.

Hier ist meine RabbitMQ-Klasse. Danke für jede Hilfe im Voraus!

<?php 
while(true) 
{ 
    try{getMessages("transcode2");} 
    catch(Exception $e){echo($e->getMessage()."\n");} 
    sleep(1); 
} 
require_once("../api/db.php"); 
require_once("../vendor/autoload.php"); 
use PhpAmqpLib\Connection\AMQPStreamConnection; 
use PhpAmqpLib\Message\AMQPMessage; 
function sendMessage($msg,$prio) 
{ 
    global $channel; 
    $msg=json_encode($msg); 
    $queue="transcode2"; 
    $channel->queue_declare($queue,true,false,false,false); 
    $channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue); 
} 
function getMessages($queue) 
{ 
    global $connection,$channel; 
    $connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD); 
    $channel=$connection->channel(); 
    $channel->queue_declare($queue,true,false,false,false); 
    $callback=function($msg) 
    { 
     if(handleMessage(json_decode($msg->body,true))) 
     { 
      $msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']); 
     } 
     else 
     { 
      $msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true); 
     } 
    }; 
    $channel->basic_qos(null,1,null); 
    $channel->basic_consume($queue,'',false,false,false,false,$callback); 
    while(count($channel->callbacks)) 
    { 
     try{$channel->wait();} 
     catch(Exception $e) 
     { 
      break; 
     } 
    } 
    $channel->close(); 
    $connection->close(); 
} 
?> 

Antwort

0

Es könnte funktionieren, wenn Sie den Timeout-Parameter zu $ ​​channel-> wait() verwenden;

+0

Ich habe $ channel-> wait() geändert; zu $ channel-> wait (null, false, 60); Es funktionierte, aber es ist erwähnenswert, dass es nicht in der while (count ($ channel-> callbacks)) Schleife blieb, es verließ die getmessage-Funktion die Verbindung geschlossen und startete die gesamte Verbindung eine Sekunde später neu, wenn getmessages oben lief while-Schleife. Ich schätze, es funktioniert, obwohl es viele Reconnection Overhead gibt ... –

0

Die Null-Timeout funktioniert nicht richtig, wie Sie der Broker die Verbindung schließen kann und PHP Verbraucher es nicht beschmutzen.

Die Lösung wäre, kein Null-Empfang-Timeout zu verwenden. Stellen Sie sicher, dass das Verbindungstimeout größer ist als das empfangene.

Hier ist ein Beispiel, basierend auf AMQP Interop:

das AMQP Interop kompatible Transport installieren, zum Beispiel:

composer require enqueue/amqp-bunny 

Der Code funktioniert gleichen Dinge wie Ihr mit explizit Set-Timeouts:

<?php 
use Enqueue\AmqpBunny\AmqpConnectionFactory; 
use Interop\Amqp\AmqpConsumer; 
use Interop\Amqp\AmqpMessage; 
use Interop\Amqp\AmqpQueue; 

$context = (new AmqpConnectionFactory(sprintf(
    'amqp://%s:%[email protected]%s:%s/%2f?connection_timeout=600', // 10 min 
    RABBITMQ_USERNAME,RABBITMQ_PASSWORD, RABBITMQ_SERVER, RABBITMQ_PORT 
)))->createContext(); 

$context->setQos(null,1,null); 

//sendMessage 

$queue = $context->createQueue("transcode2"); 
$queue->addFlag(AmqpQueue::FLAG_PASSIVE); 
$context->declareQueue($queue); 

$message = $context->createMessage(json_encode($msg)); 
$message->setPriority($prio); 

$producer = $context->createProducer(); 
$producer->send($queue, $message); 

// getMessages 

$consumer = $context->createConsumer($queue); 
$context->subscribe($consumer, function(AmqpMessage $message, AmqpConsumer $consumer) { 
    if(handleMessage(json_decode($message->getBody(), true))) { 
     $consumer->acknowledge($message); 
    } else { 
     $consumer->reject($message); 
    } 

    return true; 
}); 

$receiveTimeout = 5000; // 5 seconds, should be lesser than connection_timeout which is 600 seconds now. 

$context->consume($receiveTimeout); 
Verwandte Themen