2017-12-14 12 views
0

ich die neue Version von Kafka-Plugin für elkstackwie kafka Themen filtern, basierend auf ihren Namen in logstash conf im Ausgangsbereich if-else-Bedingung für elastische Suche mit

input { 
    kafka { 
    bootstrap_servers => "1XX.X.X.X:9092" 
    topics => ["test_all_logs","test_apiserver_logs","test_orchestrator_logs","test_credentialstore_logs","test_gfac_logs","local_api-orch_logs","__consumer_offsets,local_gfac_logs"] 
    auto_offset_reset => "earliest" 
    decorate_events => "true" 
    } 
} 

filter{ 

json { 
    source => "message" 
    target => "doc" 
    } 
    mutate { add_field => { "level" => "%{[doc][level]}"}} 
    mutate { add_field => { "logger" => "%{[doc][loggerName]}" } } 
} 

output { 
    stdout { codec => rubydebug } 
    elasticsearch { 
    if [@metadata][kafka][topic]== "test_all_logs" { 
     hosts => ["localhost:9200"] 
     index => ["test-all-logs-%{+YYYY.MM.dd}"] 
    } 
    if [@metadata][kafka][topic] == "test_apiserver_logs" { 
     hosts => ["localhost:9200"] 
     index => ["test_apiserver_logs-%{+YYYY.MM.dd}"] 
    } 
    if [@metadata][kafka][topic] == "test_orchestrator_logs" { 
     hosts => ["localhost:9200"] 
     index => ["test_orchestrator_logs-%{+YYYY.MM.dd}"] 
    } 
    if [@metadata][kafka][topic] == "test_credentialstore_logs" { 
     hosts => ["localhost:9200"] 
     index => ["test_credentialstore_logs-%{+YYYY.MM.dd}"] 
    } 
    if [@metadata][kafka][topic} == "test_gfac_logs" { 
     hosts => ["localhost:9200"] 
     index => ["test_gfac_logs-%{+YYYY.MM.dd}"] 
    } 
    } 
} 

ich viele Links bin nach bin mit die haben hatte nicht die richtige Antwort: How to write Logstash filter to filter kafka topics

https://www.elastic.co/guide/en/logstash/current/plugins-inputs-kafka.html#plugins-inputs-kafka-decorate_events

ich habe einen Vorschlag, den ich decorate_events => "true" hinzufügen sollte. und hinzugefügt @ Metadaten-Attribut.

ich ausgeführt, um den Befehl:

/opt/logstash/bin/logstash -f airavata/logstash-airavata.conf --path.data ./airavata/ 

Aber ich bin das unten stehende Störung zu erhalten:

Failed to execute action {:action=>LogStash::PipelineAction::Create/pipeline_id:main, :exception=>"LogStash::ConfigurationError", :message=>"Expected one of #, => at line 24, column 8 (byte 579) after output {\n stdout { codec => rubydebug }\n\n elasticsearch {\n if ", 

:backtrace=>["/opt/logstash/logstash-core/lib/logstash/compiler.rb:42:in `compile_ast'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:50:in `compile_imperative'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:54:in `compile_graph'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:12:in `block in compile_sources'", "org/jruby/RubyArray.java:2486:in `map'", "/opt/logstash/logstash-core/lib/logstash/compiler.rb:11:in `compile_sources'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:107:in `compile_lir'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:49:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline.rb:215:in `initialize'", "/opt/logstash/logstash-core/lib/logstash/pipeline_action/create.rb:35:in `execute'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:335:in `block in converge_state'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:332:in `block in converge_state'", "org/jruby/RubyArray.java:1734:in `each'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:319:in `converge_state'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:166:in `block in converge_state_and_update'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:141:in `with_pipelines'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:164:in `converge_state_and_update'", "/opt/logstash/logstash-core/lib/logstash/agent.rb:90:in `execute'", "/opt/logstash/logstash-core/lib/logstash/runner.rb:362:in `block in execute'", "/opt/logstash/vendor/bundle/jruby/2.3.0/gems/stud-0.0.23/lib/stud/task.rb:24:in `block in initialize'"]} 

jemand mir helfen Fehler Kann ich tue, mit der richtigen Syntax, nicht ungekocht Syntax?

Antwort

0

Ja, Sie müssen auf jeden Fall ein decorate_events Attribut auf True setzen. Es handelt sich um einen booleschen Wert, sodass Sie ihn nicht in Anführungszeichen setzen müssen. Fügen Sie einfach folgende Zeile in Abschnitt Kafka-Eingang:

decorate_events => true 

Bedingte Anweisungen nicht innerhalb Elasticsearch Ausgabe-Plugin erlaubt. Wickeln Sie stattdessen Ihren gesamten Elasticsearch Block wie folgt ein:

if [@metadata][kafka][topic] == "topicA" { 
    elasticsearch { 
     index => "indexA" 
     ... 
    }  
} else { 
    elasticsearch { 
     index => "indexB" 
     ... 
    }   
} 
Verwandte Themen