2017-03-01 2 views
0

Wenn ich manuell die Warteschlange lösche, die mein PikaClient verbraucht, passiert nichts. Ich kann die Warteschlange mit dem gleichen Namen neu erstellen, aber der Kanal hat aufgehört, die Warteschlange zu verbrauchen (normal, weil ich es gelöscht habe). Aber ich möchte ein Ereignis erhalten, wenn die verbrauchte Warteschlange gelöscht wurde.Wie wurde eine Warteschlange erkannt?

Ich erwartete, dass der Kanal automatisch geschlossen würde, aber «on_channel_close_callback» wird nie aufgerufen. «basic_consume» bietet keinen Rückruf beim Schließen. Ein weiterer wichtiger Punkt, ich muss die TornadoConnection verwenden.

Pika: 0.10.0 Python: 2.7 Tornado: 4,3

Danke für Ihre Hilfe.

class PikaClient(object): 

    def __init__(self): 
     # init everything here 

    def connect(self): 
     pika.adapters.tornado_connection.TornadoConnection(connection_param, on_open_callback=self.on_connected) 

    def on_connected(self, connection): 
     self.logger.info('PikaClient: connected to RabbitMQ') 
     self.connected = True 
     self.connection = connection 
     self.connection.channel(self.on_channel_open) 

    def on_open_error_callback(self, *args): 
     self.logger.error("on_open_error_callback") 

    def on_channel_open(self, channel): 
     channel.add_on_close_callback(self.on_channel_close_callback) 

     channel.basic_consume(self.on_message, queue=self.queue_name, no_ack=True) 

    def on_channel_close_callback(self, reply_code, reply_text): 
     self.logger.error("Consumer was cancelled remotely, shutting down", reply_code=reply_code, reply_text=reply_text) 

Antwort

0

Ich habe eine Problemumgehung gefunden. Ich überprüfe alle X Sekunden, ob mein PikaClient Nachrichten verbraucht hat. Wenn nicht, starte ich die Anwendung neu, die automatisch eine Warteschlange erstellt.

Wenn Sie eine bessere Lösung haben, bin ich noch offen für Vorschläge.

def __init__(self): 
    ... 
    self.have_messages_been_consumed = False 

def on_connected(self, connection): 
    self.logger.info('PikaClient: connected to RabbitMQ') 
    self.connected = True 
    self.connection = connection 
    self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    self.connection.channel(self.on_channel_open) 

def check_if_messages_have_been_consumed(self): 
    if self.have_messages_been_consumed: 
     self.have_messages_been_consumed = False 
     self.connection.add_timeout(X, self.check_if_messages_have_been_consumed) 
    else: 
     # close_and_restart will set to False have_messages_been_consumed 
     self.close_and_restart() 

def on_message(self, channel, basic_deliver, header, body): 
    self.have_messages_been_consumed = True 
    ... 
Verwandte Themen