2

spark streaming UI Kafka direkte Verbraucher begann, Lesevorgänge auf 450 Ereignisse (5 * 90 Partitionen) pro Batch (5 Sekunden) zu begrenzen, lief es gut für 1 oder 2 Tage davor (etwa 5000 bis 40000 Ereignisse pro Batch)Spark Streaming Kafka direkte Verbraucherverbrauch Geschwindigkeit fallen

Ich benutze Funke Standalone-Cluster (Spark und Spark-Streaming-kafka Version 1.6.1) in AWS läuft und S3-Bucket für Checkpoint-Verzeichnis StreamingContext.getOrCreate(config.sparkConfig.checkpointDir, createStreamingContext) verwenden, gibt es keine Verzögerungen zu planen und genügend Festplatte Platz auf jedem Arbeiterknoten.

Hat keine Kafka Client initiert Parameter ändern, ziemlich sicher, dass Kafkas Struktur hat sich nicht geändert:

val kafkaParams = Map("metadata.broker.list" -> kafkaConfig.broker) 
val topics = Set(kafkaConfig.topic) 
val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
    ssc, kafkaParams, topics) 

Auch kann nicht verstehen, warum, wenn eine direkte Verbraucher Beschreibung sagt The consumed offsets are by the stream itself ich noch Kontrollpunkt Verzeichnis verwenden müssen beim Erstellen des Streamingkontextes?

+0

Ist 'spark.streaming.backpressure.enabled' auf' true' gesetzt? –

+0

ja, ich werde versuchen, es zu deaktivieren –

+0

sieht aus wie es geholfen –

Antwort

1

Dies ist normalerweise das Ergebnis der Aktivierung des Gegendrucks über die Einstellung spark.streaming.backpressure.enabled auf True. Wenn der Backpressure-Algorithmus feststellt, dass mehr Daten eintreffen als üblich, fängt er an, jeden Batch auf eine ziemlich kleine Größe zu beschränken, bis er sich selbst wieder "stabilisieren" kann. Dies hat manchmal falsche positive Ergebnisse und bewirkt, dass Ihr Stream die Verarbeitungsrate verlangsamt.

Wenn Sie die Heuristik ein wenig optimieren wollen, gibt es einige undokumentierte Flaggen es verwendet (nur sicherstellen, dass Sie wissen, was Sie tun):

val proportional = conf.getDouble("spark.streaming.backpressure.pid.proportional", 1.0) 
val integral = conf.getDouble("spark.streaming.backpressure.pid.integral", 0.2) 
val derived = conf.getDouble("spark.streaming.backpressure.pid.derived", 0.0) 
val minRate = conf.getDouble("spark.streaming.backpressure.pid.minRate", 100) 

Wenn Sie die blutigen Details wollen, PIDRateEstimator ist was du suchst.