2016-12-09 4 views
-2

Ich bin ein Programm implementieren, die den gesamten Datenrahmen als Parameter nimmt. Ich weiß, dass dies möglicherweise keine Unterstützung in Spark ist, aber ich würde gerne wissen, ob es einen guten Weg gibt, mein Problem zu lösen.Wie übergeben Sie einen Datenrahmen als Funktionsparameter in Spark

Ich habe einen Spark-Datenrahmen wie folgt aus:

Item_sale_table 
    item_id date Sale Amount 
    aaa  3-11  20 
    aaa  3-12  21 
    aaa  3-13  28 
    ...  ...  ... 
    bbb  3-11  17 
    bbb  3-12  12 
    ...  ...  ... 
    ccc  3-11  9 
    ...  ...  ... 

item_list

item_id description 
aaa   xxxx 
bbb   xxxyx 
ccc   zxsa 
... 

Was ich tun möchte, das ist jedes Element aus item_list Tisch zu bekommen, und sammeln historische Daten aus item_sale Tabelle für dieses Element und wenden Sie eine Funktion (hier ist eine einfache Zählfunktion) darauf an.

So ist der Punkt Prozess Funktion sieht aus wie

def ItemProcess (item_id: String, Dataset: DataFrame) = { 

     val item_count = Dataset.filter(Dataset("item_id").contains(item_id)).count() 

     println(item_id,item_count) 

    } 

Und die Hauptfunktion, die diese Funktion aufrufen

val item_count_collection = item_list.select("item_id").foreach(x => ItemProcess(x(0).toString, item_sale_table)) 

ist Dann bekam ich

ERROR Executor: Exception in task 4.0 in stage 11.0 (TID 504) 
java.lang.NullPointerException 
    at org.apache.spark.sql.DataFrame.resolve(DataFrame.scala:151) 
    at org.apache.spark.sql.DataFrame.col(DataFrame.scala:664) 
    at org.apache.spark.sql.DataFrame.apply(DataFrame.scala:652) 

So habe ich die ganze Datenrahmen in der Foreach-Funktion. Ich denke hier ist das Problem.Aber wie kann man es korrigieren?

======== Update-=======

Ich fand ich würde sogar NullPointerException erhalten, auch wenn ich das Prozess Funktion wie diese

val item_count_collection = item_list.select("item_id").foreach(x => Item_sale_table.filter(Item_sale_table("item_id").contains(x(0).toString)).count()) 

Antwort

0

Aggregate einbetten und (optional) JOIN:

val item_counts = item_sale_table.groupBy("item_id").count() 

optional JOIN:

item_list.join(item_counts, Seq("item_id")) 

oder mit contains (Art und Weise weniger effizient):

item_list.join(
    item_counts, 
    item_counts("item_id").contains(item_list("item_id"))), 
    "left" 
) 
+1

aber UDF, es sei denn mit, ich eine benutzerdefinierte Funktion statt Zählung nicht implementieren kann, wenn ich Ihre Methoden verwenden. Habe ich recht? – lserlohn

Verwandte Themen