2017-07-16 2 views
0

Ich habe 2 Warteschlangen, sagen q1 und q2, die e1 und e2 Austausch mit Binding Key b1 und b2 entspricht. Ich möchte Verbraucherfunktionen parallel laufen lassen, sagen wir c1 und c2, die auf q1 bzw. q2 hören. Ich habe versucht, die folgende Art und Weise:Mehrere Verbraucher in Rabbitmq für mehrere Warteschlange

def c1(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e1', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q1') 
    queue_name = result.method.queue 
    binding_key = "b1" 
    channel.queue_bind(exchange='e1', 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

def c2(): 
    connection = pika.BlockingConnection(pika.ConnectionParameters(host=constants.rmqHostIp)) 
    channel = connection.channel() 
    channel.exchange_declare(exchange='e2', durable='true', 
         type='topic') 
    result = channel.queue_declare(durable='false', queue='q2') 
    queue_name = result.method.queue 
    binding_key = "b2" 
    channel.queue_bind(exchange=e1, 
         queue=queue_name, 
         routing_key=binding_key) 
    channel.basic_consume(callback,queue=queue_name,no_ack=False) 
    channel.start_consuming() 

if __name__ == '__main__': 
    c1() 
    c2() 

Es wird jedoch nur zu c1 Funktion und c2 Funktion hören, nicht ausgeführt zu werden. Wie kann ich beide Funktionen ausführen? Vielen Dank im Voraus.

EDIT: Ich habe Methode c1 und c1 in 2 verschiedenen Modul (Datei)

+0

Sie sollten Python-Threading-Modul oder eine Alternative zum Blockieren der Verbindung verwenden. – alphiii

Antwort

1

Um beide Funktionen gleichzeitig einige Multi-Threading-Verfahren sein muss, um laufen. Bitte werfen Sie einen Blick auf here für einige Python-Beispiele.

Hier ist Ihr Code mit der Prozessklasse geändert. Es kann auch einen Thread verwenden oder explizit vom Betriebssystem ausführen.

import pika 
from multiprocessing import Process 


def callback(): 
    print 'callback got data' 


class c1(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e1', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q1') 
     queue_name = result.method.queue 
     binding_key = "b1" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 
     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 


class c2(): 
    def __init__(self): 
     self.connection = pika.BlockingConnection(pika.ConnectionParameters('localhost')) 
     self.channel = self.connection.channel() 
     self.channel.exchange_declare(exchange='e2', durable='true', type='topic') 
     result = self.channel.queue_declare(durable='false', queue='q2') 
     queue_name = result.method.queue 
     binding_key = "b2" 
     self.channel.queue_bind(exchange='e1', queue=queue_name, routing_key=binding_key) 

     self.channel.basic_consume(callback,queue=queue_name,no_ack=False) 

    def run(self): 
     self.channel.start_consuming() 

if __name__ == '__main__': 
    subscriber_list = [] 
    subscriber_list.append(c1()) 
    subscriber_list.append(c2()) 

    # execute 
    process_list = [] 
    for sub in subscriber_list: 
     process = Process(target=sub.run) 
     process.start() 
     process_list.append(process) 

    # wait for all process to finish 
    for process in process_list: 
     process.join() 
Verwandte Themen