2017-09-10 5 views
1

Ich habe Probleme mit dem Abrufen von Nachrichten von Kafka in einer Consumer Group. Mein Consumer-Objekt ordnet eine bestimmte Partition mitKafka Consumer Poll Nachrichten mit Python

self.ps = TopicPartition(topic, partition) 

und dass, nachdem die Verbrauchern zuordnet diese Partition:

self.consumer.assign([self.ps]) 

Danach kann mir die Nachrichten innerhalb der Trennwand zählen mit

self.consumer.seek_to_beginning(self.ps) 
pos = self.consumer.position(self.ps) 

und .....

In meinem tpoic sind über 30000 Nachrichten. Das Problem ist, dass ich nur genau eine Nachricht bekomme.

Consumer-Konfiguration mit: max_poll_records= 200 AUTO_OFFSET_RESET ist früheste

Und hier ist meine Funktion mit dieser ich die Nachrichten zu bekommen versuchen:

def poll_messages(self): 


    data = [] 

    messages = self.consumer.poll(timeout_ms=6000) 


    for partition, msgs in six.iteritems(messages): 

     for msg in msgs: 

      data.append(msg) 

    return data 

Auch wenn ich vor dem Offset zum ersten verfügbaren gehen Starten Sie die Abfragen der Nachrichten Ich bekomme nur eine Nachricht.

Ich hoffe, jemand kann mir erklären, was ich falsch mache. Vielen Dank im Voraus.

Beste Wünsche Jörn

Antwort

0

Ich glaube, dass Sie max_poll_records sind Missverständnis - das bedeutet nicht, Sie 200 pro Umfrage bekommen, nur eine Grenze für die Sie vielleicht bekommen. Sie müssen die Umfrage mehrmals aufrufen. Ich würde Sie die Dokumentation für einfache Beispiele verweisen: http://kafka-python.readthedocs.io/en/master/usage.html

Ich glaube, eine Standard-Implementierung ist:

for message in self.consumer: 
    # do stuff like: 
    print(msg) 
+0

Leider Nick, ich glaube, Ihr Beispiel ein blockierender Aufruf ist – cs94njw

+0

Einverstanden. Warum erwähnen Sie das? Es hat keinen Einfluss auf den Abrufmechanismus, oder? Ich habe den Quellcode nicht überprüft. – Nick

+0

Wenn keine Nachricht in der Warteschlange ist (nichts zu lesen), wird die for-Schleife nicht verschoben. Dies ist kein Problem, aber es gibt Ihnen weniger Flexibilität. "Soa" Code oben verwendet Polling, wo es in der Warteschlange für ein paar Sekunden warten wird, und dann andere Dinge tun. Ich denke "Soa" war auf der Suche nach einer Umfrage Lösung. – cs94njw

Verwandte Themen