2014-02-26 11 views
8

Ich versuche, den Kafka-0.8-Log4j-Appender zum Laufen zu bringen, bin aber bisher nicht dazu in der Lage. Ich möchte, dass meine Anwendung Logs über den Log4j-Appender direkt an Kafka sendet. Wie benutze ich den Kafka-0.8-Log4j-Appender?

Hier ist meine log4j.properties. Ich konnte keinen passenden Encoder finden, daher habe ich nur den Standard-Encoder konfiguriert. (Die betreffende Zeile habe ich auskommentiert.)

# Root logger writes INFO+ events to both the console and the Kafka appender.
# NOTE(review): the kafka.producer.* classes log via log4j themselves, so
# routing their output back into the KAFKA appender recurses
# (appender -> producer -> appender), which matches the StackOverflowError
# in the trace below. Point the kafka logger at stdout only, e.g.
# log4j.logger.kafka=INFO, stdout
log4j.rootLogger=INFO, stdout, KAFKA 

# Console appender with a simple pattern layout (level, thread, file:line).
log4j.appender.stdout=org.apache.log4j.ConsoleAppender 
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout 
log4j.appender.stdout.layout.ConversionPattern=%5p [%t] (%F:%L) - %m%n 

# Kafka appender: publishes every log event to the given broker/topic.
log4j.appender.KAFKA=kafka.producer.KafkaLog4jAppender 
log4j.appender.KAFKA.layout=org.apache.log4j.PatternLayout 
log4j.appender.KAFKA.layout.ConversionPattern=%-5p: %c - %m%n 
log4j.appender.KAFKA.BrokerList=hnode01:9092 
log4j.appender.KAFKA.Topic=DKTestEvent 

# Left commented out: no matching encoder class was found (see question text).
#log4j.appender.KAFKA.SerializerClass=kafka.log4j.AppenderStringEncoder 

Und das ist meine Beispielanwendung.

import org.apache.log4j.Logger; 
import org.apache.log4j.BasicConfigurator; 
import org.apache.log4j.PropertyConfigurator; 

public class HelloWorld { 

     /** Logger for this class; appenders come from the supplied properties file. */
     static Logger logger = Logger.getLogger(HelloWorld.class.getName()); 

     /**
      * Entry point: loads the log4j configuration from the file named by the
      * first command-line argument, then emits a few sample log events.
      *
      * @param args args[0] must be the path to a log4j properties file
      */
     public static void main(String[] args) { 
      // Fail fast with a usage message instead of an ArrayIndexOutOfBoundsException
      // when the program is started without arguments.
      if (args.length < 1) { 
       System.err.println("Usage: HelloWorld <log4j.properties>"); 
       return; 
      } 
      PropertyConfigurator.configure(args[0]); 

      logger.info("Entering application."); 
      logger.debug("Debugging!."); 
      logger.info("Exiting application."); 
     } 
} 

Ich habe Maven zum Kompilieren verwendet und kafka_2.8.2-0.8.0 sowie log4j_1.2.17 in meiner pom.xml eingebunden.

Und ich bekomme diese Fehlermeldung:

INFO [main] (Logging.scala:67) - Verifying properties 
INFO [main] (Logging.scala:67) - Property metadata.broker.list is overridden to hnode01:9092 
INFO [main] (Logging.scala:67) - Property serializer.class is overridden to kafka.serializer.StringEncoder 
INFO [main] (HelloWorld.java:14) - Entering application. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 0 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 1 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 2 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 3 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 4 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 5 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 60 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 61 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (HelloWorld.java:14) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 62 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 63 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 64 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 65 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 66 for 1 topic(s) Set(DKTestEvent) 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 67 for 1 topic(s) Set(DKTestEvent) 
. 
. 
. 
INFO [main] (Logging.scala:67) - Fetching metadata from broker id:0,host:hnode01,port:9092 with correlation id 534 for 1 topic(s) Set(DKTestEvent) 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
ERROR [main] (Logging.scala:67) - 
java.lang.StackOverflowError 
    at java.lang.ClassLoader.defineClass1(Native Method) 
    at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at java.lang.ClassLoader.defineClass1(Native Method) 
