Ich bin ein Programm implementieren, die den gesamten Datenrahmen als Parameter nimmt. Ich weiß, dass dies möglicherweise keine Unterstützung in Spark ist, aber ich würde gerne wissen, ob es einen guten Weg gibt, mein Problem zu lösen.Wie übergeben Sie einen Datenrahmen als Funktionsparameter in Spark
Ich habe einen Spark-Datenrahmen wie folgt aus:
Item_sale_table
item_id date Sale Amount
aaa 3-11 20
aaa 3-12 21
aaa 3-13 28
... ... ...
bbb 3-11 17
bbb 3-12 12
... ... ...
ccc 3-11 9
... ... ...
item_list
item_id description
aaa xxxx
bbb xxxyx
ccc zxsa
...
Was ich tun möchte, das ist jedes Element aus item_list
Tisch zu bekommen, und sammeln historische Daten aus item_sale
Tabelle für dieses Element und wenden Sie eine Funktion (hier ist eine einfache Zählfunktion) darauf an.
So ist der Punkt Prozess Funktion sieht aus wie
def ItemProcess (item_id: String, Dataset: DataFrame) = {
val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count()
println(item_id,item_count)
}
Und die Hauptfunktion, die diese Funktion aufrufen
val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table))
ist Dann bekam ich
ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504)
java.lang.NullPointerException
at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151)
at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664)
at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652)
So habe ich die ganze Datenrahmen in der Foreach-Funktion. Ich denke hier ist das Problem.Aber wie kann man es korrigieren?
======== Update-=======
Ich fand ich würde sogar NullPointerException
erhalten, auch wenn ich das Prozess Funktion wie diese
val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count())
aber UDF, es sei denn mit, ich eine benutzerdefinierte Funktion statt Zählung nicht implementieren kann, wenn ich Ihre Methoden verwenden. Habe ich recht? – lserlohn