0

Ich bin neu in diesem ganzen Kafka/Spark Sache. Ich habe Spark Streaming (PySpark), um Daten von einem Kafka-Produzenten aufzunehmen. Es läuft gut für eine Minute und wirft dann immer eine kafka.common.OffsetOutOfRangeException. Der Kafka-Konsument ist Version 0.8 (0.10 wird anscheinend nicht für PySpark unterstützt). Ich habe einen Master-Knoten mit 3 Mitarbeitern auf AWS Ubuntu 14.04. Ich weiß nicht, ob das relevant ist, aber die Kafka-Logs hier sind relativ groß (~ 1-10kb) und ich habe die Producer/Broker/Consumer-Konfigurationen entsprechend angepasst. Die Daten werden gut durchgespielt, obwohl vielleicht langsamer als das, was der Produzent wahrscheinlich produziert (das könnte die Ursache des Problems sein?). Kafka OffsetOutOfRangeExceptionKafka + Funken Streaming: kafka.common.OffsetOutOfRangeException

Aber die Zeit ist eine Stunde meine Retention und die Größe ist 1 GB in jedem der Knoten server.properties, und was noch wichtiger ist, gibt es keine Änderung in der Zeit Spark:

Es wurde ein ähnliches Problem durch eine Erhöhung der Verweilzeit/Größe hier gelöst bis zum Fehler und die eingestellte Aufbewahrungszeit/Größe.

Gibt es noch eine andere Möglichkeit zur Anpassung, vielleicht in den Spark-Streaming-Konfigurationen? Alle Antworten, die ich online sehe, haben mit Kafka-Provisioning zu tun, aber in meinem Fall scheint es keinen Unterschied zu machen.

EDIT 1: Ich habe versucht a) mehrere Streams vom Hersteller lesen und b) den Producer Stream selbst mit time.sleep(1.0) verlangsamen. Beide hatten keine nachhaltige Wirkung.

heißt

n_secs = 1 
ssc = StreamingContext(sc, n_secs) 
kds = [KafkaUtils.createDirectStream(ssc, ['test-video'], { 
        'bootstrap.servers':'localhost:9092', 
        'group.id':'test-video-group', 
        'fetch.message.max.bytes':'15728640', 
        'auto.offset.reset':'largest'}) for _ in range(n_streams)] 

stream = ssc.union(*kds) 
+0

Sieht aus, als ob Sie den neuen Verbraucher in 0.8 verwenden, richtig? Ich schätze das von den Bootstrap-Servern anstelle einer zk-Verbindung. Wie verpflichten Sie Offsets? – dawsaw

+0

@dawsaw Es wird automatisch ausgeführt, aber über das Wochenende habe ich festgestellt, dass es sich bei Spark Streaming um ein Gegendruckproblem handelt. – thefourtheye

Antwort

0

Ist es möglich, dass Ihr Produzent zu schnell zu viele Nachrichten erzeugt, so dass 1G an jedem Broker ist nicht genug? 1G scheint in der ganzen Realität sehr niedrig zu sein. Nachdem Spark Streaming den Offset-Bereich festgelegt hat, den es im Mikro-Batch verarbeiten muss, und versuchen, die Nachrichten vom Broker basierend auf dem Offset abzurufen, sind die Nachrichten aufgrund der Größenbeschränkung nicht mehr verfügbar. Bitte erhöhen Sie die Broker-Größe zu etwas größer wie 100G und sehen, ob das Ihr Problem behebt.

+0

Ich stimme zu (nach einigen weiteren Tests), dass der Produzent mit Sicherheit Nachrichten generiert, mit denen Spark Streaming nicht mithalten kann. Aber ich denke, ich muss ändern, was ich produziere und/oder Spark-Konfigurationen, anstatt die Kafka-Bereitstellung zu ändern. – thefourtheye

Verwandte Themen