2012-10-01 12 views
9

Ich habe eine vorhandene RabbitMQ-Bereitstellung, dass einige Java-Anwendungen die Sendeprotokollmeldungen als Zeichenfolge-JSON-Objekte auf verschiedenen Kanälen verwenden. Ich möchte Sellerie verwenden, um diese Nachrichten zu konsumieren und sie an verschiedenen Orten (z. B. DB, Hadoop usw.) zu schreiben.Verwenden von Sellerie mit vorhandenen RabbitMQ-Nachrichten

Ich kann sehen, dass Sellerie Design sowohl Hersteller als auch Verbraucher von RabbitMQ-Nachrichten ist, da es versucht, den Mechanismus zu verbergen, mit dem diese Nachrichten geliefert werden. Gibt es überhaupt etwas, damit Sellery Nachrichten, die von einer anderen App erstellt wurden, konsumiert und Jobs bei ihrer Ankunft ausführt?

Antwort

12

Es ist derzeit schwierig, den Selleriearbeitern benutzerdefinierte Benutzer hinzuzufügen, aber dies ändert sich in der Entwicklungsversion (3.1), wo ich Unterstützung für Consumer-Bootschritte hinzugefügt habe.

Es gibt keine Dokumentation noch, wie ich es gerade beendet habe Umsetzung, aber hier ist ein Beispiel:

from celery import Celery 
from celery.bin import Option 
from celery.bootsteps import ConsumerStep 
from kombu import Consumer, Exchange, Queue 

class CustomConsumer(ConsumerStep): 
    queue = Queue('custom', Exchange('custom'), routing_key='custom') 

    def __init__(self, c, enable_custom_consumer=False, **kwargs): 
     self.enable = self.enable_custom_consumer 

    def get_consumers(self, connection): 
     return [ 
      Consumer(connection.channel(), 
       queues=[self.queue], 
       callbacks=[self.on_message]), 
     ] 

    def on_message(self, body, message): 
     print('GOT MESSAGE: %r' % (body,)) 
     message.ack() 


celery = Celery(broker='amqp://localhost//') 
celery.steps['consumer'].add(CustomConsumer) 
celery.user_options['worker'].add(
    Option('--enable-custom-consumer', action='store_true', 
      help='Enable our custom consumer.'), 
) 

Beachten Sie, dass der API in der finalen Version ändern kann, ist eine Sache, die ich noch nicht sicher bin ungefähr ist, wie Kanäle nach get_consumer(connection) gehandhabt werden. Derzeit ist der Kanal des Verbrauchers geschlossen, wenn die Verbindung unterbrochen wird, und beim Herunterfahren , aber die Leute möchten vielleicht Kanäle manuell behandeln. In diesem Fall besteht immer die Möglichkeit ConsumerStep anzupassen oder einen neuen StartStopStep zu schreiben.

+3

Die Dokumentation kann jetzt unter http://cellery.readthedocs.org/en/latest/userguide/extending.html gefunden werden –

Verwandte Themen