at java.lang.ClassLoader.defineClass(ClassLoader.java:643) 
at java.security.SecureClassLoader.defineClass(SecureClassLoader.java:142) 
at java.net.URLClassLoader.defineClass(URLClassLoader.java:277) 
at java.net.URLClassLoader.access$000(URLClassLoader.java:73) 
at java.net.URLClassLoader$1.run(URLClassLoader.java:212) 
at java.security.AccessController.doPrivileged(Native Method) 
at java.net.URLClassLoader.findClass(URLClassLoader.java:205) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:323) 
at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:294) 
at java.lang.ClassLoader.loadClass(ClassLoader.java:268) 
at org.apache.log4j.spi.ThrowableInformation.getThrowableStrRep(ThrowableInformation.java:87) 
at org.apache.log4j.spi.LoggingEvent.getThrowableStrRep(LoggingEvent.java:413) 
at org.apache.log4j.WriterAppender.subAppend(WriterAppender.java:313) 
at org.apache.log4j.WriterAppender.append(WriterAppender.java:162) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.error(Category.java:322) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Logging$$anonfun$swallowError$1.apply(Logging.scala:105) 
at kafka.utils.Utils$.swallow(Utils.scala:189) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
at org.apache.log4j.Category.callAppenders(Category.java:206) 
at org.apache.log4j.Category.forcedLog(Category.java:391) 
at org.apache.log4j.Category.info(Category.java:666) 
at kafka.utils.Logging$class.info(Logging.scala:67) 
at kafka.client.ClientUtils$.info(ClientUtils.scala:31) 
at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:51) 
at kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) 
at kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:67) 
at kafka.utils.Utils$.swallow(Utils.scala:187) 
at kafka.utils.Logging$class.swallowError(Logging.scala:105) 
at kafka.utils.Utils$.swallowError(Utils.scala:46) 
at kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:67) 
at kafka.producer.Producer.send(Producer.scala:76) 
at kafka.producer.KafkaLog4jAppender.append(KafkaLog4jAppender.scala:96) 
at org.apache.log4j.AppenderSkeleton.doAppend(AppenderSkeleton.java:251) 
at org.apache.log4j.helpers.AppenderAttachableImpl.appendLoopOnAppenders(AppenderAttachableImpl.java:66) 
. 
. 
. 

Ich bekomme kontinuierlich Fehler, solange ich das Programm nicht beende.

Wenn ich etwas vermisse, lass es mich wissen.

Antwort

1

Versuchen Sie, den Appender asynchron zu konfigurieren, etwa so: log4j.appender.KAFKA.ProducerType=async

Es scheint plausibel, dass das Ganze in eine Endlosschleife gerät, weil der Kafka-Producer selbst ebenfalls Logging enthält.

5

Ich denke, Jonas hat das Problem identifiziert: Die Protokollierung des Kafka-Producers wird ebenfalls im Kafka-Appender geloggt, was eine Endlosschleife und schließlich einen Stack Overflow (kein Wortspiel beabsichtigt) verursacht. Sie können alle Kafka-Logs so konfigurieren, dass sie an einen anderen Appender gehen. Die folgende Zeile sendet sie an stdout:

log4j.logger.kafka=INFO, stdout 

Am Ende sollten Sie also Folgendes in Ihrer log4j.properties haben:

# Root: application events go to both console and Kafka.
log4j.rootLogger=INFO, stdout, KAFKA 
# Kafka's own internal logging goes to stdout only, which breaks the
# appender -> producer -> appender recursion described above.
log4j.logger.kafka=INFO, stdout 
# Application logger explicitly routed to the Kafka appender.
log4j.logger.HelloWorld=INFO, KAFKA 
2

Ich konnte ebenfalls über den Log4j-Appender in Kafka 0.8.2.2 produzieren. Hier ist meine Log4j-Konfiguration:

<?xml version="1.0" encoding="UTF-8" ?> 
<!DOCTYPE log4j:configuration SYSTEM "log4j.dtd"> 

<log4j:configuration xmlns:log4j="http://jakarta.apache.org/log4j/"> 

    <!-- Console output with a short pattern: level, logger name, message. -->
    <appender name="console" class="org.apache.log4j.ConsoleAppender"> 
     <param name="Target" value="System.out" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%-5p %c{1} - %m%n" /> 
     </layout> 
    </appender> 
    <!-- Rolling file appender; declared here but not referenced by any logger below. -->
    <appender name="fileAppender" class="org.apache.log4j.RollingFileAppender"> 
     <param name="Threshold" value="INFO" /> 
     <param name="MaxBackupIndex" value="100" /> 
     <param name="File" value="/tmp/agna-LogFile.log" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d %-5p [%c{1}] %m %n" /> 
     </layout> 
    </appender> 
    <!-- Publishes log events to the local Kafka broker; syncSend=true blocks
         until the broker acknowledges each event. -->
    <appender name="kafkaAppender" class="kafka.producer.KafkaLog4jAppender"> 
     <param name="Topic" value="kafkatopic" /> 
     <param name="BrokerList" value="localhost:9092" /> 
     <param name="syncSend" value="true" /> 
     <layout class="org.apache.log4j.PatternLayout"> 
     <param name="ConversionPattern" value="%d{yyyy-MM-dd HH:mm:ss} %-5p %c{1}:%L %% - %m%n" /> 
     </layout> 
    </appender> 
    <!-- Kafka client's own logging is capped at ERROR and sent to the console,
         never to kafkaAppender — this avoids the recursive-logging loop. -->
    <logger name="org.apache.kafka"> 
     <level value="error" /> 
     <appender-ref ref="console" /> 
    </logger> 
    <!-- Application logger bound to the Kafka appender.
         NOTE(review): additivity defaults to true, so these events presumably
         also propagate to the root console appender; add additivity="false"
         here if double output is unwanted — confirm against the log4j 1.2 docs. -->
    <logger name="com.example.kafkaLogger"> 
     <level value="debug" /> 
     <appender-ref ref="kafkaAppender" /> 
    </logger> 
    <root> 
     <priority value="debug" /> 
     <appender-ref ref="console" /> 
    </root> 
