2015-10-17 12 views
7

Ich versuche serialisierte Avro-Ereignisse aus einer Kafka-Warteschlange zu konsumieren. Die Kafka-Warteschlange wird mit einem einfachen Java-Producer gefüllt. Aus Gründen der Klarheit teilen bin ich die drei Komponenten:Logstash mit Kafka: Kann Avro nicht entschlüsseln

Avro Schemadatei

{"namespace": "example.avro", 
"type": "record", 
"name": "User", 
"fields": [ 
    {"name": "name", "type": "string"}, 
    {"name": "favorite_number", "type": ["int", "null"]}, 
    {"name": "favorite_color", "type": ["string", "null"]} 
] 
} 

Java Producer Code-Schnipsel (User.class wird die unter Verwendung von Avro-Tools)

User user1 = new User(); 
    user1.setName("Alyssa"); 
    user1.setFavoriteNumber(256); 
    user1.setFavoriteColor("blue"); 
    String topic = "MemoryTest"; 

    // Properties set in 'props' 
    KafkaProducer<Message, byte[]> producer = new KafkaProducer<Message, byte[]>(props); 

    ByteArrayOutputStream out = new ByteArrayOutputStream(); 
    DatumWriter<User> writer = new SpecificDatumWriter<User>(User.class); 
    Encoder encoder = EncoderFactory.get().binaryEncoder(out, null); 
    writer.write(user1, encoder); 
    encoder.flush(); 
    out.close(); 
    byte[] serializedBytes = out.toByteArray(); 
    producer.send(new ProducerRecord<Message, byte[]>(topic, serializedBytes)); 

Logstash Config-Datei

Problem

Die Pipeline nicht auf logstash Ebene. Wenn ein neues Ereignis in Kafka geschoben wird, bekomme ich folgendes an der logstash-Konsole:

Alyssa�blue {:exception=>#<NoMethodError: undefined method `decode' for ["avro", {"schema_uri"=>"/opt/ELK/logstash-1.5.4/bin/user.avsc"}]:Array>, :backtrace=>["/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:169:in `queue_event'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-input-kafka-1.0.0/lib/logstash/inputs/kafka.rb:139:in `run'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:177:in `inputworker'", "/opt/ELK/logstash-1.5.4/vendor/bundle/jruby/1.9/gems/logstash-core-1.5.4-java/lib/logstash/pipeline.rb:171:in `start_input'"], :level=>:error} 

Antwort

10

Schließlich herausgefunden, den Fehler. Statt dessen (wie auf Logstash Webseite vorgeschlagen - https://www.elastic.co/guide/en/logstash/current/plugins-codecs-avro.html)

codec => { 
    avro => { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
    } 
} 

Die korrekte Syntax ist (wie in den Plugin-Dokumentation vorgeschlagen https://github.com/logstash-plugins/logstash-codec-avro/blob/master/DEVELOPER.md):

codec => avro { 
     schema_uri => "/opt/ELK/logstash-1.5.4/bin/user.avsc" 
} 

Ich denke, die Syntax geändert wird.

Verwandte Themen