2017-08-28 1 views
2

Ich versuche Nachrichten von Kafka zu lesen, deshalb habe ich einfache Verbraucher geschrieben, um Nachrichten von Kafka zu lesen.Wie liest man Batch-Nachrichten in konfluenten Kafka Python?

While True: 
     message = consumer.poll(timeout=1.0) 
     # i am doing something with messages 

in der obigen Codeausgabe des Nachrichtentyps ist das Nachrichtenobjekt. Wie kann ich als ein Array von Nachrichten erhalten?

gibt es irgendeine möglichkeit ??

Hinweis: Nicht viele grundlegende Konfiguration der Verbraucher.

Antwort

5

librdkafka (die zugrundeliegende C-Bibliothek) gibt Nachrichten nur einzeln an die Anwendung zurück, aber intern werden Nachrichten per Stapel von Brokern abgerufen, sodass keine Leistungseinbußen auftreten. Nachrichten werden in einem internen Puffer in die Warteschlange gestellt und warten auf die Abfrage durch Ihre App.

Es gibt Konfigurationen tune das Verhalten:

fetch.wait.max.ms (Standard 100), die Zeit bis zum broker gegebenen Daten zu akkumulieren fetch.message.max.bytes (default 1048576, 1 GB) zu senden, die maximale Größe von Chargen queued.max.messages.kbytes (default 1000000), die maximale Größe der Daten in der internen Warteschlange. Wenn Sie nicht regelmäßig abfragen, werden die Daten nicht aus der Warteschlange gelöscht und Sie können keine weiteren Daten abrufen.

Und viele andere, die Sie finden Sie hier: https://github.com/edenhill/librdkafka/blob/0.11.0.x/CONFIGURATION.md


Wenn Sie wirklich eine Reihe von Daten aufgrund Ihrer Art und Weise wollen Daten zu verarbeiten, was Sie tun können, ist Call Umfrage mit niedrigen Timeout in einer Schleife wie Sie tun, und stoppen Sie Ihre Schleife, wenn Sie x Nachrichten oder nach y ms, sammeln sie in einer Sammlung. Verarbeiten Sie das generierte Array und wiederholen Sie die Schleife.

Das gleiche gilt für die Herstellung: Sie produzieren Daten nacheinander, aber die Nachrichten werden gestapelt, bevor sie an Broker gesendet werden.

+0

Können wir den zugrunde liegenden C-Code ändern, um einen Stapel von Nachrichten zurückzugeben? Da das Iterieren in Python und nur das Abrufen der Nachricht den gesamten Prozess verlangsamen kann, wird es im Fall des Zurückgebens einer Menge von Nachrichten von C selbst schneller sein. –

+1

Es war der Fall vorher, aber es wurde benchmarked gab es keinen Nachteil (in C), um eine Nachricht zu einer Zeit gegen eine Charge zurückzugeben, aufgrund der Art, wie die Zuordnung erfolgt ist. Sie können poll (0) in einer Schleife verwenden, um Ihren Batch zu erstellen - ich kenne Python nicht gut, aber vielleicht gibt es ein Problem (oder Sie könnten darüber diskutieren) auf GitHub, die dafür besser geeignet als Stack-Überlauf ist – Treziac

+0

Als Referenz (ich nehme an, es ist deins): https://github.com/conflutinc/confluent-kafka-python/issues/252 – Treziac

Verwandte Themen