Ich habe eine PHP-App, die RabbitMQ verwendet. Aus Gründen der Redundanz habe ich ein Paar RabbitMQ-Server erstellt und sie in einem Cluster zusammengefasst. Ich habe auch einen VyOS-Failover-Cluster, der HAProxy zum Laden von Balance-Verbindungen und zum Umleiten im Falle eines Failovers ausführt.php rabbitmq Verbraucher wiederverbinden
Gestern entschied sich unser VyOS Cluster ein Failover (wahrscheinlich eine kurze Netzwerkunterbrechung) benötigt wurde. HAproxy wurde auf einem VyOS gestoppt, die virtuellen IPs verschoben und HAproxy auf dem anderen Knoten neu gestartet.
Danach betrachtete ich die Warteschlangen in Kaninchen und dort sah, war Null Verbraucher für jede Warteschlange. Ich überprüfte die Maschinen, auf denen die Verbraucher noch PHP laufen hatten. Ich verließ sie für eine Weile, um zu sehen, ob sie sich wieder verbanden und sie taten es nicht. Ich musste das PHP-Skript töten und neu starten, und sie schlossen sich wieder an und begannen sofort zu konsumieren.
Ich denke, dass RabbitMQ und HAproxy wie erwartet funktionieren ... Jetzt brauche ich die PHP-Consumer, um ein Failover-Ereignis zu unterstützen ... in anderen Worten, anstatt nur aufzuhängen, muss eine Trennung erkennen und automatisch wieder verbinden.
Hier ist meine RabbitMQ-Klasse. Danke für jede Hilfe im Voraus!
<?php
while(true)
{
try{getMessages("transcode2");}
catch(Exception $e){echo($e->getMessage()."\n");}
sleep(1);
}
require_once("../api/db.php");
require_once("../vendor/autoload.php");
use PhpAmqpLib\Connection\AMQPStreamConnection;
use PhpAmqpLib\Message\AMQPMessage;
function sendMessage($msg,$prio)
{
global $channel;
$msg=json_encode($msg);
$queue="transcode2";
$channel->queue_declare($queue,true,false,false,false);
$channel->basic_publish(new AMQPMessage($msg,array('priority' => $prio)),'',$queue);
}
function getMessages($queue)
{
global $connection,$channel;
$connection=new AMQPStreamConnection(RABBITMQ_SERVER,RABBITMQ_PORT,RABBITMQ_USERNAME,RABBITMQ_PASSWORD);
$channel=$connection->channel();
$channel->queue_declare($queue,true,false,false,false);
$callback=function($msg)
{
if(handleMessage(json_decode($msg->body,true)))
{
$msg->delivery_info['channel']->basic_ack($msg->delivery_info['delivery_tag']);
}
else
{
$msg->delivery_info['channel']->basic_nack($msg->delivery_info['delivery_tag'],false,true);
}
};
$channel->basic_qos(null,1,null);
$channel->basic_consume($queue,'',false,false,false,false,$callback);
while(count($channel->callbacks))
{
try{$channel->wait();}
catch(Exception $e)
{
break;
}
}
$channel->close();
$connection->close();
}
?>
Ich habe $ channel-> wait() geändert; zu $ channel-> wait (null, false, 60); Es funktionierte, aber es ist erwähnenswert, dass es nicht in der while (count ($ channel-> callbacks)) Schleife blieb, es verließ die getmessage-Funktion die Verbindung geschlossen und startete die gesamte Verbindung eine Sekunde später neu, wenn getmessages oben lief while-Schleife. Ich schätze, es funktioniert, obwohl es viele Reconnection Overhead gibt ... –