2017-09-18 38 views
0

Dies ist meine Funktion:Warum scheitert foreach-Operator mit NullPointerException bei Verwendung von Dataset?

def TestForeach(dataFrame: DataFrame)={ 
    dataFrame.select("user_id").dropDuplicates().foreach(row =>{ 
    dataFrame.filter("user_id == "+row.getString(0)) 
    }) 
} 

ich diesen Fehler:

ERROR Executor: Exception in task 2.0 in stage 4.0 (TID 16) 
java.lang.NullPointerException 
at org.apache.spark.sql.Dataset.filter(Dataset.scala:1318) 

Wie Dataframes mit dem gleichen User_id zu bekommen?

+0

Können Sie bitte die Ausnahme zu der Frage hinzufügen? Das würde es vollständiger machen. Vielen Dank! –

Antwort

2

foreach führt die Aktion für Executors aus, während dataFrame nur für den Treiber verfügbar ist.

Sie sollten collect vor foreach. Mit der Änderung, foreach ist dann Scala nicht Spark foreach.

0

Sie können dataframes nicht innerhalb einer Umwandlung oder Aktion verwenden. Sie müssen zuerst Ihre Benutzer-IDs sammeln:

def testForeach(dataFrame: DataFrame): Seq[DataFrame] = { 
    val userIds: Array[String] = dataFrame.select("user_id").distinct.map(_.getString(0)).collect 
    userIds.map(uid => dataFrame.filter($"user_id" === uid)).toSeq 
} 
Verwandte Themen