2016-12-13 1 views
13

Ich benutze Spark 1.5.Scala Spark: DataFrames + beitreten: java.util.concurrent.TimeoutException: Futures abgelaufen nach [300 Sekunden]

Ich habe zwei Datenrahmen von der Form:

scala> libriFirstTable50Plus3DF 
res1: org.apache.spark.sql.DataFrame = [basket_id: string, family_id: int] 

scala> linkPersonItemLessThan500DF 
res2: org.apache.spark.sql.DataFrame = [person_id: int, family_id: int] 

libriFirstTable50Plus3DF hat 766.151 Datensätze während linkPersonItemLessThan500DF hat 26.694.353 Datensätze. Beachten Sie, dass ich repartition(number) auf linkPersonItemLessThan500DF verwende, da ich beabsichtige, diese beiden später zu verbinden. Ich folge den obigen Code oben mit:

val userTripletRankDF = linkPersonItemLessThan500DF 
    .join(libriFirstTable50Plus3DF, Seq("family_id")) 
    .take(20) 
    .foreach(println(_)) 

, für die ich erhalte diese Ausgabe:

16/12/13 15:07:10 INFO scheduler.TaskSetManager: Finished task 172.0 in stage 3.0 (TID 473) in 520 ms on mlhdd01.mondadori.it (199/200) 
java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107) 
at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:  at scala.concurrent.Await$.result(package.scala:107) 
at org.apache.spark.sql.execution.joins.BroadcastHashJoin.doExecute(BroadcastHashJoin.scala:110) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.TungstenProject.doExecute(basicOperators.scala:86) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.ConvertToSafe.doExecute(rowFormatConverters.scala:63) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:140) 
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:138) 
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:147) 
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:138) 
at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:190) 
at org.apache.spark.sql.execution.Limit.executeCollect(basicOperators.scala:207) 
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
at org.apache.spark.sql.DataFrame$$anonfun$collect$1.apply(DataFrame.scala:1386) 
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:56) 
at org.apache.spark.sql.DataFrame.withNewExecutionId(DataFrame.scala:1904) 
at org.apache.spark.sql.DataFrame.collect(DataFrame.scala:1385) 
at org.apache.spark.sql.DataFrame.head(DataFrame.scala:1315) 
at org.apache.spark.sql.DataFrame.take(DataFrame.scala:1378) 
at org.apache.spark.sql.DataFrame.showString(DataFrame.scala:178) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:402) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:363) 
at org.apache.spark.sql.DataFrame.show(DataFrame.scala:371) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:72) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:77) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:79) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:81) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:83) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:85) 
at $iwC$$iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:87) 
at $iwC$$iwC$$iwC$$iwC$$iwC.<init>(<console>:89) 
at $iwC$$iwC$$iwC$$iwC.<init>(<console>:91) 
at $iwC$$iwC$$iwC.<init>(<console>:93) 
at $iwC$$iwC.<init>(<console>:95) 
at $iwC.<init>(<console>:97) 
at <init>(<console>:99) 
at .<init>(<console>:103) 
at .<clinit>(<console>) 
at .<init>(<console>:7) 
at .<clinit>(<console>) 
at $print(<console>) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.repl.SparkIMain$ReadEvalPrint.call(SparkIMain.scala:1065) 
at org.apache.spark.repl.SparkIMain$Request.loadAndRun(SparkIMain.scala:1340) 
at org.apache.spark.repl.SparkIMain.loadAndRunReq$1(SparkIMain.scala:840) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:871) 
at org.apache.spark.repl.SparkIMain.interpret(SparkIMain.scala:819) 
at org.apache.spark.repl.SparkILoop.reallyInterpret$1(SparkILoop.scala:857) 
at org.apache.spark.repl.SparkILoop.interpretStartingWith(SparkILoop.scala:902) 
at org.apache.spark.repl.SparkILoop.command(SparkILoop.scala:814) 
at org.apache.spark.repl.SparkILoop.processLine$1(SparkILoop.scala:657) 
at org.apache.spark.repl.SparkILoop.innerLoop$1(SparkILoop.scala:665) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$loop(SparkILoop.scala:670) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply$mcZ$sp(SparkILoop.scala:997) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop$$anonfun$org$apache$spark$repl$SparkILoop$$process$1.apply(SparkILoop.scala:945) 
at scala.tools.nsc.util.ScalaClassLoader$.savingContextLoader(ScalaClassLoader.scala:135) 
at org.apache.spark.repl.SparkILoop.org$apache$spark$repl$SparkILoop$$process(SparkILoop.scala:945) 
at org.apache.spark.repl.SparkILoop.process(SparkILoop.scala:1059) 
at org.apache.spark.repl.Main$.main(Main.scala:31) 
at org.apache.spark.repl.Main.main(Main.scala) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:606) 
at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:672) 
at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180) 
at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:120) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 

und ich verstehe nicht, was das Problem ist. Ist es so einfach wie die Wartezeit zu erhöhen? Ist der Join zu intensiv? Brauche ich mehr Speicher? Ist das Shuffling intensiv? Kann jemand helfen?

+0

lassen Sie mich an eine meiner Antwort erwähnen sehr ähnliche Frage: https://Stackoverflow.com/a/48449467/418293 – mathieu

Antwort

16

Dies passiert, weil Spark Broadcast Hash Join zu tun versucht und einer der DataFrames ist sehr groß, so dass das Senden viel Zeit kostet.

Sie können:

  1. Set höher spark.sql.broadcastTimeout Timeout erhöhen - spark.conf.set("spark.sql.broadcastTimeout", newValueForExample36000)
  2. persist() beide Datenrahmen, dann Funken Shuffle Join - Referenz von here
+0

Nice Dank - lesen Sie ein paar Blogs auf Fehlerbehebung Spark für hohe Arbeitsauslastung und ich denke, ich werde dazu in der Lage sein rt es bis morgen raus. Eine Sache jedoch: Können Sie Ihre Antwort verbessern, indem Sie erklären, wie ich eine eigenständige Funke-Anwendung konfigurieren kann, um 'spark.sql.broadcastTimeout' zu ändern? Mit anderen Worten, was füge ich zu diesem: 'val conf = new SparkConf() setAppName (APP_NAME)' ' val sc = new SparkContext (conf)' .? –

+0

@ ΧρίστοΧΧατζηνικολή Ich habe hinzugefügt, wie man Wert setzt :) –

Verwandte Themen