Ich habe eine Spark-Streaming-App in Java geschrieben und Spark 2.1 verwendet. Ich verwende KafkaUtils.createDirectStream
, um Nachrichten von Kafka zu lesen. Ich benutze Kryo-Encoder/Decoder für Kafka-Nachrichten. Ich habe dies in Kafka Eigenschaften angegeben-> key.deserializer, value.deserializer, key.serializer, value.deserializer
Wenn Spark die Nachrichten in einem Mikro-Batch zieht, werden die Nachrichten erfolgreich mit dem Kryo-Decoder decodiert. Ich habe jedoch bemerkt, dass der Spark-Executor eine neue Instanz des kryo-Decoders erstellt, um jede von kafka gelesene Nachricht zu entschlüsseln. Ich überprüfte das, indem ich Logs in den Decoder-Konstruktor legte
Das scheint mir komisch. Sollte nicht für jede Nachricht und jeden Batch die gleiche Instanz des Decoders verwendet werden?Warum erstellt Kafka Direct Stream für jede Nachricht einen neuen Decoder?
-Code, wo ich von kafka lese:
JavaInputDStream<ConsumerRecord<String, Class1>> consumerRecords = KafkaUtils.createDirectStream(
jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, Class1>Subscribe(topics, kafkaParams));
JavaPairDStream<String, Class1> converted = consumerRecords.mapToPair(consRecord -> {
return new Tuple2<String, Class1>(consRecord.key(), consRecord.value());
});
Sehr gut erforscht! #impressed –
@Yuval: Ich benutze Kafka 0.10.x. Spark verwendet zwischengespeicherte Kafka-Konsumenten (pro Executor), bei denen der Cacheschlüssel durch die Verbraucher-ID, die Themen-ID und die Partitions-ID identifiziert wird. Es macht Sinn, einen Decoder pro Kafka-Partition zu haben oder wie Spark sonst Nachrichten parallel dekodiert. Was ich erwarte ist, dass ein neuer Decoder einmal pro Partition in einem zwischengespeicherten Verbraucher erstellt werden muss und das ist es! Ich sehe dieses Problem nicht unter einer geringen Last, sondern nur, wenn ich 1000 von Nachrichten pro Sekunde pumpe. Wahrscheinlich laufe ich in einen "GC" -Zyklus. Haben Sie eine Idee, wie Sie die Protokollierung in der KafkaRDD-Klasse aktivieren können? – scorpio
@scorpio Kafka 0.10.x benötigt überhaupt keinen Decoder. Es gibt den zugrunde liegenden 'ConsumerRecord' zurück und Sie entscheiden, was damit zu tun ist. Erstellen Sie vielleicht eine Instanz eines Decoders in einer 'Karte'? –