2017-08-29 3 views
1

Ich habe eine Anwendung zum Download bestimmter Web-Inhalte, aus einem Strom von URLs von 1 Kafka-Produzent generiert. Ich habe ein Thema mit 5 Partitionen erstellt und es gibt 5 Kafka-Konsumenten. Das Zeitlimit für den Download der Webseite beträgt jedoch 60 Sekunden. Während eine der URL heruntergeladen wird, geht der Server davon aus, dass die Nachricht verloren gegangen ist, und sendet die Daten erneut an verschiedene Konsumenten.Wie erzwinge ich einen Verbraucher zum Lesen einer bestimmten Partition in kafka

Ich habe versucht, alles in

erwähnt

Kafka consumer configuration/performance issues

und

https://github.com/spring-projects/spring-kafka/issues/202

Aber ich halte verschiedene Fehler jedes Mal bekommen.

Ist es möglich, einen bestimmten Verbraucher mit einer Partition in kafka zu binden? ich kafka-Python für meine Anwendung

Antwort

0

Ich habe benutze nie das Python-Client aber die Java ein unterstützt die assign Methode, die Sie anstelle die subscribe zu fragen, verwenden können, um bestimmte Partitionen für das Thema zugeordnet werden. Natürlich verlieren Sie die Funktion zur automatischen Neuverteilung, und Sie müssen diesen Anwendungsfall manuell behandeln.

0

Vielleicht denke ich, was wirklich in Ihrem Fall passiert. Wenn Ihr Kunde die URL von Kafka holt, und dann gehen Sie, um den Inhalt herunterzuladen, und Sie sagten, dass es ungefähr 60s kostet, es zu tun. So blockieren Ihre Verbraucher es wegen des Downloads und konnten keinen Herzschlag an den Kafka-Server senden. Daher denkt der kafka-Server, dass dieser Consumer down ist, also führt er eine Gruppenneubalance durch und sendet die nicht gemeldete Nachricht erneut an andere Comsumer.

So gibt es zwei Lösungen, die Sie könnten versuchen:

  1. setzen die configs session_timeout_ms bis 60000 oder größer. Der Standard ist 30s, es ist nicht genug für Sie.

  2. Eine bessere Lösung ist mit Multithreading zu tun. Wenn Ihr Kunde eine Nachricht von Kafka abruft und dann einen neuen Thread startet, um den Inhalt herunterzuladen, wird die consumer.poll nicht blockiert, also kann es gut funktionieren.

+0

habe ich versucht, die erste Lösung, die Sie erwähnt. Wie Sie gesagt war ich immer Fehler von _ CommitFailedError: kann Commit nicht abgeschlossen werden, da hat die Gruppe bereits neu gewichtet und die Partitionen mit einem anderen Element zugeordnet. Dies bedeutet, dass die Zeit b/w nachfolgende Aufrufe() war länger als die konfigurierte session.timeout.ms abfragt, die typischerweise bedeutet, dass die poll Schleife zu viel Zeit msg Verarbeitung ausgibt. Sie können diese Adresse entweder durch das Session-Timeout zu erhöhen oder durch die maximale Größe von Chargen reduzieren returnd in poll() mit max.poll.records._ Ich werde die zweite Lösung versuchen und zu aktualisieren – ashdnik

1

Ich vermisste die Dokumentation von Kafka-Python. Mit der TopicPartition-Klasse können Sie einem bestimmten Consumer eine Partition zuweisen.

http://kafka-python.readthedocs.io/en/master/

>>> # manually assign the partition list for the consumer 
>>> from kafka import TopicPartition 
>>> consumer = KafkaConsumer(bootstrap_servers='localhost:1234') 
>>> consumer.assign([TopicPartition('foobar', 2)]) 
>>> msg = next(consumer) 
Verwandte Themen