2016-08-02 6 views
5

Ich habe es mit einer Zahlenspalte in einem großen Spark DataFrame zu tun, und ich möchte eine neue Spalte erstellen, die eine aggregierte Liste von eindeutigen Zahlen speichert, die in dieser Spalte erscheinen.Gibt es eine Möglichkeit, einen Limit-Parameter an functions.collect_set in Spark zu übergeben?

Grundsätzlich genau was functions.collect_set tut. Allerdings brauche ich nur bis zu 1000 Elemente in der aggregierten Liste. Gibt es eine Möglichkeit, diesen Parameter irgendwie an functions.collect_set() oder eine andere Art und Weise zu übergeben, um nur bis zu 1000 Elemente in der aggregierten Liste zu erhalten, ohne einen UDAF zu verwenden?

Da die Spalte so groß ist, möchte ich vermeiden, alle Elemente zu sammeln und die Liste danach zu trimmen.

Danke!

Antwort

1

Gebrauch nehmen

val firstThousand = rdd.take(1000) 

kehrt die ersten 1000. sammeln auch eine Filterfunktion hat, die zur Verfügung gestellt werden kann. Auf diese Weise können Sie genauer festlegen, was zurückgegeben wird.

+0

Danke für die Antwort. Jedoch 1) Ich möchte nur eine Liste von _distinct_ Werten. Ich sehe, es gibt eine rdd.distinct(), aber das scheint nicht zu einem Grenzwert-Parameter 2) Nicht sicher, wie eine Filterfunktion in Collect verwenden. Wie würde ich einen Filter verwenden, um nur eine bestimmte Anzahl von Werten zu erhalten? – user1500142

+0

Auch im Idealfall würde ich vermeiden, rdds zu verwenden. Ich bin momentan etwas wie df.groupBy(). Agg ( user1500142

1
scala> df.show 
    +---+-----+----+--------+ 
    | C0| C1| C2|  C3| 
    +---+-----+----+--------+ 
    | 10| Name|2016| Country| 
    | 11|Name1|2016|country1| 
    | 10| Name|2016| Country| 
    | 10| Name|2016| Country| 
    | 12|Name2|2017|Country2| 
    +---+-----+----+--------+ 

scala> df.groupBy("C1").agg(sum("C0")) 
res36: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res36.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
| Name|  30| 
+-----+-------+ 

scala> df.limit(2).groupBy("C1").agg(sum("C0")) 
    res33: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

    scala> res33.show 
    +-----+-------+ 
    | C1|sum(C0)| 
    +-----+-------+ 
    | Name|  10| 
    |Name1|  11| 
    +-----+-------+ 



    scala> df.groupBy("C1").agg(sum("C0")).limit(2) 
res2: org.apache.spark.sql.DataFrame = [C1: string, sum(C0): bigint] 

scala> res2.show 
+-----+-------+ 
| C1|sum(C0)| 
+-----+-------+ 
|Name1|  11| 
|Name2|  12| 
+-----+-------+ 

scala> df.distinct 
res8: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res8.show 
+---+-----+----+--------+ 
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 10| Name|2016| Country| 
| 12|Name2|2017|Country2| 
+---+-----+----+--------+ 

scala> df.dropDuplicates(Array("c1")) 
res11: org.apache.spark.sql.DataFrame = [C0: int, C1: string, C2: int, C3: string] 

scala> res11.show 
+---+-----+----+--------+              
| C0| C1| C2|  C3| 
+---+-----+----+--------+ 
| 11|Name1|2016|country1| 
| 12|Name2|2017|Country2| 
| 10| Name|2016| Country| 
+---+-----+----+--------+ 
+0

Danke für die Antwort, aber das tut nicht ganz das, was ich will. Wenn ich bis zu 1000 verschiedene Werte aus einer Spalte haben möchte, wird "df.limit (1000)" eine harte obere Grenze für die Anzahl der zurückgegebenen Werte setzen, aber ich kann andere Werte verlieren, die ich sonst hinzufügen müsste. – user1500142

+0

Sie haben zwei Methoden distinct und dropDuplicates, die Sie vor den Methoden limit, groupby und agg ausführen können. Distinct betrachtet alle Spalten und droDuplicates ermöglicht es Ihnen, zu steuern, welche Spalten verglichen werden sollen, um Duplikate zu identifizieren. @ user1500142 – mark

2

Ich benutze eine modifizierte Kopie der Funktionen collect_set und collect_list; Aufgrund von Codebereichen müssen sich die geänderten Kopien im selben Paketpfad wie die Originale befinden. Der verlinkte Code funktioniert für Spark 2.1.0; Wenn Sie eine ältere Version verwenden, können sich die Methodensignaturen unterscheiden.

Wurf diese Datei (https://gist.github.com/lokkju/06323e88746c85b2ce4de3ea9cdef9bc) in Ihr Projekt als src/main/org/apache/Funken/SQL/Katalysator/Ausdruck/collect_limit.scala

es als verwenden:

import org.apache.spark.sql.catalyst.expression.collect_limit._ 
df.groupBy('set_col).agg(collect_set_limit('set_col,1000) 
3

Meine Lösung ist sehr ähnlich zu Loki's answer with collect_set_limit.


ich eine UDF verwenden würde, das tun würde, was Sie wollen nach collect_set (oder collect_list) oder ein sehr viel schwieriger UDAF.

Angesichts mehr Erfahrung mit UDFs, würde ich damit zuerst gehen. Obwohl UDFs nicht optimiert sind, ist es für diesen Anwendungsfall in Ordnung.

val limitUDF = udf { (nums: Seq[Long], limit: Int) => nums.take(limit) } 
val sample = spark.range(50).withColumn("key", $"id" % 5) 

scala> sample.groupBy("key").agg(collect_set("id") as "all").show(false) 
+---+--------------------------------------+ 
|key|all         | 
+---+--------------------------------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]| 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]| 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]| 
+---+--------------------------------------+ 

scala> sample. 
    groupBy("key"). 
    agg(collect_set("id") as "all"). 
    withColumn("limit(3)", limitUDF($"all", lit(3))). 
    show(false) 
+---+--------------------------------------+------------+ 
|key|all         |limit(3) | 
+---+--------------------------------------+------------+ 
|0 |[0, 15, 30, 45, 5, 20, 35, 10, 25, 40]|[0, 15, 30] | 
|1 |[1, 16, 31, 46, 6, 21, 36, 11, 26, 41]|[1, 16, 31] | 
|3 |[33, 48, 13, 38, 3, 18, 28, 43, 8, 23]|[33, 48, 13]| 
|2 |[12, 27, 37, 2, 17, 32, 42, 7, 22, 47]|[12, 27, 37]| 
|4 |[9, 19, 34, 49, 24, 39, 4, 14, 29, 44]|[9, 19, 34] | 
+---+--------------------------------------+------------+ 

Siehe functions Objekt (udf Funktion docs).

Verwandte Themen