2014-09-05 6 views
12

Spark-Kollegen, ich bin ziemlich neu in Spark, deshalb hoffe ich auf Ihre Hilfe in der Tat.Eigenständiger Funkencluster. Job kann nicht programmgesteuert übergeben werden -> java.io.InvalidClassException

Ich versuche, den recht einfachen Job auf Spark Cluster von meinem Laptop zu planen. Obwohl es funktioniert, wenn ich es mit ./spark-submit einreiche, löst es eine Ausnahme aus, wenn ich versuche, es programmatisch zu machen.

Umgebung: - Spark - 1 Master-Knoten und 2 Arbeiterknoten (Standalone-Modus). Spark wurde nicht kompiliert, aber die Binärdateien wurden heruntergeladen. Spark-Version 1.0.2 - Java-Version "1.7.0_45" - Application-JAR befindet sich überall (auf dem Client und auf Worker-Knoten an der gleichen Stelle); - README.md-Datei wird auch auf jeden Knoten kopiert;

Die Anwendung Ich versuche zu laufen:

val logFile = "/user/vagrant/README.md" 

val conf = new SparkConf() 
conf.setMaster("spark://192.168.33.50:7077") 
conf.setAppName("Simple App") 
conf.setJars(List("file:///user/vagrant/spark-1.0.2-bin-hadoop1/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar")) 
conf.setSparkHome("/user/vagrant/spark-1.0.2-bin-hadoop1") 

val sc = new SparkContext(conf) 

val logData = sc.textFile(logFile, 2).cache() 

...

Das Problem ist also, dass diese Anwendung auf Cluster erfolgreich läuft, wenn ich das tue:

./spark-submit --class com.paycasso.SimpleApp --master spark://192.168.33.50:7077 --deploy-mode client file:///home/vagrant/spark-1.0.2-bin-hadoop1/bin/hello-apache-spark_2.10-1.0.0-SNAPSHOT.jar 

Aber es funktioniert nicht, wenn ich versuche, das gleiche programmatisch durch Aufruf sbt run

zu tun

Hier ist die Stacktrace, dass ich auf Master-Knoten erhalten:

14/09/04 15:09:44 ERROR Remoting: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = -6451051318873184044, local class serialVersionUID = 583745679236071411 
java.io.InvalidClassException: org.apache.spark.deploy.ApplicationDescription; local class incompatible: stream classdesc serialVersionUID = -6451051318873184044, local class serialVersionUID = 583745679236071411 
    at java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:617) 
    at java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:1622) 
    at java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1517) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1771) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:1990) 
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1915) 
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1798) 
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1350) 
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:370) 
    at akka.serialization.JavaSerializer$$anonfun$1.apply(Serializer.scala:136) 
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57) 
    at akka.serialization.JavaSerializer.fromBinary(Serializer.scala:136) 
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) 
    at scala.util.Try$.apply(Try.scala:161) 
    at akka.serialization.Serialization.deserialize(Serialization.scala:98) 
    at akka.remote.serialization.MessageContainerSerializer.fromBinary(MessageContainerSerializer.scala:58) 
    at akka.serialization.Serialization$$anonfun$deserialize$1.apply(Serialization.scala:104) 
    at scala.util.Try$.apply(Try.scala:161) 
    at akka.serialization.Serialization.deserialize(Serialization.scala:98) 
    at akka.remote.MessageSerializer$.deserialize(MessageSerializer.scala:23) 
    at akka.remote.DefaultMessageDispatcher.payload$lzycompute$1(Endpoint.scala:55) 
    at akka.remote.DefaultMessageDispatcher.payload$1(Endpoint.scala:55) 
    at akka.remote.DefaultMessageDispatcher.dispatch(Endpoint.scala:73) 
    at akka.remote.EndpointReader$$anonfun$receive$2.applyOrElse(Endpoint.scala:764) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 

Was die Lösung dieses Problems sein könnte? Vielen Dank im Voraus.

+0

Haben Sie versucht, es lokal mit 'sbt run' laufen zu lassen? –

+0

Danke, Tathagata, für deine Hilfe. Ja, ich habe es lokal mit local [10] versucht - es funktioniert. Deshalb sieht es sehr seltsam aus, dass ein sehr einfaches Beispiel so schwierig ist, auf Cluster –

+0

@ Dr.Khu zu laufen: Ich würde genau dasselbe machen wollen. Das oben genannte Programm von dir reicht ein Glas zum Funken einreichen? Ich bin etwas verwirrt. freundlich helfen. – chaosguru

Antwort

10

Nachdem ich viel Zeit verschwendet habe, habe ich das Problem gefunden. Obwohl ich in meiner Bewerbung keine hadoop/hdfs verwendet habe, spielt hadoop client eine wichtige Rolle. Das Problem war in Hadoop-Client-Version, es war anders als die Version von Hadoop, für die Funken gebaut wurde. Spark Hadoop Version 1.2.1, aber in meiner Anwendung war das 2,4.

Als ich die Version von hadoop client in 1.2.1 in meiner App änderte, bin ich in der Lage, den Code im Cluster zu starten.

+0

Nein, ich verstehe nicht, was Sie hier getan haben. In Ihrem Code erwähnt es nirgends, was Hadoop-Client-Version ist. Ich stehe vor dem gleichen Problem, also brauche bitte deine Hilfe. –

+0

Ich meine, dass ich funke der Version 1.0.2 für Hadoop 1.x vorinstalliert habe, aber ich meine 'build.sbt' gab es eine Abhängigkeit zum Spark Core (' "org.apache.spark" %% " spark-core "%" 1.0.2 "') - richtiger - und hadoop client ('" org.apache.hadoop "%" hadoop-client "%" 2.4 "') - das war nicht korrekt, denn spark wurde gebaut für die hadoop Version 1.x. Als ich die Version des hadoop Clients von 2.4 auf 1.2.1 geändert habe, ist diese Ausnahme weg. –

+0

Ich habe das gleiche mit Maven Build von spark1.1.0 mvn -Dhadoop.version = 1.2.1 -DskipTests sauberes Paket verwendet die Version, die Sie erwähnt, aber ich halte java.io.InvalidClassException: org.apache.spark.storage.BlockManagerId; läuft gut bei der Verwendung von Spark submit .. bitte helfen – mithra

0

Statt den regulären Spark-Core-Bibliothek verwenden, können Sie eine solche Abhängigkeit (definiert auf der build.sbt-Datei) verwenden:

resolvers += "Talend" at "https://talend-update.talend.com/nexus/content/repositories/libraries/" 
libraryDependencies += "org.talend.libraries" % "spark-assembly-1.6.0-hadoop2.6.0" % "6.0.0" 

/\ Die Funken Montage -... hadoop ... Bibliotheken können ziemlich groß sein (und daher nicht kompatibel mit einem Git-Push).

List of Spark/Hadoop Libraries Hosted by Talend

0

Wenn Sie eine vorkompilierte Funken verwenden. (Kein sbt oder maven installieren), stellen Sie sicher, dass alle Arbeiterknoten die gleiche Version von Spark verwenden. Ich hatte das gleiche Problem, da einer der Arbeiterknoten eine andere Version von Spark verwendete. behalten Sie die gleichen Versionen über alle Knoten gelöst das Problem für mich. Eine Maschine verwendete funken-2.0.0 - bin-hadoop2.7 anstelle von funken-2.0.0-preview-bin-hadoop2.7

Verwandte Themen