2017-02-03 6 views
2

Ich habe ein einfaches Protokoll Beispiel:Apache Kamel: Aggregat durch Qualifier von log

2017-02-02 09:58:12,764 - INFO - PRC0XK - logged in 
2017-02-02 09:58:13,766 - INFO - L3J5WW - logged in 
2017-02-02 09:58:14,005 - INFO - 0NKCVZ - call s2 
2017-02-02 09:58:14,767 - INFO - P0QIOW - logged in 
2017-02-02 09:58:15,729 - INFO - E0MVFZ - call s2 
2017-02-02 09:58:16,257 - INFO - L3J5WW - call s2 
2017-02-02 09:58:17,750 - INFO - PRC0XK - call s2 
2017-02-02 09:58:21,908 - INFO - P0QIOW - call s2 
2017-02-02 09:58:30,479 - INFO - PRC0XK - get answer from s2 
2017-02-02 09:58:30,479 - INFO - PRC0XK - logged out 

Es von Feldern wie gebildet wird. Ich möchte es als Eingabe verwenden und die Aktionen eins nach dem anderen durch USERID bilden. Später möchte ich eine andere Protokolldatei hinzufügen, die auf die gleiche Weise gebildet wurde, die auch die einfache modifizierte USERID hat, und sammeln Sie alle Aktionen durch zwei Protokolle zusammen von USERID. Ich versuche, Aggregationsstrategie zu verwenden, aber ich habe einige, die ich nicht erwarte. Mein Kamel Route ist:

<route id="fileeater"> 
<description> 
    this route will eat log file and try to put guid through lot of log entry by some identifier 
</description> 
<from uri="file://data/in?charset=utf-8"/> 
<split streaming="true"> 
    <tokenize token="\n"/> 
    <to uri="log:gotlogline"/> 
    <aggregate strategyRef="SimpleAggregationStrategy" completionSize="4"> 
     <correlationExpression> 
      <constant>true</constant> 
     </correlationExpression> 
     <log logName="LOGEater" message="this is logeater part"/> 
     <to uri="file://data/out"/> 
    </aggregate> 
</split> 

wo SimpleAggregationStrategy ist:

import org.apache.camel.Exchange; 
import org.apache.camel.processor.aggregate.AggregationStrategy; 

public class SimpleAggregationStrategy implements AggregationStrategy{ 

@Override 
public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 

    if(oldExchange == null) { 
     return newExchange; 
    } 

    String oldBody = oldExchange.getIn().getBody(String.class); 
    String newBody = newExchange.getIn().getBody(String.class); 
    String body= oldBody; 
    if (oldBody.split(" - ")[2].equalsIgnoreCase(newBody.split(" - ")[2])){ 
     body = oldBody + "\n" + newBody; 
    } 

    oldExchange.getIn().setBody(body); 

    return oldExchange; 
} 

} 

Also erwarte ich, dass Einträge protokolliert und gruppiert nach USERID:

... 
2017-02-02 09:59:45,599 - INFO - NU7444 - logged in 
2017-02-02 09:59:51,229 - INFO - NU7444 - call s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - get answer from s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - logged out 
... 

Aber ich habe nur zwei Zeilen bekam in der Ausgangsdatei:

2017-02-02 10:00:09,818 - INFO - NU7444 - get answer from s2 
2017-02-02 10:00:09,818 - INFO - NU7444 - logged out 

Meine Gedanken über correlationExpression in Aggregation:

  1. kann ich von Log-Zeile (split ("-") [2] als USERID) verwenden Teil durch Aggregation zusammen zu binden?

  2. Ich lese http://www.catify.com/2012/07/09/parsing-large-files-with-apache-camel/ und festgestellt, dass Aggregation von Header ist schneller als einfache Aggregation. Also, kann ich den Teil der Zeile nach der Aufspaltung als Kopfzeile verwenden und dann in der Aggregation nach Kopfzeile sammeln? Sollte ich Prozessor verwenden, um einen Teil der Zeile (USERID) zu bekommen und es in eine Kopfzeile dafür zu setzen?

+0

gut, ich einen Prozessor hinzufügen, die für jede Zeile Header füllen: 'public class UserIDProcessor implementiert Prozessor { \t public void Prozess (Exchange Exchange) throws Exception { \t \t String input = exchange.getIn() getBody (String. .Klasse); \t if (input.split ("-") .length> 2) { \t \t exchange.getIn(). SetHeader ("LOGLEVEL", input.split ("-") [1]); \t \t exchange.getIn().setHeader ("USERID", input.split ("-") [2]); \t} \t exchange.getIn(). SetBody (Eingabe); \t} 'und machte Aggregation von Header. Aber ich habe immer noch nur für eine USERID, irgendwelche Vorschläge ausgegeben? – smartydoc

Antwort

0

Nun, Leute. Scheint so, als hätte ich nach dem Kamelspielen eine Lösung gefunden. es geht um einen Prozess, der zu einem Header für jeden Protokolleintrag festlegen kann, wie ich in Kommentar erwähnen:

public class UserIDProcessor implements Processor{ 
    public void process(Exchange exchange) throws Exception { 
     String input = exchange.getIn().getBody(String.class); 
     if (input.split(" - ").length > 2){ 
      exchange.getIn().setHeader("LOGLEVEL", input.split(" - ")[1]); 
      exchange.getIn().setHeader("USERID", input.split(" - ")[2]); 
     } 
     exchange.getIn().setBody(input); 
    } 
} 

Dann aggregieren i Nachrichten-Header mit einfachen aggrstrategy:

public class SimpleAggregationStrategy implements AggregationStrategy{ 
    @Override 
    public Exchange aggregate(Exchange oldExchange, Exchange newExchange) { 
     if(oldExchange == null) { 
      return newExchange; 
     } 
     String oldBody = oldExchange.getIn().getBody(String.class); 
     String newBody = newExchange.getIn().getBody(String.class); 
     String body= oldBody + "\r\n" + newBody; 
     oldExchange.getIn().setBody(body); 
     return oldExchange; 
    } 
} 

Und verwenden Sie ganz einfach

<route id="fileeater"> 
    <description> 
     this route will eat log file and try to put guid through lot of log entry by some identifier 
    </description> 
    <from uri="file://data/in?charset=utf-8&amp;delete=false&amp;readLock=idempotent-changed&amp;readLockCheckInterval=5000"/> 
    <split streaming="true"> 
     <tokenize token="\n"/> 
     <process ref="UIDProcessor"/> 
     <aggregate strategyRef="SimpleAggregationStrategy" completionSize="4"> 
      <correlationExpression> 
       <simple>header.USERID</simple> 
      </correlationExpression> 
      <to uri="log:gotlogline"/> 
      <to uri="file://data/out?fileExist=append"/> 
     </aggregate> 
    </split> 
</route> 

auch erhöhen Parsen Geschwindigkeit y: Route (können Sie Timeout-Flag und Abschluss Größe in Aggregation Route Teil basiert auf Ihren Bedürfnissen hinzufügen) Sie können ein parallelProcessing="true" Flag zum Teilen hinzufügen und erhalten ein extrem schnelles Ergebnis.

Verwandte Themen