
Connection refused while executing a Spark Streaming program using Scala

I am trying to run a simple word-count Spark Streaming program in the Cloudera VM. I am using Scala in REPL mode, not an IDE.

Here is my code:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}

// sc is the SparkContext that the spark-shell provides
val ssc = new StreamingContext(sc, Seconds(2))
val lines = ssc.socketTextStream("localhost", 8585, StorageLevel.MEMORY_ONLY)
val wordsFlatMap = lines.flatMap(_.split(" "))
val wordsMap = wordsFlatMap.map(w => (w, 1))
val wordCount = wordsMap.reduceByKey((a, b) => a + b)
wordCount.print()
ssc.start()

I am getting a Connection refused error. I am running the program in REPL mode. The error is shown below.

scala> ssc.start 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Starting 1 receivers 
17/04/19 03:06:43 INFO scheduler.ReceiverTracker: ReceiverTracker started 
17/04/19 03:06:43 INFO dstream.ForEachDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.MappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: metadataCleanupDelay = -1 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.SocketInputDStream: Initialized and validated org.apache.spark.streaming.dstream.SocketInputDStream@… 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.FlatMappedDStream: Initialized and validated org.apache.spark.streaming.dstream.FlatMappedDStream@… 
17/04/19 03:06:43 INFO dstream.MappedDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.MappedDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.MappedDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.MappedDStream: Initialized and validated org.apache.spark.streaming.dstream.MappedDStream@… 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ShuffledDStream: Initialized and validated org.apache.spark.streaming.dstream.ShuffledDStream@… 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Slide time = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Storage level = StorageLevel(false, false, false, false, 1) 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Checkpoint interval = null 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Remember duration = 2000 ms 
17/04/19 03:06:43 INFO dstream.ForEachDStream: Initialized and validated org.apache.spark.streaming.dstream.ForEachDStream@… 
17/04/19 03:06:43 INFO util.RecurringTimer: Started timer for JobGenerator at time 1492596404000 
17/04/19 03:06:43 INFO scheduler.JobGenerator: Started JobGenerator at 1492596404000 ms 
17/04/19 03:06:43 INFO scheduler.JobScheduler: Started JobScheduler 
17/04/19 03:06:43 INFO streaming.StreamingContext: StreamingContext started 

scala> 17/04/19 03:06:43 INFO scheduler.ReceiverTracker: Receiver 0 started 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Got job 0 (submitJob at ReceiverTracker.scala:557) with 1 output partitions 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Final stage: ResultStage 0(submitJob at ReceiverTracker.scala:557) 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Parents of final stage: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:43 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554), which has no missing parents 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Added jobs for time 1492596404000 ms 
17/04/19 03:06:44 INFO scheduler.JobScheduler: Starting job streaming job 1492596404000 ms.0 from job set of time 1492596404000 ms 
17/04/19 03:06:44 INFO spark.SparkContext: Starting job: print at <console>:47 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(65984) called with curMem=0, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 64.4 KB, free 534.5 MB) 
17/04/19 03:06:44 INFO storage.MemoryStore: ensureFreeSpace(22354) called with curMem=65984, maxMem=560497950 
17/04/19 03:06:44 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 21.8 KB, free 534.4 MB) 
17/04/19 03:06:44 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:41905 (size: 21.8 KB, free: 534.5 MB) 
17/04/19 03:06:44 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:44 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (Receiver 0 ParallelCollectionRDD[0] at makeRDD at ReceiverTracker.scala:554) 
17/04/19 03:06:44 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Registering RDD 3 (map at <console>:42) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Got job 1 (print at <console>:47) with 1 output partitions 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Final stage: ResultStage 2(print at <console>:47) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 1) 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Missing parents: List() 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44), which has no missing parents 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(2400) called with curMem=88338, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 2.3 KB, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.MemoryStore: ensureFreeSpace(1429) called with curMem=90738, maxMem=560497950 
17/04/19 03:06:45 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 1429.0 B, free 534.4 MB) 
17/04/19 03:06:45 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:41905 (size: 1429.0 B, free: 534.5 MB) 
17/04/19 03:06:45 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861 
17/04/19 03:06:45 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 2 (ShuffledRDD[4] at reduceByKey at <console>:44) 
17/04/19 03:06:45 INFO scheduler.TaskSchedulerImpl: Adding task set 2.0 with 1 tasks 
17/04/19 03:06:45 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, partition 0,ANY, 2644 bytes) 
17/04/19 03:06:45 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0) 
17/04/19 03:06:45 INFO util.RecurringTimer: Started timer for BlockGenerator at time 1492596405400 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started BlockGenerator 
17/04/19 03:06:45 INFO receiver.BlockGenerator: Started block pushing thread 
17/04/19 03:06:45 INFO scheduler.ReceiverTracker: Registered receiver for stream 0 from 10.0.2.15:50802 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Starting receiver 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStart 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Waiting for receiver to be stopped 
17/04/19 03:06:45 INFO dstream.SocketReceiver: Connecting to localhost:8585 
17/04/19 03:06:45 WARN receiver.ReceiverSupervisorImpl: Restarting receiver with delay 2000 ms: Error connecting to localhost:8585 
java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Stopping receiver with message: Restarting receiver with delay 2000ms: Error connecting to localhost:8585: java.net.ConnectException: Connection refused 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Called receiver onStop 
17/04/19 03:06:45 INFO receiver.ReceiverSupervisorImpl: Deregistering receiver 0 
17/04/19 03:06:45 ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0: Restarting receiver with delay 2000ms: Error connecting to localhost:8585 - java.net.ConnectException: Connection refused 
    at java.net.PlainSocketImpl.socketConnect(Native Method) 
    at java.net.AbstractPlainSocketImpl.doConnect(AbstractPlainSocketImpl.java:339) 
    at java.net.AbstractPlainSocketImpl.connectToAddress(AbstractPlainSocketImpl.java:200) 
    at java.net.AbstractPlainSocketImpl.connect(AbstractPlainSocketImpl.java:182) 
    at java.net.SocksSocketImpl.connect(SocksSocketImpl.java:392) 
    at java.net.Socket.connect(Socket.java:579) 
    at java.net.Socket.connect(Socket.java:528) 
    at java.net.Socket.<init>(Socket.java:425) 
    at java.net.Socket.<init>(Socket.java:208) 
    at org.apache.spark.streaming.dstream.SocketReceiver.receive(SocketInputDStream.scala:73) 
    at org.apache.spark.streaming.dstream.SocketReceiver$$anon$2.run(SocketInputDStream.scala:59) 

I get a different error when I change my code as shown below:

var sparkConf = new SparkConf().setAppName("Streaming Example").setMaster("local[2]").set("spark.drive.allowMultipleContexts","true") 
val ssc = new StreamingContext(sparkConf,Seconds(2)) 


Can someone help me fix this error?

17/04/19 03:18:52 INFO spark.SparkContext: Running Spark version 1.5.0-cdh5.5.0 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing view acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: Changing modify acls to: cloudera 
    17/04/19 03:18:52 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(cloudera); users with modify permissions: Set(cloudera) 
    17/04/19 03:18:53 INFO slf4j.Slf4jLogger: Slf4jLogger started 
    17/04/19 03:18:53 INFO Remoting: Starting remoting 
    17/04/19 03:18:53 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.2.15:42235] 
    17/04/19 03:18:53 INFO Remoting: Remoting now listens on addresses: [akka.tcp://sparkDriver@10.0.2.15:42235] 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'sparkDriver' on port 42235. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering MapOutputTracker 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering BlockManagerMaster 
    17/04/19 03:18:53 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-b87051bc-5b7f-4c4f-975f-a0661b3ec29f 
    17/04/19 03:18:53 INFO storage.MemoryStore: MemoryStore started with capacity 534.5 MB 
    17/04/19 03:18:53 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3c5d465-ca27-4aa0-ad43-47088abb7703/httpd-01babb12-0237-4faa-9917-394a768cbcaa 
    17/04/19 03:18:53 INFO spark.HttpServer: Starting HTTP Server 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:52313 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'HTTP file server' on port 52313. 
    17/04/19 03:18:53 INFO spark.SparkEnv: Registering OutputCommitCoordinator 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED SelectChannelConnector@0.0.0.0:4040: java.net.BindException: Address already in use 
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 WARN component.AbstractLifeCycle: FAILED org.spark-project.jetty.server.Server@…: java.net.BindException: Address already in use 
    java.net.BindException: Address already in use 
     at sun.nio.ch.Net.bind0(Native Method) 
     at sun.nio.ch.Net.bind(Net.java:444) 
     at sun.nio.ch.Net.bind(Net.java:436) 
     at sun.nio.ch.ServerSocketChannelImpl.bind(ServerSocketChannelImpl.java:214) 
     at sun.nio.ch.ServerSocketAdaptor.bind(ServerSocketAdaptor.java:74) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.open(SelectChannelConnector.java:187) 
     at org.spark-project.jetty.server.AbstractConnector.doStart(AbstractConnector.java:316) 
     at org.spark-project.jetty.server.nio.SelectChannelConnector.doStart(SelectChannelConnector.java:265) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.spark-project.jetty.server.Server.doStart(Server.java:293) 
     at org.spark-project.jetty.util.component.AbstractLifeCycle.start(AbstractLifeCycle.java:64) 
     at org.apache.spark.ui.JettyUtils$.org$apache$spark$ui$JettyUtils$$connect$1(JettyUtils.scala:236) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.ui.JettyUtils$$anonfun$3.apply(JettyUtils.scala:246) 
     at org.apache.spark.util.Utils$$anonfun$startServiceOnPort$1.apply$mcVI$sp(Utils.scala:1913) 
     at scala.collection.immutable.Range.foreach$mVc$sp(Range.scala:141) 
     at org.apache.spark.util.Utils$.startServiceOnPort(Utils.scala:1904) 
     at org.apache.spark.ui.JettyUtils$.startJettyServer(JettyUtils.scala:246) 
     at org.apache.spark.ui.WebUI.bind(WebUI.scala:136) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at org.apache.spark.SparkContext$$anonfun$13.apply(SparkContext.scala:474) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.SparkContext.<init>(SparkContext.scala:474) 
     at org.apache.spark.streaming.StreamingContext$.createNewSparkContext(StreamingContext.scala:854) 
     at org.apache.spark.streaming.StreamingContext.<init>(StreamingContext.scala:81) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:25) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:30) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:32) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:34) 
     at $line31.$read$$iwC$$iwC$$iwC$$iwC.<init>(<console>:36) 
     at $line31.$read$$iwC$$iwC$$iwC.<init>(<console>:38) 
     at $line31.$read$$iwC$$iwC.<init>(<console>:40) 
     at $line31.$read$$iwC.<init>(<console>:42) 
     at $line31.$read.<init>(<console>:44) 
     at $line31.$read$.<init>(<console>:48) 
     at $line31.$read$.<clinit>(<console>) 
     at $line31.$eval$.<init>(<console>:7) 
     at $line31.$eval$.<clinit>(<console>) 
     at $line31.$eval.$print(<console>) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
     at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
     at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
     at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
     at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
     at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
     at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
     at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
     at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
     at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
     at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
     at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
     at org.apache.spark.repl.Main$.main(Main.scala:31) 
     at org.apache.spark.repl.Main.main(Main.scala) 
     at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
     at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
     at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
     at java.lang.reflect.Method.invoke(Method.java:606) 
     at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
     at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
     at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
     at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
     at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null} 
    17/04/19 03:18:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null} 
    17/04/19 03:18:53 WARN util.Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041. 
    17/04/19 03:18:53 INFO server.Server: jetty-8.y.z-SNAPSHOT 
    17/04/19 03:18:53 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4041 
    17/04/19 03:18:53 INFO util.Utils: Successfully started service 'SparkUI' on port 4041. 
    17/04/19 03:18:53 INFO ui.SparkUI: Started SparkUI at http://localhost:4041 
    17/04/19 03:18:53 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set. 
    17/04/19 03:18:53 INFO storage.BlockManagerMaster: Registered BlockManager 
org.apache.spark.SparkException: Only one SparkContext may be running in this JVM (see SPARK-2243). To ignore this error, set spark.driver.allowMultipleContexts = true. The currently running SparkContext was created at: 
org.apache.spark.SparkContext.<init>(SparkContext.scala:82) 
org.apache.spark.repl.SparkILoop.createSparkContext(SparkILoop.scala:1017) 


Do you actually have a socket open on port 8585? –

Answer


Approach 1

The error you are seeing is expected, because you used socketTextStream(). Spark therefore creates an instance of SocketInputDStream, which uses java.net.Socket.

And java.net.Socket is a client socket, which means it expects a server to already be running at the address and port you specify.

So you need some service listening on port 8585 of your local machine.
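Any minimal line-serving TCP server will do. The following is a sketch of my own, not part of the original answer (the object name TestServer and the messages are made up; only port 8585 comes from the question). Each line it writes becomes one record in the DStream:

import java.io.PrintWriter
import java.net.ServerSocket

// Hypothetical test helper: a tiny server for socketTextStream to connect to.
object TestServer {
  def main(args: Array[String]): Unit = {
    val server = new ServerSocket(8585)          // listen where the receiver dials in
    println("Listening on port 8585 ...")
    val client = server.accept()                 // block until Spark's receiver connects
    val out = new PrintWriter(client.getOutputStream, true)
    for (i <- 1 to 100) {                        // emit one line per second
      out.println(s"hello streaming world $i")
      Thread.sleep(1000)
    }
    client.close()
    server.close()
  }
}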

To see what I mean, try the following (you may not need to set the master or app name in your environment):

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

object MyStream {
  def main(args: Array[String]): Unit = {
    // Use at least two local threads: one for the receiver, one for processing.
    val conf = new SparkConf().setMaster("local[2]").setAppName("socketstream")
    val ssc = new StreamingContext(conf, Seconds(10))
    val mystreamRDD = ssc.socketTextStream("bbc.co.uk", 80)
    mystreamRDD.print()
    ssc.start()
    ssc.awaitTermination()
  }
}

This returns no content, because the app does not speak HTTP to the BBC site, but it does not get a connection-refused exception.

To run a local server on Linux, you can use netcat with a simple command such as:

cat data.txt | ncat -l -p 8585 

If the above code gives the same error, then follow Approach 2.

Approach 2

Any of the following could cause the error (a quick connectivity check is sketched after the list):

  • You are trying to connect to the wrong IP/port.
  • You have not started your server.
  • Your server is not listening for connections.
  • Your server has too many pending connections waiting to be accepted.
  • A firewall is blocking your connection before it reaches your server.
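You can check the first three causes independently of Spark with a plain client socket from the REPL. Since socketTextStream opens a java.net.Socket internally, the receiver will fail exactly when this fails (a quick check of my own, assuming the question's localhost:8585 endpoint):

import java.net.Socket
import scala.util.{Failure, Success, Try}

// Pre-flight check: can we reach the server the receiver will dial?
Try(new Socket("localhost", 8585)) match {
  case Success(s) =>
    println("Server is reachable on localhost:8585")
    s.close()
  case Failure(e) =>
    // The same ConnectException the receiver logs will show up here.
    println(s"Cannot connect: ${e.getMessage}")
}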

Hope this helps.


Just to add: the second error is a BindException for the SparkUI. It looks like the property name is misspelled. It is spark.drive.allowMultipleContexts; it should be spark.driver.allowMultipleContexts – sparker
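For reference, a corrected version of that line would look like this (a sketch; only the property name changes, the rest is taken from the question):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("Streaming Example")
  .setMaster("local[2]")
  .set("spark.driver.allowMultipleContexts", "true")  // driver, not drive

Note that the spark-shell already provides a SparkContext named sc, so building the StreamingContext from sc, as in the first snippet, avoids creating a second context altogether.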


Thanks! :) It works fine now. – Swa
