2017-07-03 3 views
1

Ich arbeite an einem Programm, das GPS-Daten von einem mqtt-Broker und lädt es in den Hadoop-Cluster. Beim Versuch, die Daten in hdfs zu schreiben, bekomme ich und IOException. Hier finden Sie die vollständige Stacktrace:IOException beim Schreiben einer Datei nach Hdfs

java.io.IOException: Failed on local exception: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status; Host Details : local host is: "quickstart.cloudera/192.168.25.170"; destination host is: "quickstart.cloudera":8020; 
    at org.apache.hadoop.net.NetUtils.wrapException(NetUtils.java:765) 
    at org.apache.hadoop.ipc.Client.call(Client.java:1165) 
    at org.apache.hadoop.ipc.ProtobufRpcEngine$Invoker.invoke(ProtobufRpcEngine.java:184) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:165) 
    at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:84) 
    at com.sun.proxy.$Proxy7.create(Unknown Source) 
    at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolTranslatorPB.create(ClientNamenodeProtocolTranslatorPB.java:187) 
    at org.apache.hadoop.hdfs.DFSOutputStream.<init>(DFSOutputStream.java:1250) 
    at org.apache.hadoop.hdfs.DFSOutputStream.newStreamForCreate(DFSOutputStream.java:1269) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1063) 
    at org.apache.hadoop.hdfs.DFSClient.create(DFSClient.java:1021) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:232) 
    at org.apache.hadoop.hdfs.DistributedFileSystem.create(DistributedFileSystem.java:75) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:806) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:787) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:686) 
    at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:675) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.HdfsWriter.writeToHdfs(HdfsWriter.java:19) 
    at com.mqttHadoopLoader.hadoop.MqttLoader.MqttDataLoader.messageArrived(MqttDataLoader.java:43) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.handleMessage(CommsCallback.java:354) 
    at org.eclipse.paho.client.mqttv3.internal.CommsCallback.run(CommsCallback.java:162) 
    at java.lang.Thread.run(Thread.java:748) 
Caused by: com.google.protobuf.InvalidProtocolBufferException: Message missing required fields: callId, status 
    at com.google.protobuf.UninitializedMessageException.asInvalidProtocolBufferException(UninitializedMessageException.java:81) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.buildParsed(RpcPayloadHeaderProtos.java:1094) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto$Builder.access$1300(RpcPayloadHeaderProtos.java:1028) 
    at org.apache.hadoop.ipc.protobuf.RpcPayloadHeaderProtos$RpcResponseHeaderProto.parseDelimitedFrom(RpcPayloadHeaderProtos.java:986) 
    at org.apache.hadoop.ipc.Client$Connection.receiveResponse(Client.java:850) 
    at org.apache.hadoop.ipc.Client$Connection.run(Client.java:781) 

Es scheint, wie der Fehler auftritt, wenn ich versuche, den Outputstream zu schaffen, aber es ist schwer zu sagen, weil mein Eclipse Debugger nicht richtig funktioniert (es sagt, dass es keine Verbindung herstellen kann die VM und ich habe eine Vielzahl von Fixes hier auf Stackoverflow ausprobiert). Hier ist mein Code für meine HdfsWriter:

String destFile = "hdfs://127.0.0.0.1:8020/gpsData/output/gps_data.txt"; 
//Note *this is just a placeholder IP address for the purpose of this post. I do have the fully correct IP address for the program. 

    public void writeToHdfs(String gpsInfo) { 
     try { 
      Configuration conf = new Configuration(); 
      System.out.println("Connecting to -- " + conf.get("fs.defaultFS")); 

      FileSystem fs = FileSystem.get(URI.create(destFile), conf); 
      System.out.println(fs.getUri()); 

      // Error seems to occur here 
      OutputStream outStream = fs.create(new Path(destFile)); 

      byte[] messageByt = gpsInfo.getBytes(); 
      outStream.write(messageByt); 
      outStream.close(); 

      System.out.println(destFile + " copied to HDFS"); 

     } catch (FileNotFoundException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } catch (IOException e) { 
      // TODO Auto-generated catch block 
      e.printStackTrace(); 
     } 
    } 

Dies ist die MQTT Methode, die für die HdfsWriter ruft:

public void messageArrived(String topic, MqttMessage message) 
     throws Exception { 
      System.out.println(message); 
      HdfsWriter hdfsWriter = new HdfsWriter(); 
      hdfsWriter.writeToHdfs(message.toString()); 
    } 

Ich bin immer noch wirklich neu Hadoop so dass jeder und alle Hilfe groß sein würde.

UPDATE ich meine Debug-Arbeits haben und Sie definitiv sagen kann, dass der Fehler tritt auf, wenn ich versuche, ein Filesystem-Methode aufzurufen. Zum Beispiel wird der Fehler auch von fs.exists(pt) und fs.setReplication() ausgelöst.

+0

Sie müssen sorgfältig die Fehlermeldung lesen. Es zeigt Ihnen, was falsch ist: * InvalidProtocolBufferException: Nachricht fehlende erforderliche Felder: CallId, Status; *. Sie senden also ungültige Daten an den Server. Das Problem liegt in dem, was Sie senden, nicht darin, wie Sie es senden. –

+0

@JB Nizet Ja ich habe ** die Fehlermeldung ** gelesen. Mehrmals. Wieder und wieder. Ich weiß nicht einmal, über welche "Nachricht" die Fehlermeldung spricht. Wenn Sie sich meinen Code angesehen haben, den ich eingefügt habe, würden Sie sehen, dass ich nichts habe, was direkt "Nachricht" nennt. Ich habe mein Debug funktioniert und kann Ihnen sagen, dass es mit jeder FileSystem-Nachricht geschieht, die ich versuche aufzurufen (dh es passiert auch mit 'fs.exists (pt)' oder 'fs.setReplication (pt, (short) 1)')). Ich kann jedoch nicht finden, wo dieser Fehler direkt auftritt. – ebbBliss

Antwort

0

Ich glaube, hdfs mit Google Protobuf-Bibliothek. Und Ihr Client-Code scheint falsche (inkompatible) Version von Protobuf zu verwenden. Versuche, in diese Richtung zu graben.

0

Das Protokoll zwischen dem HDFS-Client und dem NameNode verwendet Google Protocol Buffers zum Serialisieren von Nachrichten. Der Fehler weist darauf hin, dass die vom Client gesendete Nachricht nicht alle erwarteten Felder enthielt, sodass sie nicht mit dem Server kompatibel ist.

Dies deutet darauf hin, dass Sie eine Version des HDFS-Clients ausführen, die älter als die Version von NameNode ist. Beispiel: Das Feld callId wurde in das Feature implementiert, das von Apache JIRA-Problem HADOOP-9762 erfasst und in Apache Hadoop 2.1.0-beta ausgeliefert wird. Ein Client vor dieser Version hätte in seinen Nachrichten callId nicht enthalten, sodass er mit einem NameNode, der 2.1.0-beta oder höher ausführt, nicht kompatibel wäre.

Ich empfehle Ihnen, Ihre Clientanwendung zu überprüfen, um sicherzustellen, dass sie die Hadoop-Clientbibliotheksversion verwendet, die der Hadoop-Clusterversion entspricht. Von der Stapelüberwachung scheint es, dass Sie die Cloudera-Verteilung verwenden. Wenn dies der Fall ist, haben Sie wahrscheinlich den größten Erfolg, wenn Sie die entsprechende Cliudera-abhängige Version der Client-Bibliothek in ihrem Maven-Repository verwenden. Details finden Sie unter Using the CDH 5 Maven Repository.

Verwandte Themen