2017-04-07 6 views
1

Ich Frage eine zwischengespeicherte Hive-Temp-Tabelle mit verschiedenen Abfragen, die verschiedenen Bedingungen mehr als 1500 Mal innerhalb einer for-Schleife. Ich muss sie alle mit unionAll innerhalb der Schleife zusammenführen. Aber ich bekomme Stackoverflow-Fehler aufgrund der Tatsache, dass Spark nicht mit der RDD-Linie Schritt halten kann.Merge große Anzahl von Spark-Datenframes in eine

Pseudocode:

df=[from a hive table] 
tableA=[from a hive table] 
tableA.registerTempTable("tableA") 
HiveContext.sql('CACHE TABLE tableA') 

for i in range(0,2000): 
    if (list[0]['column1']=='xyz'): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    else: 
     df1=query something from tableA 
     df=df.unionAll(df1) 

Dieser Stackoverflow-Fehler aufgrund RDD Linie hart zu werden wirft. So versuchte ich Checkpointing wie folgt:

for i in range(0,2000): 
    if (list[0]['column1']=='xyz'): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    elif(): 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    else: 
     df1=query something from tableA 
     df=df.unionAll(df1) 
    df.rdd.checkpoint 
    df = sqlContext.createDataFrame(df.rdd, df.schema) 

Ich habe den gleichen Fehler. Also habe ich SaveAsTable ausprobiert, was ich immer vermeiden wollte, weil die Jobübergabe zwischen den einzelnen HQL-Abfragen und dem Hive innerhalb einer Schleife verzögert war. Aber dieser Ansatz hat gut funktioniert.

Ich brauche Hilfe bei der Vermeidung von Speichern des Datenrahmens in Bienenstock innerhalb der Schleife. Ich möchte die dfs in einer Art und Weise zusammenführen, die In-Memory und effizient ist. Eine der anderen Optionen, die ich ausprobiert habe, ist, das Abfrageergebnis direkt in eine temporäre Tabelle einzufügen, für die ich einen Fehler erhalte: kann nicht in eine RDD-basierte Tabelle eingefügt werden.

+1

Im Allgemeinen führt diese Art von Schleifen und Vereinigungsoperationen immer zu Problemen in Spark. Welche Art von Abfragen führen Sie aus? Vielleicht gibt es eine schlauere Möglichkeit, Ihren Code umzuformen, was kein Schleifen erfordert. Was sind die Bedingungen? –

+0

Die Bedingungen sind nicht komplex - einige Regex-Übereinstimmungen und einige direkte Ganzzahl-Übereinstimmungen. Aber die Sache ist, dass ich den Endbenutzern ermöglicht habe, diese Bedingungen zu erstellen, die nur sql-basierte schreiben und sie in Spark importieren können, um die Daten zu verarbeiten. Naiv ausgedrückt funktioniert meine Anwendung wie eine SQL-Workbench - der Unterschied besteht darin, dass alle Abfragen ausgeführt werden und das Ergebnis in einer einzigen Tabelle gespeichert wird. – Mike

Antwort

0

vielleicht, Tempentabelle für Ergebnis wird funktionieren.

df1="query something from tableA".registerTempTable("result") 
sqlContext.sql("Insert into result query something from tableA") 
+0

wie ich in der Post erwähnt, es wirft einen Fehler: kann nicht in RDD-basierte Tabelle einfügen. – Mike

Verwandte Themen