Unser Projekt hat Scala und Python-Code und wir müssen AVRO-codierte Nachrichten an Kafka senden/konsumieren.Avro Kafka Konvertierungsprobleme zwischen Scala und Python
Ich sende Avro verschlüsselt Nachrichten an Kafka mit Python und Scala. Ich habe Produzenten in scala-Code, die Avro codierte Nachrichten mit Twitter bijection Bibliothek wie folgt an:
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val avroRecord = new GenericData.Record(schema)
avroRecord.put("url_sha256", row._1)
avroRecord.put("url", row._2._1)
avroRecord.put("timestamp", row._2._2)
val recordBytes = recordInjection.apply(avroRecord)
kafkaProducer.value.send("topic", recordBytes)
Avro Schema sieht wie
{
"namespace": "com.rm.avro",
"type": "record",
"name": "url_info",
"fields":[
{
"name": "url_sha256", "type": "string"
},
{
"name": "url", "type": "string"
},
{
"name": "timestamp", "type": ["long"]
}
]
}
Ich bin in der Lage, sie zu entschlüsseln erfolgreich in KafkaConsumer in scala
val resourcesPath = getClass.getResource("/avro/url_info_schema.avsc")
val schemaFile = scala.io.Source.fromURL(resourcesPath).mkString
kafkaInputStream.foreachRDD(kafkaRDD => {
kafkaRDD.foreach(
avroRecord => {
val parser = new Schema.Parser()
val schema = parser.parse(schemaFile)
val recordInjection = GenericAvroCodecs[GenericRecord](schema)
val record = recordInjection.invert(avroRecord.value()).get
println(record)
}
)
}
Allerdings kann ich nicht messag decodieren Ich es in Python erhalten folgende Ausnahme
'utf8' codec can't decode byte 0xe4 in position 16: invalid continuation byte
Python-Codes sieht aus wie folgt: schema_path = "Avro/url_info_schema.avsc" schema = avro.schema.parse (open (schema_path) .mehr())
for msg in consumer:
bytes_reader = io.BytesIO(msg.value)
decoder = avro.io.BinaryDecoder(bytes_reader)
reader = avro.io.DatumReader(schema)
decoded_msg = reader.read(decoder)
print(decoded_msg)
Auch Python Avro Producer Nachrichten wird nicht von Scala Avro Consumer verstanden. Ich bekomme dort eine Ausnahme. Python Avro Produzent sieht wie folgt aus:
Wie bleibe ich konsistent über Python und Scala? Irgendwelche Zeiger sind groß
herausgefunden, die Lösung. Wird in Kürze eine Lösung veröffentlichen. Es kann anderen helfen. – Abhishek