2017-11-07 2 views
0

Ich habe eine Logstash-Konfiguration, die JSON-Eingaben von Kafka erhält und an Elasticsearch Output sendet.Logstash geschachtelt JSON Parse

Received JSON hat die folgende Struktur:

TrackingData: { 
    "path": "/hello/world", 
    "method": "GET", 
    "requestDateTime": "2017-11-07T16:12:02.635", 
    "duration": 5104684, 
    "status": 200 
} 

Ich mag würde den status Wert als ein neues Feld hinzuzufügen, so dass sie später in einer neuen Spalte angezeigt werden können, wenn EL Daten sehen Kibana verwenden. Ich verstehe, dass ich dafür eine mutatefilter in der Logstash Config enthalten sollte.

Was sollte die Ruby-Zeichenfolge sein, um den status-Wert aus der JSON-Struktur in ein neues Feld zu bekommen?

Meine Logstash Config ist zur Zeit wie folgt:

input { 
    kafka { 
     bootstrap_servers => ["kafka:9092"] 
     topics => "test-topic" 
     group_id => "test-topic-group" 
    } 
} 
filter { 
    mutate { 
     add_field => { 
      "root_field" => "%{[message]}" 
      "status_field" => "%{[message][TrackingData][status]}" 
     } 
    } 
} 
output { 
    stdout { 
     codec => rubydebug 
    } 
    elasticsearch { 
     hosts => ["elk:9200"] 
} 

Wenn eine Nachricht von Logstash verarbeitet wird, kann ich die root_field analysiert Ordnung in Kibana sehen (ich habe dieses Feld hinzugefügt nur um zu beweisen, dass die json ist Parseable), aber die status_field wird als %{[message][TrackingData][status]} in Kibana angezeigt (dh die Parsing-Zeichenfolge muss falsch sein).

Wie sollte die Parsing-Zeichenfolge angegeben werden, um status Wert aus der obigen Beispiel-JSON-Struktur zu erhalten?

Antwort

0

Zwei Dinge waren falsch. Das erste Problem war, dass JSON selbst empfangen wurde, was falsch war. TrackingData wurde nicht in " umgeben. Richtig JSON sollte wie folgt sein:

"TrackingData": { 
    "path": "/hello/world", 
    "method": "GET", 
    "requestDateTime": "2017-11-07T16:12:02.635", 
    "duration": 5104684, 
    "status": 200 
} 

Zweitens, ich habe codec => "json" hinzugefügt und entfernt filter Codeblock in der Logstash config:

input { 
    kafka { 
     bootstrap_servers => ["kafka:9092"] 
     topics => "test-topic" 
     group_id => "test-topic-group" 
     codec => "json" 
    } 
} 
output { 
    stdout { 
     codec => rubydebug 
    } 
    elasticsearch { 
     hosts => ["elk:9200"] 
} 

Nun JSONs' Schlüssel/Werte analysiert werden empfangen in separate Felder und können daher bei Bedarf in separaten Spalten in Kibana angezeigt werden (oder herausgefiltert werden, wenn sie überhaupt nicht benötigt werden).

Kibana screen grab showing JSON fields in separate columns