2016-11-15 2 views
1

Während Kafka läuft ->Apache Apex ->Hbase, es zu sagen hat folgende Ausnahme in Yarn Aufgaben:InvocationTargetException in Yarn Aufgabe mit Hadoop

com.datatorrent.stram.StreamingAppMasterService: Application master, appId=4, clustertimestamp=1479188884109, attemptId=2 
2016-11-15 11:59:51,068 INFO org.apache.hadoop.service.AbstractService: Service com.datatorrent.stram.StreamingAppMasterService failed in state INITED; cause: java.lang.RuntimeException: java.lang.reflect.InvocationTargetException 
java.lang.RuntimeException: java.lang.reflect.InvocationTargetException 
    at org.apache.hadoop.fs.AbstractFileSystem.newInstance(AbstractFileSystem.java:130) 
    at org.apache.hadoop.fs.AbstractFileSystem.createFileSystem(AbstractFileSystem.java:156) 
    at org.apache.hadoop.fs.AbstractFileSystem.get(AbstractFileSystem.java:241) 
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:333) 
    at org.apache.hadoop.fs.FileContext$2.run(FileContext.java:330) 
    at java.security.AccessController.doPrivileged(Native Method) 
    at javax.security.auth.Subject.doAs(Subject.java:422) 
    at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1614) 
    at org.apache.hadoop.fs.FileContext.getAbstractFileSystem(FileContext.java:330) 
    at org.apache.hadoop.fs.FileContext.getFileContext(FileContext.java:444) 

Und mein DataTorrent Protokoll die folgende Ausnahme zeigt. Ich betreibe die App, die Kafka -> Apex -> Hbase Streaming-Anwendung kommuniziert.

Connecting to ResourceManager at hduser1/127.0.0.1:8032 
16/11/15 17:47:38 WARN client.EventsAgent: Cannot read events for application_1479208737206_0008: java.io.FileNotFoundException: File does not exist: /user/hduser1/datatorrent/apps/application_1479208737206_0008/events/index.txt 
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:66) 
at org.apache.hadoop.hdfs.server.namenode.INodeFile.valueOf(INodeFile.java:56) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsUpdateTimes(FSNamesystem.java:1893) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocationsInt(FSNamesystem.java:1834) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1814) 
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getBlockLocations(FSNamesystem.java:1786) 
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getBlockLocations(NameNodeRpcServer.java:552) 
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getBlockLocations(ClientNamenodeProtocolServerSideTranslatorPB.java:362) 
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java) 
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:619) 
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:962) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2040) 
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2036) 
at java.security.AccessController.doPrivileged(Native Method) 
at javax.security.auth.Subject.doAs(Subject.java:422) 
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1656) 
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2034) 

Sie den Code:

public void populateDAG(DAG dag, Configuration conf){ 
KafkaSinglePortInputOperator in 
    = dag.addOperator("kafkaIn", new KafkaSinglePortInputOperator()); 

in.setInitialOffset(AbstractKafkaInputOperator.InitialOffset.EARLIEST.name()); 
LineOutputOperator out = dag.addOperator("fileOut", new LineOutputOperator()); 

dag.addStream("data", in.outputPort, out.input);} 

LineOutputOperator erstreckt AbstractFileOutputOperator

private static final String NL = System.lineSeparator(); 
private static final Charset CS = StandardCharsets.UTF_8; 

@NotNull 
private String baseName; 

@Override 
public byte[] getBytesForTuple(byte[] t) { 
    String result = new String(t, CS) + NL; 
    return result.getBytes(CS); 
} 

@Override 
protected String getFileName(byte[] tuple) { 
return baseName; 
} 

public String getBaseName() { return baseName; } 
public void setBaseName(String v) { baseName = v; } 

Wie dieses Problem zu lösen?

Danke.

+0

Ich sehe die Protokolle Ich glaube, der Operator ist nicht in der Lage, eine Datei zu finden. Dies kann entweder an falschen Pfadeinstellungen liegen (überprüfen Sie den Dateipfad in HDFS) oder der Dateipfad existiert nicht (weniger wahrscheinlich). Geben Sie weitere Details wie verwendete Operatoren an, die bei der Identifizierung des Problems hilfreich sein könnten. –

+0

Ich habe meinen benutzerdefinierten Apex-Operator implementiert, der BaseOperator erweitert und die Methode process() und endWindow() implementiert hat. Und zwei Variablen Eingabe und Ausgabe und ihre Klassen sind DafaultInput und Ausgabeoperatoren. – syam

Antwort

1

Können Sie einige Details über Ihre Umgebung teilen, z. B. welche Version von Hadoop und Apex? In welchem ​​Protokoll wird diese Ausnahme angezeigt?

nur als einfache Plausibilitätsprüfung können Sie laufen die einfache Maven Urbild Vom Programm auf beschrieben: http://docs.datatorrent.com/beginner/

Wenn das funktioniert, versuchen Sie die FileIO und kafka Anwendungen laufen auf: https://github.com/DataTorrent/examples/tree/master/tutorials

Wenn diese Arbeit Ok, wir können uns die Details deines Codes ansehen.

+0

Hallo ich lief Beispiel von Kafka und es funktioniert nicht richtig und funktioniert für PiDemo und randomNumbers. Meine Umgebung: Betriebssystem: Ubuntu-14.04 Kafka: 2_11.0.10.0.1.0 Apex: 3.5.0 Hbase: 1.2.4 – syam

0

ich die Lösung bekam,

Das Problem Ablauf meiner Lizenz bezogen, neu installierte So neu und arbeitet für die tatsächlichen Code in Ordnung.

Verwandte Themen