Ich habe Probleme mit dem Abrufen von Nachrichten von Kafka in einer Consumer Group. Mein Consumer-Objekt ordnet eine bestimmte Partition mitKafka Consumer Poll Nachrichten mit Python
self.ps = TopicPartition(topic, partition)
und dass, nachdem die Verbrauchern zuordnet diese Partition:
self.consumer.assign([self.ps])
Danach kann mir die Nachrichten innerhalb der Trennwand zählen mit
self.consumer.seek_to_beginning(self.ps)
pos = self.consumer.position(self.ps)
und .....
In meinem tpoic sind über 30000 Nachrichten. Das Problem ist, dass ich nur genau eine Nachricht bekomme.
Consumer-Konfiguration mit: max_poll_records= 200
AUTO_OFFSET_RESET
ist früheste
Und hier ist meine Funktion mit dieser ich die Nachrichten zu bekommen versuchen:
def poll_messages(self):
data = []
messages = self.consumer.poll(timeout_ms=6000)
for partition, msgs in six.iteritems(messages):
for msg in msgs:
data.append(msg)
return data
Auch wenn ich vor dem Offset zum ersten verfügbaren gehen Starten Sie die Abfragen der Nachrichten Ich bekomme nur eine Nachricht.
Ich hoffe, jemand kann mir erklären, was ich falsch mache. Vielen Dank im Voraus.
Beste Wünsche Jörn
Leider Nick, ich glaube, Ihr Beispiel ein blockierender Aufruf ist – cs94njw
Einverstanden. Warum erwähnen Sie das? Es hat keinen Einfluss auf den Abrufmechanismus, oder? Ich habe den Quellcode nicht überprüft. – Nick
Wenn keine Nachricht in der Warteschlange ist (nichts zu lesen), wird die for-Schleife nicht verschoben. Dies ist kein Problem, aber es gibt Ihnen weniger Flexibilität. "Soa" Code oben verwendet Polling, wo es in der Warteschlange für ein paar Sekunden warten wird, und dann andere Dinge tun. Ich denke "Soa" war auf der Suche nach einer Umfrage Lösung. – cs94njw