2016-07-28 8 views
0

Ich versuche, Daten im AVRO-Format von meinem Java-Code zu Kafka HDFS mit kafka HDFS-Anschluss schreiben und ich bekomme einige Probleme. Als ich das einfache Schema und die Daten auf der konfluenten Plattform bereitgestellten verwenden, ich bin in der Lage Daten zu HDFS zu schreiben, aber wenn ich versuche, komplexes Avro-Schema zu verwenden, erhalte ich diesen Fehler in den HDFS Steckern Protokolle:Fehler beim Schreiben auf HDFS mit Kafka HDFS Connect

ERROR Task hdfs-sink-0 threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:142) 
org.apache.kafka.connect.errors.DataException: Did not find matching union field for data: PROD 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:973) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:981) 
    at io.confluent.connect.avro.AvroData.toConnectData(AvroData.java:782) 
    at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:103) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:346) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:226) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:170) 
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:142) 
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:140) 
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:175) 
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) 
    at java.util.concurrent.FutureTask.run(FutureTask.java:266) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) 
    at java.lang.Thread.run(Thread.java:745) 

ich bin mit konfluenten Plattform 3.0.0

Mein Java-Code:

Properties props = new Properties(); 
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerUrl); 
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,io.confluent.kafka.serializers.KafkaAvroSerializer.class); 
props.put("schema.registry.url", <url>); 
// Set any other properties 
KafkaProducer producer = new KafkaProducer(props); 

Schema schema = new Schema.Parser().parse(new FileInputStream("avsc/schema.avsc")); 
DatumReader<Object> reader = new GenericDatumReader<Object>(schema); 

InputStream input = new FileInputStream("json/data.json"); 
DataInputStream din = new DataInputStream(input); 
Decoder decoder = DecoderFactory.get().jsonDecoder(schema, din); 

Object datum = null; 
while (true) { 
    try { 
     datum = reader.read(null, decoder); 
    } catch (EOFException e) { 
     break; 
    } 
} 

ProducerRecord<Object, Object> message = new ProducerRecord<Object, Object>(topic, datum); 
producer.send(message); 
producer.close(); 

Das Schema (diese aus AVDL Datei erstellt wird):

{ 
    "type" : "record", 
    "name" : "RiskMeasureEvent", 
    "namespace" : "risk", 
    "fields" : [ { 
    "name" : "info", 
    "type" : { 
     "type" : "record", 
     "name" : "RiskMeasureInfo", 
     "fields" : [ { 
     "name" : "source", 
     "type" : { 
      "type" : "record", 
      "name" : "Source", 
      "fields" : [ { 
      "name" : "app", 
      "type" : { 
       "type" : "record", 
       "name" : "Application", 
       "fields" : [ { 
       "name" : "csi_id", 
       "type" : "string" 
       }, { 
       "name" : "name", 
       "type" : "string" 
       } ] 
      } 
      }, { 
      "name" : "env", 
      "type" : { 
       "type" : "record", 
       "name" : "Environment", 
       "fields" : [ { 
       "name" : "value", 
       "type" : [ { 
        "type" : "enum", 
        "name" : "EnvironmentConstants", 
        "symbols" : [ "DEV", "UAT", "PROD" ] 
       }, "string" ] 
       } ] 
      } 
      }, ... 

Die JSON-Datei:

{ 
    "info": { 
    "source": { 
     "app": { 
     "csi_id": "123", 
     "name": "ABC" 
     }, 
     "env": { 
     "value": { 
      "risk.EnvironmentConstants": "PROD" 
     } 
     }, ... 

Es scheint ein Problem mit Schema zu sein, aber ich kann das Problem nicht identifizieren.

Antwort

1

Ich bin ein Ingenieur für Confluent. Dies ist ein Fehler in der Art und Weise, wie der Avro Converter das Union-Schema behandelt, das Sie für env haben. Ich habe issue-393 erstellt, um dieses Problem zu beheben. Ich habe auch eine pull request mit dem Fix zusammengestellt. Dies sollte bald zusammengeführt werden.

J

+0

Hallo Jeremy, danke für deine Reparatur. Ich habe den neuesten Code der Schemaregistrierung von Ihrer Zweigstelle heruntergeladen. Da es noch nicht im konfluenten Paket enthalten ist, habe ich den Code für Apache kafka und kafka-hdfs-connect heruntergeladen und lokal erstellt. Bei dem Versuch, den hdfs-Konnektor auszuführen, erhalte ich einen Fehler beim Laden der AvroConverter-Datei (die sich in der Schemaregistrierung befindet). Darf ich wissen, wie ich den Konnektor so konfigurieren kann, dass er das Glas finden kann? – iiSGii