2016-05-20 16 views
3

Ich habe eine Nachricht, die durch mehrere Systeme fließt, jedes System protokolliert die Nachrichteneingabe und endet mit einem Zeitstempel und einer Uuid messageId. Ich bin die Einnahme alle Protokolle durch:Berechne die Zeit zwischen den Ereignissen

filebeat --> logstash --> elastic search --> kibana 

Als Ergebnis habe ich jetzt diese Ereignisse:

@timestamp      messageId        event 
May 19th 2016, 02:55:29.003  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
May 19th 2016, 02:55:29.200  00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
May 19th 2016, 02:55:29.205  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
May 19th 2016, 02:55:29.453  00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 

Ich möchte einen Bericht erstellen (idealerweise eine gestapelte Balken oder Spalte) Zeit damit verbracht, in Jedes System:

messageId        in1:1->2:in2 
00e02f2f-32d5-9509-870a-f80e54dc8775 197:5:248 

Was ist der beste Weg, dies zu tun? Logstash-Filter? Kibana berechnete Felder?

+0

Es ist ein gutes Beispiel dafür in den logstash [Aggregate Filter] beschrieben (https://www.elastic.co/guide/en/logstash/current/plugins-filters-aggregate.html) – Val

+0

@val , Hmm, dieses Beispiel benötigt die verstrichene Zeit, um in der Protokollzeile zu sein - berechnet es nicht aus zwei separaten Protokollzeilen. Vielleicht kann ich das zusammen mit dem 'Elapsed {}' Plugin irgendwie benutzen. – RaGe

Antwort

9

Sie können dies nur mit dem Logstash aggregate filter erreichen, jedoch müssten Sie das, was der elapsed filter bereits tut, wesentlich neu implementieren, also wäre das eine Schande, oder?

Lassen Sie uns dann eine Mischung aus dem Logstash aggregate filter und dem elapsed filter verwenden. Letzteres wird verwendet, um die Zeit jeder Stufe zu messen, und die erstgenannte wird verwendet, um alle Zeitinformationen in das letzte Ereignis zu aggregieren.

Seitennotiz: Sie könnten Ihr Zeitstempelformat überdenken, um es etwas standardmäßiger für das Parsing zu machen. Ich habe sie in ISO 8601 umgewandelt, um das Parsen einfacher zu machen, aber Sie können Ihren eigenen Regex erstellen.

So beginne ich aus den folgenden Protokolle:

2016-05-19T02:55:29.003 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Enter 
2016-05-19T02:55:29.200 00e02f2f-32d5-9509-870a-f80e54dc8775 system1Exit 
2016-05-19T02:55:29.205 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Enter 
2016-05-19T02:55:29.453 00e02f2f-32d5-9509-870a-f80e54dc8775 system2Exit 

Zuerst bin ich mit drei elapsed Filter (eines für jede Stufe in1, 1->2 und in2) und dann drei Aggregat-Filter, um alle zu sammeln die Zeitinformation. Es sieht wie folgt aus:

filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    } 
    date { 
    match => [ "timestamp", "ISO8601"] 
    } 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    } 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    } 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    } 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = [(event['elapsed_time']*1000).to_i]" 
     map_action => "create" 
    } 
    } 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i" 
     map_action => "update" 
    } 
    } 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event['elapsed_time']*1000).to_i; event['report'] = map['report'].join(':')" 
     map_action => "update" 
     end_of_task => true 
    } 
    } 
} 

Nach den ersten beiden Veranstaltungen finden Sie eine neue Veranstaltung wie diese erhalten, was zeigt, dass 197ms in system1 ausgegeben worden:

{ 
       "@timestamp" => "2016-05-21T04:20:51.731Z", 
         "tags" => [ "elapsed", "elapsed_match", "in1" ], 
       "elapsed_time" => 0.197, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.003Z" 
} 

Nach dem dritten Fall, Sie eine Veranstaltung wie diese erhalten, die, wie viel Zeit zeigt zwischen system1 und system2, dh 5ms ausgegeben wird:

