2016-04-12 12 views
4

Verwenden von kafka-python-1.0.2.kafka-python - Wie stelle ich eine Partition fest?

Wenn ich ein Thema mit 10 Partitionen habe, wie gehe ich über eine bestimmte Partition, während ich die verschiedenen Partitionen und Nachrichten durchlaufe. Ich kann einfach nicht scheinen, ein Beispiel für diese überall in der Dokumentation finden oder sonst

Aus den Dokumenten, ich verwenden möchte:

consumer.commit(offset=offsets)

Insbesondere wie erstelle ich die Partition und OffsetAndMetadata Wörterbuch erforderlich für Offsets (dict, optional) - {TopicPartition: OffsetAndMetadata}.

Ich hatte gehofft, der Funktionsaufruf nur wäre so etwas wie:

consumer.commit(partition, offset)

aber dies scheint nicht der Fall zu sein.

Vielen Dank im Voraus.

Antwort

3

Es sieht also so aus, als hätte ich es herausgefunden, komisch, wie das passiert, wenn du deine Fragen aufschreibst. Dies scheint zu funktionieren:

meta = consumer.partitions_for_topic(topic) 
options = {} 
options[partition] = OffsetAndMetadata(message.offset, meta) 
consumer.commit(options) 

Weitere Tests sind erforderlich, werden jedoch aktualisiert, wenn sich etwas ändert.

+0

Irgendwas falsch danach? Ich möchte das Gleiche tun. –

+1

So geht es, ich habe mich mit GitLab an das Kafka-Team gewandt. Antwort: "Die Metadaten sind wirklich nur eine undurchsichtige Zeichenfolge. Sie können auch Keine übergeben. Nothing verwendet Metadaten intern, es ist dort als eine Möglichkeit für Sie, um anwendungsspezifische Daten zu speichern, wenn nötig. Aber sehr wenige Leute nutzen diese Funktionalität tatsächlich Vorsicht, wenn Sie diesen Pfad gehen. –

+0

Hier ist der Link zu diesem Thema: https://github.com/dpkp/kafka-python/issues/645 –

2

Die Metadaten müssen nicht verwendet werden. Sehen Sie dieses Beispiel:

from kafka import TopicPartition 
from kafka.structs import OffsetAndMetadata 
... 
topic = 'your_topic' 
partition = 0 
tp = TopicPartition(topic,partition) 
kafkaConsumer = createKafkaConsumer() 
kafkaConsumer.assign([tp]) 
offset = 15394125 
kafkaConsumer.commit({ 
    tp: OffsetAndMetadata(offset, None) 
}) 

Hoffnung, das hilft.

1
from kafka import KafkaConsumer 
from kafka import TopicPartition 

TOPIC = "test_topic" 
PARTITION = 0 

consumer = KafkaConsumer(
    group_id=TOPIC, 
    auto_offset_reset="earliest", 
    bootstrap_servers="localhost:9092", 
    request_timeout_ms=100000, 
    session_timeout_ms=99000, 
    max_poll_records=100, 
) 
topic_partition = TopicPartition(TOPIC, PARTITION) 
# format: topic, partition 
consumer.assign([topic_partition]) 
consumer.seek(topic_partition, 1660000) 
# format: TopicPartition, offset. 1660000 is the offset been set. 
for message in consumer: 
    # do something 
  1. Dies weist nur eine Partition und setzt für diese Partition versetzt, wenn es mehr als eine Partition sind, müssen Sie eine für jeden von ihnen zuweisen und dann den Offset.
  2. Die Antwort von aalmeida88 funktioniert manchmal für mich, wenn es in einigen Situationen funktioniert, und aalmeida88 gab mir Ideen zu suchen und es scheint, dass es auch eine nützliche Methode ist.
  3. Eine andere Sache, die Sie beachten müssen, ist, dass, wenn Sie Partitionen selbst zuweisen, es scheint, dass kafka manager die Verbraucherinformationen nicht erhalten konnte, könnte dies sein, wenn Sie Partitionen zuweisen, in kafka statt zoekeeper, so Kafka-Manager kann diese Information nicht bekommen. Hoffe es hilft!

--- bearbeiten -----

Suchen Sie einen besseren Weg, es zu tun.

topic_partition = TopicPartition(TOPIC, 
           message.partition) 
consumer.seek(topic_partition, offset_value) 
consumer.commit() 

Dies wird die Partition Informationen von Nachricht von kafka erhalten extrahieren und die Klausel speichert Partition manuell zuweisen, wodurch der Komfort bringt, wenn es mehr als eine Partition Offset (nicht selten) muß in Programm festgelegt werden.

ps: Um sicherzustellen, dass eine Partition nur einmal gesetzt wird, sollte ein Flag entsprechend Ihrer Anwendung gesetzt werden.

Verwandte Themen