-1

I am trying to join DataFrames in Spark with PySpark. The two DataFrames are quite large (one of them is more than 5 GB), and I get the error below. Why does this Py4JJavaError on showString occur when joining Spark DataFrames with PySpark?

--------------------------------------------------------------------------- 
Py4JJavaError        Traceback (most recent call last) 
<ipython-input-37-d940918c3fe6> in <module>() 
     1 train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('type', 'locale', 'locale_name', 'description', 'transferred') 
----> 2 train_holiday_oil_store_transaction_item_test_004.show() 

/usr/local/opt/apache-spark/libexec/python/pyspark/sql/dataframe.py in show(self, n, truncate) 
    334   """ 
    335   if isinstance(truncate, bool) and truncate: 
--> 336    print(self._jdf.showString(n, 20)) 
    337   else: 
    338    print(self._jdf.showString(n, int(truncate))) 

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args) 
    1131   answer = self.gateway_client.send_command(command) 
    1132   return_value = get_return_value(
-> 1133    answer, self.gateway_client, self.target_id, self.name) 
    1134 
    1135   for temp_arg in temp_args: 

/usr/local/opt/apache-spark/libexec/python/pyspark/sql/utils.py in deco(*a, **kw) 
    61  def deco(*a, **kw): 
    62   try: 
---> 63    return f(*a, **kw) 
    64   except py4j.protocol.Py4JJavaError as e: 
    65    s = e.java_exception.toString() 

/usr/local/opt/apache-spark/libexec/python/lib/py4j-0.10.4-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name) 
    317     raise Py4JJavaError(
    318      "An error occurred while calling {0}{1}{2}.\n". 
--> 319      format(target_id, ".", name), value) 
    320    else: 
    321     raise Py4JError(

Py4JJavaError: An error occurred while calling o873.showString. 
: java.util.concurrent.TimeoutException: Futures timed out after [300 seconds] 
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219) 
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223) 
    at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:201) 
    at org.apache.spark.sql.execution.exchange.BroadcastExchangeExec.doExecuteBroadcast(BroadcastExchangeExec.scala:123) 
    at org.apache.spark.sql.execution.InputAdapter.doExecuteBroadcast(WholeStageCodegenExec.scala:248) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeBroadcast$1.apply(SparkPlan.scala:127) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.SparkPlan.executeBroadcast(SparkPlan.scala:126) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.prepareBroadcast(BroadcastHashJoinExec.scala:98) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.codegenOuter(BroadcastHashJoinExec.scala:242) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doConsume(BroadcastHashJoinExec.scala:83) 
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155) 
    at org.apache.spark.sql.execution.ProjectExec.consume(basicPhysicalOperators.scala:36) 
    at org.apache.spark.sql.execution.ProjectExec.doConsume(basicPhysicalOperators.scala:68) 
    at org.apache.spark.sql.execution.CodegenSupport$class.consume(WholeStageCodegenExec.scala:155) 
    at org.apache.spark.sql.execution.InputAdapter.consume(WholeStageCodegenExec.scala:235) 
    at org.apache.spark.sql.execution.InputAdapter.doProduce(WholeStageCodegenExec.scala:263) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.InputAdapter.produce(WholeStageCodegenExec.scala:235) 
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.doProduce(BroadcastHashJoinExec.scala:77) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.joins.BroadcastHashJoinExec.produce(BroadcastHashJoinExec.scala:38) 
    at org.apache.spark.sql.execution.ProjectExec.doProduce(basicPhysicalOperators.scala:46) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:85) 
    at org.apache.spark.sql.execution.CodegenSupport$$anonfun$produce$1.apply(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.CodegenSupport$class.produce(WholeStageCodegenExec.scala:80) 
    at org.apache.spark.sql.execution.ProjectExec.produce(basicPhysicalOperators.scala:36) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doCodeGen(WholeStageCodegenExec.scala:331) 
    at org.apache.spark.sql.execution.WholeStageCodegenExec.doExecute(WholeStageCodegenExec.scala:372) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:117) 
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:138) 
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) 
    at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:135) 
    at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:116) 
    at org.apache.spark.sql.execution.SparkPlan.getByteArrayRdd(SparkPlan.scala:228) 
    at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:311) 
    at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:38) 
    at org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collectFromPlan(Dataset.scala:2853) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) 
    at org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:2153) 
    at org.apache.spark.sql.Dataset$$anonfun$55.apply(Dataset.scala:2837) 
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:65) 
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:2836) 
    at org.apache.spark.sql.Dataset.head(Dataset.scala:2153) 
    at org.apache.spark.sql.Dataset.take(Dataset.scala:2366) 
    at org.apache.spark.sql.Dataset.showString(Dataset.scala:245) 
    at sun.reflect.GeneratedMethodAccessor63.invoke(Unknown Source) 
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
    at java.lang.reflect.Method.invoke(Method.java:498) 
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) 
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) 
    at py4j.Gateway.invoke(Gateway.java:280) 
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) 
    at py4j.commands.CallCommand.execute(CallCommand.java:79) 
    at py4j.GatewayConnection.run(GatewayConnection.java:214) 
    at java.lang.Thread.run(Thread.java:748) 

And here is my code:

train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.join(stores_df, 'store_nbr', 'left_outer') 
train_holiday_oil_store_transaction_item_test_004 = train_holiday_oil_store_transaction_item_test_004.drop('city', 'state', 'store_type', 'cluster') 
train_holiday_oil_store_transaction_item_test_004.show() 

What is going wrong here? What is a possible solution?

I have already increased the number of partitions to 500, so that should not be the problem.
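
(In case it matters, this is roughly how the partition count can be set in PySpark; the DataFrame name and join key below are just placeholders, not my exact code:)

    # Sketch only: two common ways to raise the partition count to 500
    df = df.repartition(500, "store_nbr")                  # repartition one DataFrame by the join key
    spark.conf.set("spark.sql.shuffle.partitions", "500")  # or raise the shuffle partition count globally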

I am also wondering what the typical ways are to join large DataFrames when using PySpark. Does anyone have experience with this kind of problem?

+1

Please read & act on [MCVE]. Also [ask] - what did googling give you? – philipxy

+0

I have spent weeks on this problem; of course I googled before asking. Most answers mention increasing a timeout or something similar, but I am quite new to Spark and have no idea how to do that, since the official documentation is not clear. Also, while I am learning Spark on my own, I would like to know how people deal with big data in their real jobs, so what is wrong with asking that? –

+0

If you read [ask], you are asked to summarize your research, so please do that. (So "of course" is irrelevant. Also, the fact is that most unclear/non-MCVE/poor questions, when googled, turn out to have been researched just as poorly.) Please edit clarifications into your question, do not add them as comments. Still [mcve], please do what it says. In particular: *complete* code, input, output, and expected output. And what happens with a smaller test input? Please also ask your further questions in a separate post. – philipxy

Answer

1

If you look at your error message, you will see that Spark invokes a BroadcastHashJoin. Since the DataFrame is large, broadcasting it leads to a timeout. A few solutions for this problem:

  1. Increase spark.sql.broadcastTimeout (see the sketch after the code below)
  2. Force Spark to use a ShuffleHashJoin by setting spark.sql.autoBroadcastJoinThreshold = -1 (also shown below)
  3. Use the same partitioning on both DataFrames. For example, if you have two DataFrames and want to join them on the id column, you should repartition both of them by the id column:

    df1 = df1.repartition("id")
    df2 = df2.repartition("id")
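
For options 1 and 2, here is a minimal sketch of how these settings can be applied from PySpark (the timeout value of 36000 seconds is only an example, not a recommendation):

    # Sketch: applying options 1 and 2 from PySpark (values are examples only)
    spark.conf.set("spark.sql.broadcastTimeout", "36000")          # option 1: raise the broadcast timeout (seconds)
    spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")   # option 2: disable automatic broadcast joins

    # After repartitioning both DataFrames by the join key (option 3), the join itself is unchanged:
    joined = df1.join(df2, "id", "left_outer")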
