2016-10-02 10 views
0

schreiben Ich bin neu in Twisted, das ist mein erstes Programm.Twisted Python von Kafka zu lesen und zu Elasticsearch

Ich kann keinen Weg finden, den KafkaConsumer aus der kafka-python-Bibliothek zu verwenden und treq zu verwenden, um eine post-Anfrage an elasticsearch auszulösen.

Ich konnte das Problem in kleine Stücke zerlegen: einen kafka Verbraucher Iterator erstellen und Daten daraus lesen (das Thema sehr groß sein kann)

def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     v.value 

Beitrag Elasticsearch TREQ mit

def post(self): 
    d = treq.post('http://es:9200/pro/pr/', self.data) 
    d.addCallbacks(lambda x: print(x), lambda x: print("error %s " % x)) 

Start des Reaktors

from twisted.internet import reactor 
reactor.callWhenRunning(consumeKafka) 
reactor.run() 

Irgendeine Idee wie um das zu machen?

Antwort

0

Ich benutze Kafka überhaupt nicht, also bin ich mir nicht sicher, ob das für dich funktioniert. Außerdem nehme ich an, dass Sie Schwierigkeiten haben, gleichzeitig Kafka und treq zu laufen. Eine generelle Art, mit Iteratoren in Twisted umzugehen, besteht darin, inlineCallbacks zu verwenden, um auf ein Ergebnis zu warten und danach etwas mit diesem Ergebnis zu tun.

from twisted.internet import defer 

@defer.inlineCallbacks 
def consumeKafka(): 
    consumer = KafkaConsumer(bootstrap_servers="kafka:9092", auto_offset_reset='earliest') 
    consumer.subscribe(['kafkapipeline']) 
    for v in consumer: 
     value = yield v.value 
     # do stuff with value 

Dann können Sie einfach diese Funktion aufrufen und der Reaktor wird sich um den Rest kümmern. Also Ihr Hauptteil wird wie folgt aussehen:

consumeKafka() 
reactor.run() 

Beachten Sie, dass die consumeKafka() Funktion gibt ein Deferred so fügen Sie Rückrufe und errbacks nach Bedarf. Sobald Sie mit diesem Modell vertraut sind, werfen Sie einen Blick auf Cooperator Objekte für mehr Funktionalität.

+1

danke für deine Antwort, ich versuche immer noch, es funktioniert zu machen, aber ich denke, ich muss zuerst die verdrehte Dokumentation lesen. Das ist nicht einfach. – rolele

Verwandte Themen