{ 
       "@timestamp" => "2016-05-21T04:20:51.734Z", 
         "tags" => [ "elapsed", "elapsed_match", "1->2" ], 
       "elapsed_time" => 0.005, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.200Z" 
} 

Nach dem vierten Fall, werden Sie ein neues Ereignis wie dieses erhalten, die zeigt, wie viel ti Ich wurde in System2, d. h. 248ms, verbracht. Dieses Ereignis enthält auch ein report Feld mit all Timing-Informationen der Nachricht

{ 
       "@timestamp" => "2016-05-21T04:20:51.736Z", 
         "tags" => [ "elapsed", "elapsed_match", "in2" ], 
       "elapsed_time" => 0.248, 
        "messageId" => "00e02f2f-32d5-9509-870a-f80e54dc8775", 
    "elapsed_timestamp_start" => "2016-05-19T00:55:29.205Z" 
        "report" => "197:5:248" 
} 
+0

Danke, ich ging das gleiche Kaninchenloch runter, das hilft mir schneller dort hin zu kommen.Die Sache, die ich an den verstrichenen Filtern bemerkt habe, ist, dass, da jede meiner Protokollzeilen nur mit einem der mehreren verstrichenen Filter übereinstimmt, alle nicht übereinstimmenden Filter ein "_grokparsefailure" -Tag hinzufügen. Ich habe ein "tag_on_failure => []" zu meinen verstrichenen Filtern hinzugefügt. – RaGe

+0

Haben Sie es geschafft, dass es so funktioniert, wie Sie es erwartet haben? – Val

+0

Versuchen Sie, die Fehler zu erkennen, die ich mit 'to_i' sehe, anscheinend ist die Eingabe in den Kartenzeilen ein Array anstelle einer Zeichenkette. 'Aggregierte Ausnahme aufgetreten. Fehler: undefinierte Methode 'to_i 'für [197.0]: Array' – RaGe

1

Ich mache hier musste einige Optimierungen für diese Arbeit in logstash 5.4 ist der überarbeitete Code.

filter { 
    grok { 
    match => ["message", "%{TIMESTAMP_ISO8601:timestamp} %{UUID:messageId} %{WORD:event}"] 
    add_tag => [ "%{event}" ] 
    } 
    date { 
    match => [ "timestamp", "ISO8601"] 
    } 
    # Measures the execution time of system1 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Enter" 
    end_tag => "system1Exit" 
    new_event_on_match => true 
    add_tag => ["in1"] 
    } 
    # Measures the time between system1 and system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system1Exit" 
    end_tag => "system2Enter" 
    new_event_on_match => true 
    add_tag => ["1->2"] 
    } 
    # Measures the execution time of system2 
    elapsed { 
    unique_id_field => "messageId" 
    start_tag => "system2Enter" 
    end_tag => "system2Exit" 
    new_event_on_match => true 
    add_tag => ["in2"] 
    } 
    # Records the execution time of system1 
    if "in1" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] = (event.get('elapsed_time')*1000).to_i" 
     map_action => "create" 
    } 
    } 
    # Records the time between system1 and system2 
    if "1->2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
    } 
    } 
    # Records the execution time of system2 
    if "in2" in [tags] and "elapsed" in [tags] { 
    aggregate { 
     task_id => "%{messageId}" 
     code => "map['report'] << (event.get('elapsed_time')*1000).to_i" 
     map_action => "update" 
     end_of_task => true 
    } 
    } 
} 
+0

Verstrichene und aggregierte Filter werden nicht ordnungsgemäß funktionieren wenn mehrere Logstash-Worker verwendet werden. Wie lösen wir in diesem Fall das gleiche Problem? Vielen Dank. –

+0

Sie zwingen es, eine einzige Arbeit zu verwenden. Dies könnte ein Engpass sein, aber andere Elemente müssen nicht den gleichen Weg folgen –

Verwandte Themen