2017-02-11 2 views
0

Unser Projekt hat Scala und Python-Code und wir müssen AVRO-codierte Nachrichten an Kafka senden/konsumieren.Avro Kafka Konvertierungsprobleme zwischen Scala und Python

Ich sende Avro verschlüsselt Nachrichten an Kafka mit Python und Scala. Ich habe Produzenten in scala-Code, die Avro codierte Nachrichten mit Twitter bijection Bibliothek wie folgt an:

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc") 
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString 
val schema = parser.parse(schemaFile) 
val recordInjection = GenericAvroCodecs[GenericRecord](schema) 
val avroRecord = new GenericData.Record(schema) 
avroRecord.put("url_sha256", row._1) 
avroRecord.put("url", row._2._1) 
avroRecord.put("timestamp", row._2._2) 
val recordBytes = recordInjection.apply(avroRecord) 
kafkaProducer.value.send("topic", recordBytes) 

Avro Schema sieht wie

{ 
    "namespace": "com.rm.avro", 
    "type": "record", 
    "name": "url_info", 
    "fields":[ 
    { 
     "name": "url_sha256", "type": "string" 
    }, 
    { 
     "name": "url", "type": "string" 
    }, 
    { 
     "name": "timestamp", "type": ["long"] 
    } 
] 

}

Ich bin in der Lage, sie zu entschlüsseln erfolgreich in KafkaConsumer in scala

val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc") 
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString 


kafkaInputStream.foreachRDD(kafkaRDD => { 
    kafkaRDD.foreach(

    avroRecord => { 
     val parser = new Schema.Parser() 
     val schema = parser.parse(schemaFile) 
     val recordInjection = GenericAvroCodecs[GenericRecord](schema) 
     val record = recordInjection.invert(avroRecord.value()).get 
     println(record) 
    } 
) 

} 

Allerdings kann ich nicht messag decodieren Ich es in Python erhalten folgende Ausnahme

'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte 

Python-Codes sieht aus wie folgt: schema_path = "Avro/url_info_schema.avsc" schema = avro.schema.parse (open (schema_path) .mehr())

for msg in consumer: 
    bytes_reader = io.BytesIO(msg.value) 
    decoder = avro.io.BinaryDecoder(bytes_reader) 
    reader = avro.io.DatumReader(schema) 
    decoded_msg = reader.read(decoder) 
    print(decoded_msg) 

Auch Python Avro Producer Nachrichten wird nicht von Scala Avro Consumer verstanden. Ich bekomme dort eine Ausnahme. Python Avro Produzent sieht wie folgt aus:

Wie bleibe ich konsistent über Python und Scala? Irgendwelche Zeiger sind groß

+0

herausgefunden, die Lösung. Wird in Kürze eine Lösung veröffentlichen. Es kann anderen helfen. – Abhishek

Antwort

1

Ich benutzte binären Kodierer in Python und nichts in Scala. Gerade hatte von

eine Zeile zu ändern
val recordInjection = GenericAvroCodecs[GenericRecord](schema) 

zu

val recordInjection = GenericAvroCodecs.toBinary[GenericRecord](schema) 

Ich hoffe, andere finden es nützlich. Keine Änderungen in Python-Code erforderlich

Verwandte Themen