2017-10-20 4 views
1

Ich habe Thema mit 3 Partitionen und ich versuche, aus jeder bestimmten Partition zu lesen folgenden Codevon spezifischem Kafka Thema las Python

from kafka import KafkaConsumer, TopicPartition 

brokers = 'localhost:9092' 
topic = 'b3' 

m = KafkaConsumer(topic, bootstrap_servers=['localhost:9092']) 
par = TopicPartition(topic=topic, partition=1) 
m.assign(par) 

aber ich diese Störung erhalte:

raise IllegalStateError(self._SUBSCRIPTION_EXCEPTION_MESSAGE) 
kafka.errors.IllegalStateError: IllegalStateError: You must choose only one way to configure your consumer: (1) subscribe to specific topics by name, (2) subscribe to topics matching a regex pattern, (3) assign itself specific topic-partitions. 

Kann mir jemand dabei helfen?

Antwort

0

Können Sie den Topic-Parameter von KafkaConsumer() entfernen und es erneut versuchen?

Beispiel:

# manually assign the partition list for the consumer 
from kafka import TopicPartition, KafkaConsumer 
consumer = KafkaConsumer(bootstrap_servers='localhost:1234') 
consumer.assign([TopicPartition('foobar', 2)]) 
msg = next(consumer) 

ref: http://kafka-python.readthedocs.io/en/master/

+0

Wie würden Sie Timeout-Funktion hier einfügen? Ich möchte den Streaming-Prozess stoppen, wenn keine neue Nachricht zB 1s erscheint. –

+0

Schauen Sie sich die Funktion 'poll' an, mit der Sie eine Zeitüberschreitung einstellen können. http://kafka-python.readthedocs.io/en/master/apidoc/KafkaConsumer.html?highlight=poll#kafka.KafkaConsumer.poll –

+0

btw: Habe ich Ihnen die ursprüngliche Frage beantwortet? :-) –