2017-12-15 4 views
6

Ich habe eine Spark-Streaming-App in Java geschrieben und Spark 2.1 verwendet. Ich verwende KafkaUtils.createDirectStream, um Nachrichten von Kafka zu lesen. Ich benutze Kryo-Encoder/Decoder für Kafka-Nachrichten. Ich habe dies in Kafka Eigenschaften angegeben-> key.deserializer, value.deserializer, key.serializer, value.deserializer

Wenn Spark die Nachrichten in einem Mikro-Batch zieht, werden die Nachrichten erfolgreich mit dem Kryo-Decoder decodiert. Ich habe jedoch bemerkt, dass der Spark-Executor eine neue Instanz des kryo-Decoders erstellt, um jede von kafka gelesene Nachricht zu entschlüsseln. Ich überprüfte das, indem ich Logs in den Decoder-Konstruktor legte

Das scheint mir komisch. Sollte nicht für jede Nachricht und jeden Batch die gleiche Instanz des Decoders verwendet werden?Warum erstellt Kafka Direct Stream für jede Nachricht einen neuen Decoder?

-Code, wo ich von kafka lese:

JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
     jssc, 
     LocationStrategies.PreferConsistent(), 
     ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams)); 

JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> { 
    return new Tuple2<String, Class1>(consRecord.key(), consRecord.value()); 
}); 

Antwort

3

Wenn wir sehen wollen, wie Spark-Daten von Kafka holt intern, werden wir bei KafkaRDD.compute aussehen müssen, die ein Verfahren für jeden RDD realisiert wird, der Rahmen wird beschrieben, wie, na ja, berechnen, dass RDD:

override def compute(thePart: Partition, context: TaskContext): Iterator[R] = { 
    val part = thePart.asInstanceOf[KafkaRDDPartition] 
    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part)) 
    if (part.fromOffset == part.untilOffset) { 
    logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " + 
    s"skipping ${part.topic} ${part.partition}") 
    Iterator.empty 
    } else { 
    new KafkaRDDIterator(part, context) 
    } 
} 

Was hier wichtig ist, ist die else-Klausel, die eine KafkaRDDIterator schafft. Diese intern hat:

val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[K]] 

val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties]) 
    .newInstance(kc.config.props) 
    .asInstanceOf[Decoder[V]] 

die, wie Sie sehen, die beide eine Instanz des Schlüsseldekodierer erzeugt und den Wert Decoder über Reflexion, für jede zugrundeliegenden Partition. Dies bedeutet, dass pro Nachricht aber pro Kafka-Partition nicht generiert wird.

Warum wird es auf diese Weise implementiert? Ich weiß es nicht. Ich gehe davon aus, dass ein Schlüssel- und Wertdecoder einen vernachlässigbaren Leistungseinbruch im Vergleich zu allen anderen Zuweisungen in Spark haben sollte.

Wenn Sie Ihre App profiliert und festgestellt haben, dass es sich um einen Hot-Pfad für die Zuweisung handelt, könnten Sie ein Problem beheben. Ansonsten würde ich mir keine Sorgen machen.

+0

Sehr gut erforscht! #impressed –

+0

@Yuval: Ich benutze Kafka 0.10.x. Spark verwendet zwischengespeicherte Kafka-Konsumenten (pro Executor), bei denen der Cacheschlüssel durch die Verbraucher-ID, die Themen-ID und die Partitions-ID identifiziert wird. Es macht Sinn, einen Decoder pro Kafka-Partition zu haben oder wie Spark sonst Nachrichten parallel dekodiert. Was ich erwarte ist, dass ein neuer Decoder einmal pro Partition in einem zwischengespeicherten Verbraucher erstellt werden muss und das ist es! Ich sehe dieses Problem nicht unter einer geringen Last, sondern nur, wenn ich 1000 von Nachrichten pro Sekunde pumpe. Wahrscheinlich laufe ich in einen "GC" -Zyklus. Haben Sie eine Idee, wie Sie die Protokollierung in der KafkaRDD-Klasse aktivieren können? – scorpio

+0

@scorpio Kafka 0.10.x benötigt überhaupt keinen Decoder. Es gibt den zugrunde liegenden 'ConsumerRecord' zurück und Sie entscheiden, was damit zu tun ist. Erstellen Sie vielleicht eine Instanz eines Decoders in einer 'Karte'? –

Verwandte Themen