2016-04-13 9 views
2

Ich muss über den Inhalt eines DF mit verschiedenen SELECT-Anweisungen innerhalb einer foreach-Schleife iterieren, Ausgabe in Textdateien schreiben. Jede SELECT-Anweisung in der foreach-Schleife gibt eine NullPointerException zurück. Ich kann nicht sehen, warum das so ist. Eine SELECT-Anweisung innerhalb einer For-Schleife gibt diesen Fehler nicht zurück.Spark scala: SELECT in einer foreach-Schleife gibt java.lang.NullPointerException zurück

Dies ist der Testfall.

// step 1 of 6: create the table and load two rows 
vc.sql(s"""CREATE TEMPORARY TABLE TEST1 (
c1  varchar(4) 
,username varchar(5) 
,numeric integer) USING com.databricks.spark.csv OPTIONS (path "/tmp/test.txt")""") 

// step 2 of 6: confirm that the data is queryable 
vc.sql("SELECT * FROM TEST1").show() 
+----+--------+-------+ 
| c1|username|numeric| 
+----+--------+-------+ 
|col1| USER1|  0| 
|col1| USER2|  1| 
+----+--------+-------+ 

// Step 3 of 6: create a dataframe for the table 
var df=vc.sql("""SELECT * FROM TEST1""") 


// step 4 of 6: create a second dataframe that we will use as a loop iterator 
var df_usernames=vc.sql(s"""SELECT DISTINCT username FROM TEST1 """) 

// step 5 of 6: first foreach loop works ok: 
df_usernames.foreach(t => 
    { 
     println("(The First foreach works ok: loop iterator t is " + t(0).toString()) 
    } 
) 
(The First foreach works ok: loop iterator t is USER1 
(The First foreach works ok: loop iterator t is USER2 

// step 6 of 6: second foreach with any embedded SQL returns an error 
df_usernames.foreach(t => 
    { 
     println("(The second foreach dies: loop iterator t is " +  t(0).toString()) 
     vc.sql("""SELECT c1 FROM TEST1""").show() 
    } 
)  
The second foreach dies: loop iterator t is USER1 
org.apache.spark.SparkException: Job aborted due to stage failure: Task 158  in stage 94.0 failed 1 times, most recent failure: Lost task 158.0 in stage 94.0 (TID 3525, localhost): java.lang.NullPointerException 
    at org.apache.spark.sql.SQLContext.parseSql(SQLContext.scala:195) 

Antwort

1

Es ist nicht möglich. Sie können SQL-Abfrage innerhalb foreach starten, ohne

>>> df_usernames.collect.foreach(
... lambda x: sqlContext.sql("""SELECT c1 FROM TEST1""").show()) 
ersten sammeln Aufruf
+1

Dies ist nicht die gleichen foreach Gedanken wie die die OP wurde mit und es ist keine gute Praxis ohne zu wissen, die Mächtigkeit und die Größe der Daten zu sammeln. Es wird nicht skaliert, wenn Sie sagen, 2M Benutzer pro Beispiel. – eliasah

+0

Gibt es eine Möglichkeit, dies zu erreichen, ohne Collect zu verwenden? Für jede "Zeile" in der RDD muss ich mit den vorhandenen Daten vergleichen (die ich von SparkSession.sql laden kann). – KangarooWest