2016-03-25 10 views
1

Ich benutze einen Kanal-Agent, um externe Daten über Flume-Agent zu sammeln. Der externe Datenstapel beträgt fast 1 MB pro 10 Sekunden. Ich habe Flume Agent wie folgt konfiguriert.Ereignis von Netcat-Quelle geht nicht durch Kafka-Kanal

# Flume agent configuration as /flume/conf/agent.conf 
agent.sources = netcat-source 
agent.channels = kafka-channel 
agent.sinks = logger-sink 

######################################## 
# Netcat Source 
######################################## 

agent.sources.netcat-source.type = netcat 
agent.sources.netcat-source.bind = 0.0.0.0 
agent.sources.netcat-source.port = 4141 
agent.sources.netcat-source.max-line-length = 500000 
agent.sources.netcat-source.channels = kafka-channel 

######################################## 
# Kafka Channel 
######################################## 

agent.channels.kafka-channel.type = org.apache.flume.channel.kafka.KafkaChannel 
agent.channels.kafka-channel.brokerList = 10.212.136.108:9092,10.212.136.108:9092 
agent.channels.kafka-channel.zookeeperConnect = 10.212.136.108:2181,10.212.136.108:2181/kafka 
agent.channels.kafka-channel.topic = channel 
agent.channels.kafka-channel.groupId = fcd-group 


######################################## 
# Logger Sink 
######################################## 

agent.sinks.logger-sink.type = logger 
agent.sinks.logger-sink.channel = kafka-channel 

Ich aktiviert den Agenten in folgender Weise.

Leider stellte sich heraus, dass Netcat Source gut funktionierte und etwas mit Kanal oder Senke schief ging. Vom Ressourcenmonitor von Ubuntu kann ich die folgende Leistung sehen. Network performance. Blue curve indicates input while red one indicates output Ohne andere Anwendung läuft mit Netzwerk Io, ich bin sicher, diese Zahl zeigte, was mit meinem Flume Agent passiert ist.

Ich habe nichts, wenn ich Kafka Inhalt im Thema "Kanal" über Konsole Verbraucher überprüft. Auch wenn ich flume.log überprüfte, bekam ich nur Flume-Ausgabe seines Status ohne Daten.

hatte ich validiert eingehende Daten mit

nc -lk 4141 >> my_data_check_file 

Was mit meinem Kanal falsch ist oder sinken?

P. S. Die Dinge liefen ähnlich schwierig, wenn ich Speicherkanal, Dateikanal verwendete.

Antwort

1

Ah, endlich habe ich dieses Problem selbst gelöst!

Der entscheidende Punkt ist der Zeilenbegrenzer '\ n'.

In Gerinne Quellcode NetcatSource.java, haben wir eine Reihe triky wie folgt

private int processEvents(CharBuffer buffer, Writer writer) throws IOException { 
    int numProcessed = 0; 

    boolean foundNewLine = true; 
    while (foundNewLine) { 
    foundNewLine = false; 

    int limit = buffer.limit(); 
    for (int pos = buffer.position(); pos < limit; pos++) { 
     if (buffer.get(pos) == '\n') { 
     // parse event body bytes out of CharBuffer 
     buffer.limit(pos); // temporary limit 
     ByteBuffer bytes = Charsets.UTF_8.encode(buffer); 
     buffer.limit(limit); // restore limit 
... ... 
... ... 

Die Eingangskräfte Codedaten mit '\ n' zu beenden. Andernfalls werden keine Ereignisse nach Kanal übernommen. Wir können dieses Zeichen nach Bedarf ändern und die angepasste Quelle in $ FLUME_HOME/lib

setzen
Verwandte Themen