2016-01-05 1 views
6

Ich bin komplett neu bei Kafka und avro und versuche, das konfluente Paket zu verwenden. Wir haben bestehende POJOs, die wir für JPA verwenden, und ich möchte einfach eine Instanz meiner POJOs erstellen können, ohne jeden Wert manuell in einen generischen Datensatz einbeziehen zu müssen. Mir scheint zu fehlen, wie das in der Dokumentation gemacht wird.Konvertieren von pojos in generische Datensätze in confluent.io zum Senden über einen KafkaProducer

Die Beispiele verwenden einen allgemeinen Datensatz und setzen jeden Wert eins nach dem anderen in etwa so:

String key = "key1"; 
String userSchema = "{\"type\":\"record\"," + 
        "\"name\":\"myrecord\"," + 
        "\"fields\":[{\"name\":\"f1\",\"type\":\"string\"}]}"; 
Schema.Parser parser = new Schema.Parser(); 
Schema schema = parser.parse(userSchema); 
GenericRecord avroRecord = new GenericData.Record(schema); 
avroRecord.put("f1", "value1"); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Es gibt mehrere Beispiele für ein Schema aus einer Klasse bekommen, und ich fand die Anmerkungen dieses Schema wie nötig zu ändern. Wie nehme ich nun eine Instanz eines POJO und sende es einfach an den Serialisierer und lasse die Bibliothek das Schema von der Klasse abgleichen und dann die Werte in einen generischen Datensatz kopieren? Gehe ich das alles falsch? Was ich will, ist am Ende etwas zu tun, wie folgt aus:

String key = "key1"; 
Schema schema = ReflectData.get().getSchema(myObject.getClass()); 
GenericRecord avroRecord = ReflectData.get().getRecord(myObject, schema); 

record = new ProducerRecord<Object, Object>("topic1", key, avroRecord); 
try { 
    producer.send(record); 
} catch(SerializationException e) { 
    // may need to do something with it 
} 

Dank!

Antwort

1

gewickelt ich meine eigenen Serializer in diesem Fall bis zu erstellen:

public class KafkaAvroReflectionSerializer extends KafkaAvroSerializer { 
    private final EncoderFactory encoderFactory = EncoderFactory.get(); 

    @Override 
    protected byte[] serializeImpl(String subject, Object object) throws SerializationException { 
     //TODO: consider caching schemas 
     Schema schema = null; 

     if(object == null) { 
     return null; 
     } else { 
     try { 
      schema = ReflectData.get().getSchema(object.getClass()); 
      int e = this.schemaRegistry.register(subject, schema); 
      ByteArrayOutputStream out = new ByteArrayOutputStream(); 
      out.write(0); 
      out.write(ByteBuffer.allocate(4).putInt(e).array()); 

      BinaryEncoder encoder = encoderFactory.directBinaryEncoder(out, null); 
      DatumWriter<Object> writer = new ReflectDatumWriter<>(schema); 
      writer.write(object, encoder); 
      encoder.flush(); 
      out.close(); 

      byte[] bytes = out.toByteArray(); 
      return bytes; 
     } catch (IOException ioe) { 
      throw new SerializationException("Error serializing Avro message", ioe); 
     } catch (RestClientException rce) { 
      throw new SerializationException("Error registering Avro schema: " + schema, rce); 
     } catch (RuntimeException re) { 
      throw new SerializationException("Error serializing Avro message", re); 
     } 
     } 
    } 
} 
Verwandte Themen