2017-05-31 5 views

Antwort

4

Sie müssen die Bytes mit geeigneten Deserializern dekodieren, sagen zu Strings oder Ihrem benutzerdefinierten Objekt.

Wenn Sie nicht die Decodierung tun, erhalten Sie [[email protected] das ist einfach die Textdarstellung von Byte-Arrays in Java.

Kafka weiß nichts über den Inhalt Ihrer Nachrichten und übergibt daher Byte-Arrays von Produzenten an Konsumenten.

In Spark-Streaming Sie Serializer für Schlüssel und Werte verwenden (unter Angabe KafkaWordCount example):

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
    "org.apache.kafka.common.serialization.StringSerializer") 

Mit dem oben Serializer erhalten Sie DStream[String], so dass Sie mit RDD[String] arbeiten.

Wenn Sie jedoch Byte-Arrays direkt in Ihre benutzerdefinierte Klasse deserialisieren möchten, müssen Sie eine benutzerdefinierte Serializer schreiben (die Kafka-spezifisch ist und nichts mit Spark zu tun hat).

Was ich empfehlen würde ist die Verwendung von JSON mit einem festen Schema oder Avro (mit einer Lösung beschrieben in Kafka, Spark and Avro - Part 3, Producing and consuming Avro messages).


In Structured Streaming jedoch die Pipeline könnte wie folgt aussehen:

val fromKafka = spark. 
    readStream. 
    format("kafka"). 
    option("subscribe", "topic1"). 
    option("kafka.bootstrap.servers", "localhost:9092"). 
    load. 
    select('value cast "string") // <-- conversion here 
+0

Dann, wie Avro Kafka Nachricht mit/ohne Schema Registry in Originalobjekt in Spark-Structured-Streaming-Format konvertieren? –

+0

Sie müssen das ursprüngliche Objekt zurück wissen und zum Beispiel den 'map'-Operator verwenden. Noch kein "from_avro" (wenn überhaupt) wie für JSON mit 'from_json'. –

+0

Ich benutzte KafkaAvroDeserializer, um Array [Byte] zu meinem Avro-Objekt zuzuordnen, aber es sagte "Kann Encoder für Typ nicht finden, der in einem Dataset gespeichert wird". Dann biete ich Encoder als implizit def zuEncoded (o: Zhima): Array [Byte] = o.toByteBuffer.array() implizite def vonEncoded (e: Array [Byte]): Zhima = valueDeserializer.deserialize (kafkaConsumeTopicName, e). asInstanceOf [Zhima] Aber es kompromittiert den gleichen Fehler, wie es dann zu lösen? –

Verwandte Themen