2016-06-07 10 views
0

I Funken sind mit einem invertierten Index zu erstellen (oder mehr eine „Buchungsliste“, so die Reihenfolge der Buchungen ist wichtig), die wie dieserShop invertierter Index in Spark-

|  key | postings         | 
---------------------------------------------------------- 
|  "a" | 1, 3, 4, 7, 8, 9, 21, 25     | 
|  "b" | 7, 12, 21, 24, 28, 31, 37, 48, 51, 91  | 
|  "c" | 1, 2, 3, 10, 12, 17, 21, 38, 39, 40, 47  | 

Hinweis ein wenig aussieht Der Schlüssel ist zB eine Zeichenfolge, die Einträge sind eine sortierte Liste von z. B. ganzen Zahlen. Ich werde später die Buchungsliste verwenden und mehrmals (und hoffentlich sehr effizient) darüber iterieren.

Ich frage mich, was die beste Option ist, einen solchen Datenrahmen in Spark zu erstellen und schließlich auf Parquet zu speichern. Schlagen Sie z.B. Verwenden Sie verschachtelte Strukturen? Oder besser, ein Array für die Postings zu verwenden (obwohl was ich tun soll, wenn ich mehr als nur eine einzelne ID speichern möchte, sondern eine ID und einen Abstand, d.h. ein Tupel von Integer und Float)? Oder würden Sie vorschlagen, eine solche Buchungsliste überhaupt nicht zu verwenden und eine flache Struktur zu wählen (z. B. Schlüssel, Buchung, bei der derselbe Schlüssel mehrmals erscheint)?

Antwort

1

Ich würde eine Liste verwenden. Es wird ziemlich einfach sein, eine Liste von einfachen Werten wie IntegerType zu sammeln. Wie folgt aus:

val df = Seq(
    ("a",1,1.1),("a",3,2.3),("a",4,1.0),("b",7,4.3),("b",12,11.11),("b",21,0.01) 
).toDF("key","posting","distance") 

val aggregatedDf1 = df.groupBy("key").agg(collect_list(col("posting")) as "postings") 

Es wird schwieriger sein, zu tun collect_list auf einer komplizierten StructType, weil Hive Aggregationsfunktionen nur auf einfache Typen arbeiten.

Um eine StructType zu aggregieren, müssen Sie eine UDAF erstellen. Die UDAF API ist ein bisschen langweilig, so dass Sie ein wenig betrügen, und aggregieren die Spalten in zwei Listen, und dann eine einfache UDF-zip die beiden Listen verwenden, wie folgt aus:

val zipper = udf[Seq[Tuple2[Int,Double]],Seq[Int],Seq[Double]]((a,b) => a.zip(b)) 

val aggregatedDf2 = df.groupBy("key").agg(
    collect_list(col("posting")) as "postings", 
    collect_list(col("distance")) as "distances" 
).withColumn("postings", zipper($"postings", $"distances")).drop("distances") 
+0

Können Sie mir bitte erklären, was die Unterschied ist zwischen UDAF und UDF? – navige

+1

UDF = User Defined Function und es wird verwendet, um eine Funktion auf jede Zeile eines Datenrahmens anzuwenden. UDAF = Benutzerdefinierte Aggregatfunktion und wird verwendet, um benutzerdefinierte Berechnungen für gruppierte Daten durchzuführen (https://databricks.com/blog/2015/09/16/apache-spark-1-5-dataframe-api-highlights.html). – David

+1

Wie Sie sehen können, ist die API für eine 'UDF' ziemlich einfach - Sie brauchen nur eine einzige Codezeile, um sie zu erstellen. Auf der anderen Seite benötigt ein 'UDAF' viele spezielle Beschwörungsformeln, damit es funktioniert. Siehe hier: https://ragrawal.wordpress.com/2015/11/03/spark-custom-udaf-example/ –