</log4j:configuration> 

Hier ist der Quellcode:

package com.example; 

import org.slf4j.Logger; 
import org.slf4j.LoggerFactory; 

import org.json.simple.JSONArray; 
import org.json.simple.JSONObject; 
import java.util.Properties; 
import java.util.concurrent.ExecutionException; 
import org.apache.kafka.clients.producer.ProducerConfig; 
import org.apache.kafka.clients.producer.KafkaProducer; 

import org.apache.kafka.clients.producer.ProducerRecord; 
import org.apache.kafka.common.serialization.StringSerializer; 

public class JsonProducer { 
    /** General application logger (routed to the console by the log4j config). */
    static Logger defaultLogger = LoggerFactory.getLogger(JsonProducer.class); 
    /** Logger whose events the log4j config routes to the Kafka appender. */
    static Logger kafkaLogger = LoggerFactory.getLogger("com.example.kafkaLogger"); 

    /**
     * Builds a sample JSON document, logs it through the Kafka-bound logger,
     * then sends one record directly via the Kafka producer API.
     */
    public static void main(String args[]) { 

     JsonProducer obj = new JsonProducer(); 

     String str = obj.getJsonObjAsString(); 

     // This event is delivered to Kafka through the log4j appender. 
     kafkaLogger.info(str); 

     try { 
      // Construct and send a message with the plain producer API. 
      obj.constructAndSendMessage(); 
     } catch (InterruptedException e) { 
      // Re-assert the interrupt flag so the thread's interrupted status is preserved. 
      Thread.currentThread().interrupt(); 
      defaultLogger.error("Caught interrupted exception " + e); 
     } catch (ExecutionException e) { 
      defaultLogger.error("Caught execution exception " + e); 
     } 
    } 

    /** Returns a small JSON object (name/age/address plus a message list) as a string. */
    private String getJsonObjAsString() { 
     JSONObject obj = new JSONObject(); 
     obj.put("name", "John"); 
     // Integer.valueOf avoids the deprecated Integer(int) constructor and uses the cache. 
     obj.put("age", Integer.valueOf(55)); 
     obj.put("address", "123 MainSt, Palatine, IL"); 

     JSONArray list = new JSONArray(); 
     list.add("msg 1"); 
     list.add("msg 2"); 
     list.add("msg 3"); 

     obj.put("messages", list); 

     return obj.toJSONString(); 
    } 

    /**
     * Sends a single string record to the "kafkatopic" topic on localhost:9092.
     *
     * @throws InterruptedException if the blocking get() on the send future is interrupted
     * @throws ExecutionException if the send fails
     */
    private void constructAndSendMessage() throws InterruptedException, ExecutionException { 
     Properties props = new Properties(); 
     props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); 
     props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 
     props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); 

     // try-with-resources guarantees the producer is closed (flushing any 
     // buffered records) even if send()/get() throws — the original leaked 
     // the producer on that path. 
     try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) { 
      boolean sync = false; 
      String topic = "kafkatopic"; 
      String key = "mykey"; 
      String value = "myvalue1 mayvalue2 myvalue3"; 
      ProducerRecord<String, String> producerRecord = new ProducerRecord<String, String>(topic, key, value); 
      if (sync) { 
       // Block until the broker acknowledges the record. 
       producer.send(producerRecord).get(); 
      } else { 
       // Fire-and-forget; close() at the end of the try still flushes it. 
       producer.send(producerRecord); 
      } 
     } 
    } 
} 

Das gesamte Projekt ist unter dem folgenden Link verfügbar:

https://github.com/ypant/kafka-json-producer.git

Verwandte Themen