2016-12-11 1 views
2

Diese sind Avros mit der Confluent-Plattform serialisiert.Wie lese ich binär serialisierte Avro (Confluent Platform) von Kafka mit Spark Streaming

Ich mag würde ein funktionierendes Beispiel wie folgt finden:

https://github.com/seanpquig/confluent-platform-spark-streaming/blob/master/src/main/scala/example/StreamingJob.scala

aber für Spark Structured Streaming.

kafka 
    .select("value") 
    .map { row => 

    // this gives me test == testRehydrated  
    val test = Foo("bar") 
    val testBytes = AvroWriter[Foo].toBytes(test) 
    val testRehydrated = AvroReader[Foo].fromBytes(testBytes) 


    // this yields mangled Foo data 
    val bytes = row.getAs[Array[Byte]]("value") 
    val rehydrated = AvroReader[Foo].fromBytes(bytes) 
+0

Haben Sie eine funktionierende Lösung gefunden? – aasthetic

+0

@aasthetic siehe unten – zzztimbo

Antwort

1

Ich fand heraus, dass Sie den Confluent-Plattformdecoder verwenden müssen, wenn Sie ihre Sachen lesen möchten.

def decoder: io.confluent.kafka.serializers.KafkaAvroDecoder = { 
    val props = new Properties() 
    props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, getSchemaRegistryUrl()) 
    props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true") 
    val vProps = new kafka.utils.VerifiableProperties(props) 
    new io.confluent.kafka.serializers.KafkaAvroDecoder(vProps) 
} 
+0

danke @zzztimbo, ich habe auch Schema für meine Schlüssel registriert, sie müssen von Long-Typ sein. Ich kann sie in Spark Streaming nicht deserialisieren. Irgendeine Idee? – aasthetic

+1

Es wäre hilfreich, ein vollständiges Beispiel für die Einrichtung von strukturiertem Streaming mit einem Kafka-Thema und einer Schemaregistrierung zu erhalten. Mir ist nicht klar, was ich mit diesem Decoder machen soll. –

Verwandte Themen