Spark Streaming 1.6.0 - Executors Bouncing

2016-03-07

We are running Spark Streaming 1.6.0 on AWS EMR 4.3.x, consuming data from a Kinesis stream. The same job used to work correctly on Spark 1.3.1; since the migration we cannot withstand the load for long. Ganglia shows the cluster's used memory growing until it hits a limit, with no GC bringing it back down. After that come several really long micro-batches (tens of minutes instead of a few seconds), and then Spark starts killing and bouncing executors (over and over again).

Basically, the cluster becomes unusable. The problem is reproducible under load, from time to time. What could be the reason for Spark failing to GC without killing executors? How can we keep the cluster running for weeks (currently it cannot even run for hours)?

Any input is welcome.

We use the following settings when defining the job:

sparkConf.set("spark.shuffle.consolidateFiles", "true"); 
sparkConf.set("spark.storage.memoryFraction", "0.5"); 
sparkConf.set("spark.streaming.backpressure.enabled", "true"); 
sparkConf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer"); 
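One thing worth noting after the 1.3.1 → 1.6.0 migration: `spark.storage.memoryFraction` is a legacy setting. Spark 1.6 switched to the unified memory manager, which ignores it unless legacy mode is enabled. If the goal is to reproduce the old memory behaviour, something along these lines might be worth trying (a sketch; whether it helps in this case is an assumption):

```java
// Option A: opt back into the pre-1.6 memory manager, so the old fraction applies again
sparkConf.set("spark.memory.useLegacyMode", "true");
sparkConf.set("spark.storage.memoryFraction", "0.5");

// Option B: stay on the 1.6 unified manager and tune its knobs instead
// (1.6 defaults: spark.memory.fraction = 0.75, spark.memory.storageFraction = 0.5)
sparkConf.set("spark.memory.fraction", "0.75");
sparkConf.set("spark.memory.storageFraction", "0.5");
```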

We do a union of streams created with:

KinesisUtils.createStream(streamingContext, appName, 
         kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream, checkpointInterval, 
         StorageLevel.MEMORY_AND_DISK_2()); 
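For context, the union looks roughly like this: one receiver (and therefore one DStream) per Kinesis shard, all unioned into a single stream. This is a simplified sketch; `numShards` and the variable names are placeholders for our real configuration, not the actual code:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.spark.storage.StorageLevel;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.kinesis.KinesisUtils;

// One KinesisUtils.createStream call per shard...
List<JavaDStream<byte[]>> streams = new ArrayList<>();
for (int i = 0; i < numShards; i++) {
    streams.add(KinesisUtils.createStream(streamingContext, appName,
            kinesisStreamName, kinesisEndpoint, awsRegionName, initialPositionInStream,
            checkpointInterval, StorageLevel.MEMORY_AND_DISK_2()));
}
// ...then union them into the single stream the rest of the job consumes.
JavaDStream<byte[]> eventStream = streamingContext.union(
        streams.get(0), streams.subList(1, streams.size()));
```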

I have stripped our application down to a bare skeleton for these tests: map the byte stream to a String stream, convert to objects, filter out irrelevant events, then persist and save to S3.

eventStream = eventStream.persist(StorageLevel.MEMORY_AND_DISK_SER_2());

eventStream = eventStream.repartition(configuration.getSparkOutputPartitions());
eventStream.foreachRDD(new RddByPartitionSaverFunction<>(new OutputToS3Function()));
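The two classes in that last line are ours; stripped to their essence they behave roughly like the following (a hypothetical sketch with made-up helper names, not the real implementation):

```java
// Hypothetical outline of RddByPartitionSaverFunction + OutputToS3Function:
// for every micro-batch, write each partition out as one S3 object.
eventStream.foreachRDD(rdd ->
    rdd.foreachPartition(events -> {
        StringBuilder body = new StringBuilder();
        while (events.hasNext()) {
            body.append(events.next()).append('\n');
        }
        // putToS3/newObjectKey are stand-ins for whatever S3 client call
        // the real OutputToS3Function makes
        putToS3(bucketName, newObjectKey(), body.toString());
    })
);
```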

The Spark job is submitted with the following configuration (copied from the default Spark config, with memory-size modifications):

spark.executor.extraJavaOptions -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=256M -XX:MaxPermSize=256M 
spark.driver.extraJavaOptions -XX:+UseConcMarkSweepGC -XX:CMSInitiatingOccupancyFraction=70 -XX:MaxHeapFreeRatio=70 -XX:+CMSClassUnloadingEnabled -XX:OnOutOfMemoryError='kill -9 %p' -XX:PermSize=512M -XX:MaxPermSize=512M 

Adding the exceptions. First cluster:

16/03/06 13:54:52 WARN BlockManagerMaster: Failed to remove broadcast 1327 with removeFromMaster = true - Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
org.apache.spark.rpc.RpcTimeoutException: Cannot receive any reply in 120 seconds. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at scala.util.Failure$$anonfun$recover$1.apply(Try.scala:185) 
     at scala.util.Try$.apply(Try.scala:161) 
     at scala.util.Failure.recover(Try.scala:185) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.Future$$anonfun$recover$1.apply(Future.scala:324) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at org.spark-project.guava.util.concurrent.MoreExecutors$SameThreadExecutorService.execute(MoreExecutors.java:293) 
     at scala.concurrent.impl.ExecutionContextImpl$$anon$1.execute(ExecutionContextImpl.scala:133) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.complete(Promise.scala:55) 
     at scala.concurrent.impl.Promise$DefaultPromise.complete(Promise.scala:153) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.Future$$anonfun$map$1.apply(Future.scala:235) 
     at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:32) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.processBatch$1(Future.scala:643) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply$mcV$sp(Future.scala:658) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch$$anonfun$run$1.apply(Future.scala:635) 
     at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:72) 
     at scala.concurrent.Future$InternalCallbackExecutor$Batch.run(Future.scala:634) 
     at scala.concurrent.Future$InternalCallbackExecutor$.scala$concurrent$Future$InternalCallbackExecutor$$unbatchedExecute(Future.scala:694) 
     at scala.concurrent.Future$InternalCallbackExecutor$.execute(Future.scala:685) 
     at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:40) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:248) 
     at scala.concurrent.Promise$class.tryFailure(Promise.scala:112) 
     at scala.concurrent.impl.Promise$DefaultPromise.tryFailure(Promise.scala:153) 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:241) 
     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) 
     at java.util.concurrent.FutureTask.run(FutureTask.java:262) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:178) 
     at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:292) 
     at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     at java.lang.Thread.run(Thread.java:745) 
Caused by: java.util.concurrent.TimeoutException: Cannot receive any reply in 120 seconds 
     at org.apache.spark.rpc.netty.NettyRpcEnv$$anon$1.run(NettyRpcEnv.scala:242) 
     ... 7 more 
16/03/06 13:54:52 ERROR ContextCleaner: Error cleaning broadcast 1327 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

16/03/06 13:55:04 ERROR YarnClusterScheduler: Lost executor 6 on ip-***-194.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
16/03/06 13:55:10 ERROR YarnClusterScheduler: Lost executor 1 on ip-***-193.ec2.internal: Container killed by YARN for exceeding memory limits. 11.3 GB of 11.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead. 
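YARN is killing the containers for exceeding their physical memory limit (11.3 GB), so one obvious stopgap, per the log's own hint, is to reserve more off-heap overhead per executor and leave more headroom under the container limit. A sketch of the submit-time settings (the values are guesses, not something we have verified):

```
spark.yarn.executor.memoryOverhead  2048   # MB outside the JVM heap; default is max(384, executorMemory * 0.10)
spark.executor.memory               9g     # smaller heap = more headroom under the 11.3 GB container cap
```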

Second cluster attempt:

16/03/07 14:24:38 ERROR server.TransportChannelHandler: Connection to ip-***-22.ec2.internal/N.N.N.22:40791 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network.timeout if this is wrong. 
16/03/07 14:24:38 ERROR client.TransportResponseHandler: Still have 12 requests outstanding when connection from ip-***-22.ec2.internal/N.N.N.22:40791 is closed 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-47-1457357970366 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
16/03/07 14:24:38 ERROR netty.NettyBlockTransferService: Error while uploading block input-15-1457357969730 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
java.io.IOException: Connection from ip-***-22.ec2.internal/N.N.N.22:40791 closed 
     at org.apache.spark.network.client.TransportResponseHandler.channelUnregistered(TransportResponseHandler.java:124) 
     at org.apache.spark.network.server.TransportChannelHandler.channelUnregistered(TransportChannelHandler.java:94) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.ChannelInboundHandlerAdapter.channelUnregistered(ChannelInboundHandlerAdapter.java:53) 
     at io.netty.channel.AbstractChannelHandlerContext.invokeChannelUnregistered(AbstractChannelHandlerContext.java:158) 
     at io.netty.channel.AbstractChannelHandlerContext.fireChannelUnregistered(AbstractChannelHandlerContext.java:144) 
     at io.netty.channel.DefaultChannelPipeline.fireChannelUnregistered(DefaultChannelPipeline.java:739) 
     at io.netty.channel.AbstractChannel$AbstractUnsafe$8.run(AbstractChannel.java:659) 
     at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357) 
     at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357) 
     at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111) 
     at java.lang.Thread.run(Thread.java:745) 
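The second cluster's errors all point at `spark.network.timeout` (120 s by default), which also serves as the default for `spark.rpc.askTimeout` when that is not set separately. Raising it would probably only mask the long GC pauses rather than fix them, but as a stopgap one could try something like (an untested value):

```
spark.network.timeout  600s   # default 120s; also the default for spark.rpc.askTimeout
```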

Many thanks in advance...

Comments:

Have you found a solution or the cause of the problem? –

Hi. No, we have not found a solution yet. The problem is only reproducible with a large number of Kinesis shards (120); only then do the executors start bouncing. Any ideas? Thanks – visitor

The executor bouncing stopped in Spark 2.0.0 (EMR 5.0.0). A new problem has appeared that prevents the same application from running for long: http://stackoverflow.com/questions/39289345/spark-streaming-2-0-0-freezes-after-several-days-under-load – visitor

Answer

I got something along the same lines: whenever a micro-batch took more than 120 seconds to complete, it fired:

16/03/14 22:57:30 INFO SparkStreaming$: Batch size: 2500, total read: 4287800 
16/03/14 22:57:35 INFO SparkStreaming$: Batch size: 2500, total read: 4290300 
16/03/14 22:57:42 INFO SparkStreaming$: Batch size: 2500, total read: 4292800 
16/03/14 22:57:45 INFO SparkStreaming$: Batch size: 2500, total read: 4295300 
16/03/14 22:59:45 ERROR ContextCleaner: Error cleaning broadcast 11251 
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [120 seconds]. This timeout is controlled by spark.rpc.askTimeout 
     at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63) 
     at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59) 
     at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76) 
     at org.apache.spark.storage.BlockManagerMaster.removeBroadcast(BlockManagerMaster.scala:136) 
     at org.apache.spark.broadcast.TorrentBroadcast$.unpersist(TorrentBroadcast.scala:228) 
     at org.apache.spark.broadcast.TorrentBroadcastFactory.unbroadcast(TorrentBroadcastFactory.scala:45) 
     at org.apache.spark.broadcast.BroadcastManager.unbroadcast(BroadcastManager.scala:67) 
     at org.apache.spark.ContextCleaner.doCleanupBroadcast(ContextCleaner.scala:233) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:189) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1$$anonfun$apply$mcV$sp$2.apply(ContextCleaner.scala:180) 
     at scala.Option.foreach(Option.scala:236) 
     at org.apache.spark.ContextCleaner$$anonfun$org$apache$spark$ContextCleaner$$keepCleaning$1.apply$mcV$sp(ContextCleaner.scala:180) 
     at org.apache.spark.util.Utils$.tryOrStopSparkContext(Utils.scala:1180) 
     at org.apache.spark.ContextCleaner.org$apache$spark$ContextCleaner$$keepCleaning(ContextCleaner.scala:173) 
     at org.apache.spark.ContextCleaner$$anon$3.run(ContextCleaner.scala:68) 
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [120 seconds] 
     at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
     at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
     at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
     at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53) 
     at scala.concurrent.Await$.result(package.scala:107) 
     at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75) 
     ... 12 more 

I'm running in local mode and consuming from Kinesis. I'm also not using any broadcast variables.