2017-01-19 3 views
0

Ich versuche, einige seltsame Verbindungsprobleme auf meinem Funkengitter zu diagnostizieren: Ich sehe eine wahnsinnige Anzahl von unterbrochenen Verbindung.Pyspark alle Verbindungen fallen lassen

ich etwas läuft, die

spark_context.parallelize(tasks)) \ 
       .map(lambda kwargs: my_mapped_fn(**kwargs) \ 
       .reduceByKey(my_reduce_by_key) \ 
       .map(lambda (x,y): (x, my_final_map(x,y))) \ 
       .reduce(my_final_reduce) 

auf einem verteilten pyspark Cluster wie folgt aussieht bin ich ziemlich sicher, dass es während des my_final_map Teil ausfällt, daher meine Verdacht über den Transport Schließung, so viele, dass meine Jobs Scheitern.

Hier ist der Fehler i erhalten:

java.io.IOException: Fehler beim 10.12.9.117:38103 bei org.apache.spark.network.client.TransportClientFactory.createClient (TransportClientFactory.java verbinden: 228) bei org.apache.spark.network.client.TransportClientFactory.createClient (TransportClientFactory.java:179) bei org.apache.spark.network.netty.NettyBlockTransferService $$ anon $ 1.createAndStart (NettyBlockTransferService.scala: 97) bei org.apache.spark.network.shuffle.RetryingBlockFetcher.fetchAllOutstanding (RetryingBlockFetcher.java:140) bei org.apache.spark.network.shuffle.RetryingBlockFetcher.start (Erneut versuchen BlockFetcher.java:120) bei org.apache.spark.network.netty.NettyBlockTransferService.fetchBlocks (NettyBlockTransferService.scala: 106) bei org.apache.spark.network.BlockTransferService.fetchBlockSync (BlockTransferService.scala: 92) bei org.apache.spark.storage.BlockManager.getRemoteBytes (BlockManager.scala: 579) bei org.apache.spark.scheduler.TaskResultGetter $$ anon $ 3 $$ anonfun $ run $ 1.apply $ mcV $ sp (TaskResultGetter.scala: 82) bei org.apache.spark.scheduler.TaskResultGetter $$ anon $ 3 $$ anonfun $ run $ 1.apply (TaskResultGetter.scala: 63) bei org.apache.spark.scheduler.TaskResultGetter $$ anon $ 3 $$ anonfun $ run $ 1.apply (TaskResultGetter.scala: 63) bei org.apache.spark.util.Utils $ .logUncaughtExceptions (Utils.scala: 1951) bei org.apache.spark.scheduler.TaskResultG etter $$ anon $ 3.run (TaskResultGetter.scala: 62) bei java.util.concurrent.ThreadPoolExecutor.runWorker (ThreadPoolExecutor.java:1145) bei java.util.concurrent.ThreadPoolExecutor $ Worker.run (ThreadPoolExecutor.java: 615) bei java.lang.Thread.run (Thread.java:745) verursacht durch: io.netty.channel.AbstractChannel $ AnnotatedConnectException: Verbindung abgelehnt: 10.12.9.117:38103 at sun.nio.ch.SocketChannelImpl. checkConnect (native Methode) bei sun.nio.ch.SocketChannelImpl.finishConnect (SocketChannelImpl.java:744) bei io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect (NioSocketChannel.java:257) bei io.netty .channel.nio.AbstractNioChannel $ AbstractNioUnsafe.finishConnect (AbstractNioChannel.java:291) bei io.netty.ch annel.nio.NioEventLoop.processSelectedKey (NioEventLoop.java:640) bei io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized (NioEventLoop.java:575) bei io.netty.channel.nio.NioEventLoop.processSelectedKeys (NioEventLoop. java: 489) bei io.netty.channel.nio.NioEventLoop.run (NioEventLoop.java:451) bei io.netty.util.concurrent.SingleThreadEventExecutor $ 2.run (SingleThreadEventExecutor.java:140) bei io.netty .util.concurrent.DefaultThreadFactory $ DefaultRunnableDecorator.run (DefaultThreadFactory.java:144) ... 1 mehr

+0

Warum führen Sie "collect()" am Ende des Anrufs aus? – Yaron

+1

teilen Sie bitte den Fehler, den Sie erhalten, die Menge der Daten, die Sie versuchen zu verarbeiten, die Größe Ihres Clusters. Die Quelle des Problems ** könnte ** das 'collect()' sein, das eine große Menge an Daten an den Treiber sendet. – Yaron

+0

Hey @Yaron, danke für deine Hilfe! Ich habe es überprüft, ich habe den Collect auf einen Reduzierungsschritt geschaltet und die lokale Variable entfernt. Immer noch die Fehler gedacht :( –

Antwort

0

Falls jemand das nützlich findet, war die eigentliche Antwort völlig unabhängig von Funke. In der Tat wurde die IP-Adresssuche auf einigen der Knoten unterbrochen.

Verwandte Themen