Spark and Java: Exception thrown in awaitResult

I am trying to connect from a Java application to a Spark cluster running inside a virtual machine at IP 10.20.30.50 on port 7077, and to run the word count example:

// Connect to the standalone Spark master running inside the VM
SparkConf conf = new SparkConf().setMaster("spark://10.20.30.50:7077").setAppName("wordCount");
JavaSparkContext sc = new JavaSparkContext(conf);
// Read the input from HDFS and count its lines
JavaRDD<String> textFile = sc.textFile("hdfs://localhost:8020/README.md");
String result = Long.toString(textFile.count());
// Classic word count: split into words, map to (word, 1), sum per word
JavaRDD<String> words = textFile.flatMap((FlatMapFunction<String, String>) s -> Arrays.asList(s.split(" ")).iterator());
JavaPairRDD<String, Integer> pairs = words.mapToPair((PairFunction<String, String, Integer>) s -> new Tuple2<>(s, 1));
JavaPairRDD<String, Integer> counts = pairs.reduceByKey((Function2<Integer, Integer, Integer>) (a, b) -> a + b);
counts.saveAsTextFile("hdfs://localhost:8020/tmp/output");
sc.stop();
return result;

The Java application produces the following log output and stack trace:

Running Spark version 2.0.1 
Unable to load native-hadoop library for your platform... using builtin-java classes where applicable 
Changing view acls to: lii5ka 
Changing modify acls to: lii5ka 
Changing view acls groups to: 
Changing modify acls groups to: 
SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(lii5ka); groups with view permissions: Set(); users with modify permissions: Set(lii5ka); groups with modify permissions: Set() 
Successfully started service 'sparkDriver' on port 61267. 
Registering MapOutputTracker 
Registering BlockManagerMaster 
Created local directory at /private/var/folders/4k/h0sl02993_99bzt0dzv759000000gn/T/blockmgr-51de868d-3ba7-40be-8c53-f881f97ced63 
MemoryStore started with capacity 2004.6 MB 
Registering OutputCommitCoordinator 
Logging initialized @48403ms 
jetty-9.2.z-SNAPSHOT 
Started o.s.j.s.ServletContextHandler@...{/jobs,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/jobs/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/jobs/job,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/jobs/job/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/stage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/stage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/pool,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/pool/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/storage,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/storage/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/storage/rdd,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/storage/rdd/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/environment,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/environment/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/executors,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/executors/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/executors/threadDump,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/executors/threadDump/json,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/static,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/api,null,AVAILABLE}
Started o.s.j.s.ServletContextHandler@...{/stages/stage/kill,null,AVAILABLE}
Started ServerConnector@...{HTTP/1.1}{0.0.0.0:4040}
Started @48698ms 
Successfully started service 'SparkUI' on port 4040. 
Bound SparkUI to 0.0.0.0, and started at http://192.168.0.104:4040 
Connecting to master spark://10.20.30.50:7077... 
Successfully created connection to /10.20.30.50:7077 after 25 ms (0 ms spent in bootstraps) 
Connecting to master spark://10.20.30.50:7077... 
Still have 2 requests outstanding when connection from /10.20.30.50:7077 is closed 
Failed to connect to master 10.20.30.50:7077 

org.apache.spark.SparkException: Exception thrown in awaitResult 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:77) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$1.applyOrElse(RpcTimeout.scala:75) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:36) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at scala.PartialFunction$OrElse.apply(PartialFunction.scala:167) ~[scala-library-2.11.8.jar:na] 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:83) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:88) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.rpc.RpcEnv.setupEndpointRef(RpcEnv.scala:96) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.deploy.client.StandaloneAppClient$ClientEndpoint$$anonfun$tryRegisterAllMasters$1$$anon$1.run(StandaloneAppClient.scala:106) ~[spark-core_2.11-2.0.1.jar:2.0.1] 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_102] 
     at java.util.concurrent.FutureTask.run(FutureTask.java:266) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) [na:1.8.0_102] 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) [na:1.8.0_102] 
     at java.lang.Thread.run(Thread.java:745) [na:1.8.0_102] 
Caused by: java.io.IOException: Connection from /10.20.30.50:7077 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelInactive(TransportResponseHandler.java:128) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at org.apache.spark.network.server.TransportChannelHandler.channelInactive(TransportChannelHandler.java:109) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.handler.timeout.IdleStateHandler.channelInactive(IdleStateHandler.java:257) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelInactive(ChannelInboundHandlerAdapter.java:75) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at org.apache.spark.network.util.TransportFrameDecoder.channelInactive(TransportFrameDecoder.java:182) ~[spark-network-common_2.11-2.0.1.jar:2.0.1] 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelInactive(AbstractChannelHandlerContext.java:208) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelInactive(AbstractChannelHandlerContext.java:194) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.DefaultChannelPipeline.fireChannelInactive(DefaultChannelPipeline.java:828) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$7.run(AbstractChannel.java:621) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) ~[netty-all-4.0.29.Final.jar:4.0.29.Final] 
     ... 1 common frames omitted 

In the Spark master log on 10.20.30.50, I get the following error message:

16/11/05 14:47:20 ERROR OneForOneStrategy: Error while decoding incoming Akka PDU of length: 1298 
akka.remote.transport.AkkaProtocolException: Error while decoding incoming Akka PDU of length: 1298 
Caused by: akka.remote.transport.PduCodecException: Decoding PDU failed. 
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:167) 
    at akka.remote.transport.ProtocolStateActor.akka$remote$transport$ProtocolStateActor$$decodePdu(AkkaProtocolTransport.scala:580) 
    at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:375) 
    at akka.remote.transport.ProtocolStateActor$$anonfun$4.applyOrElse(AkkaProtocolTransport.scala:343) 
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
    at akka.actor.FSM$class.processEvent(FSM.scala:604) 
    at akka.remote.transport.ProtocolStateActor.processEvent(AkkaProtocolTransport.scala:269) 
    at akka.actor.FSM$class.akka$actor$FSM$$processMsg(FSM.scala:598) 
    at akka.actor.FSM$$anonfun$receive$1.applyOrElse(FSM.scala:592) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:467) 
    at akka.remote.transport.ProtocolStateActor.aroundReceive(AkkaProtocolTransport.scala:269) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:238) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:220) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:397) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: com.google.protobuf.InvalidProtocolBufferException: Protocol message contained an invalid tag (zero). 
    at com.google.protobuf.InvalidProtocolBufferException.invalidTag(InvalidProtocolBufferException.java:89) 
    at com.google.protobuf.CodedInputStream.readTag(CodedInputStream.java:108) 
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6643) 
    at akka.remote.WireFormats$AkkaProtocolMessage.<init>(WireFormats.java:6607) 
    at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6703) 
    at akka.remote.WireFormats$AkkaProtocolMessage$1.parsePartialFrom(WireFormats.java:6698) 
    at com.google.protobuf.AbstractParser.parsePartialFrom(AbstractParser.java:141) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:176) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:188) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:193) 
    at com.google.protobuf.AbstractParser.parseFrom(AbstractParser.java:49) 
    at akka.remote.WireFormats$AkkaProtocolMessage.parseFrom(WireFormats.java:6821) 
    at akka.remote.transport.AkkaPduProtobufCodec$.decodePdu(AkkaPduCodec.scala:168) 
    ... 19 more 

Additional information

  • The example works fine when I use new SparkConf().setMaster("local") instead (see the sketch right after this list).
  • I can connect to the Spark master with spark-shell --master spark://10.20.30.50:7077 from the same machine.
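
For reference, a minimal sketch of the local-mode variant from the first bullet (hypothetical class name; only the line-count step is shown). In local mode the driver and executors run in a single JVM, so no connection to the standalone master is attempted:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class LocalWordCount {
    public static void main(String[] args) {
        // "local" runs everything in this JVM; the master at 10.20.30.50:7077 is never contacted.
        SparkConf conf = new SparkConf().setMaster("local").setAppName("wordCount");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            System.out.println("Lines: " + sc.textFile("hdfs://localhost:8020/README.md").count());
        }
    }
}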

You cannot connect to the node on your local machine with this IP '10.20.30.50:7077' – pamu


Why not? Spark is running in a virtual machine on my host that is reachable via this IP, so I don't see why I shouldn't be able to connect to it. Is this a particular restriction in Spark? –


You never told me that there is a virtual machine in between – pamu

Answers

7

This looks like a network error at first (but it actually is NOT); it is a Spark version mismatch in disguise. Make sure you point to the correct version of the Spark jars, mostly the assembly jars.

This problem can occur because of a version mismatch in the Hadoop RPC call that uses protocol buffers:

when a protocol message being parsed is invalid in some way, e.g. it contains a malformed varint or a negative byte length.

  • In my experience with protobuf, an InvalidProtocolBufferException can only happen when the message could not be parsed (programmatically, when you parse a protobuf message, the message may be corrupted, have length zero, and so on ...).

  • Spark uses Akka actors for message passing between the master/driver and the workers, and internally Akka uses Google's protobuf to communicate; see the method below from AkkaPduCodec.scala:

    override def decodePdu(raw: ByteString): AkkaPdu = {
      try {
        val pdu = AkkaProtocolMessage.parseFrom(raw.toArray)
        if (pdu.hasPayload) Payload(ByteString(pdu.getPayload.asReadOnlyByteBuffer()))
        else if (pdu.hasInstruction) decodeControlPdu(pdu.getInstruction)
        else throw new PduCodecException("Error decoding Akka PDU: Neither message nor control message were contained", null)
      } catch {
        case e: InvalidProtocolBufferException ⇒ throw new PduCodecException("Decoding PDU failed.", e)
      }
    }
    

But in your case, since it is a version mismatch, a message from the newer protobuf version cannot be parsed by the older version's parser ... or something along those lines ...

If you are using Maven, please review your other dependencies as well.
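
One way to double-check the mismatch, as a minimal sketch (hypothetical class name; assumes the standalone master's web UI runs on its default port 8080): print the Spark version bundled with the driver application using a throwaway local context, and compare it with the version shown at http://10.20.30.50:8080.

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;

public class VersionCheck {
    public static void main(String[] args) {
        // Local mode: no connection to the standalone master is made.
        SparkConf conf = new SparkConf().setMaster("local").setAppName("versionCheck");
        try (JavaSparkContext sc = new JavaSparkContext(conf)) {
            // Version of the Spark library on the driver classpath, e.g. "2.0.1";
            // it should match the version the master's web UI reports.
            System.out.println("Driver-side Spark version: " + sc.version());
        }
    }
}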


For me it was the Scala (patch) version. Thank you! – combinatorist

2

It turned out that I had been running Spark version 1.5.2 in the virtual machine while using version 2.0.1 of the Spark library in Java. I fixed the problem by using the matching Spark library version in my pom.xml:

<dependency> 
    <groupId>org.apache.spark</groupId> 
    <artifactId>spark-core_2.10</artifactId> 
    <version>1.5.2</version> 
</dependency> 

Another problem (which showed up later) was that I also had to pin the library to the Scala version it was built with. That is the _2.10 suffix in the artifactId.
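
A minimal sketch for checking that other half of the equation, the Scala runtime actually present on the driver classpath (hypothetical class name; scala-library is on the classpath whenever spark-core is): its major.minor version should match the _2.10/_2.11 suffix of the Spark artifacts and the Scala version of the cluster's Spark build.

public class ScalaVersionCheck {
    public static void main(String[] args) {
        // Prints e.g. "version 2.10.6"; the 2.10 part must match the artifactId
        // suffix (spark-core_2.10) and the Scala version of the cluster's Spark build.
        System.out.println(scala.util.Properties$.MODULE$.versionString());
    }
}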

Basically, @RamPrassad's answer pointed me in the right direction, but it did not give clear advice on what I had to do to fix my problem.

By the way: I could not upgrade Spark in the virtual machine, since it came with the HortonWorks distribution ...


"Wenn Sie maven andere Abhängigkeiten pls. Review verwenden." ... Mir war nicht einmal bewusst, dass Sie Maven verwenden und oben in meiner Antwort vorgeschlagen. –
