Multiple Aggregationen würde sehr teuer sein, zu berechnen, so würde ich raten Sie Annäherung Distinct Count zu verwenden:
val df = Seq((1,3,4),(1,2,3),(2,3,4),(2,3,5)).toDF("col1","col2","col3")
val exprs = df.columns.map((_ -> "approx_count_distinct")).toMap
df.agg(exprs).show()
// +---------------------------+---------------------------+---------------------------+
// |approx_count_distinct(col1)|approx_count_distinct(col2)|approx_count_distinct(col3)|
// +---------------------------+---------------------------+---------------------------+
// | 2| 2| 3|
// +---------------------------+---------------------------+---------------------------+
Die approx_count_distinct
Methode beruht auf HyperLogLog unter der Haube.
Der HyperLogLog Algorithmus und seine Variante HyperLogLog ++ (in Funken implementiert) stützt sich auf die folgenden cleveren Beobachtung.
Wenn die Zahlen gleichmäßig über einen Bereich verteilt sind, kann die Anzahl einzelner Elemente aus der größten Anzahl führender Nullen in der Binärdarstellung der Zahlen angenähert werden.
Wenn wir beispielsweise eine Zahl beobachten, deren Ziffern in binärer Form die Form 0…(k times)…01…1
haben, dann können wir schätzen, dass es in der Menge 2k Elemente in der Menge gibt. Dies ist eine sehr grobe Schätzung, aber sie kann mit einem Skizzieralgorithmus sehr präzise verfeinert werden.
Eine gründliche Erklärung der Mechanik hinter diesem Algorithmus finden Sie in der original paper.
Hinweis: Start 1.6 Funken, wenn Funken rufen SELECT SOME_AGG(DISTINCT foo)), SOME_AGG(DISTINCT bar)) FROM df
jede Klausel für jede Klausel separate Aggregation ausgelöst werden soll. Dies ist anders als SELECT SOME_AGG(foo), SOME_AGG(bar) FROM df
wo wir einmal aggregieren. Daher ist die Leistung nicht vergleichbar, wenn count(distinct(_))
und approxCountDistinct
(oder approx_count_distinct
) verwendet werden.
Es ist eines der Verhaltensänderungen seit Spark-1.6:
Mit der verbesserten Anfrageplaner für Abfragen, die unterschiedliche Aggregationen (SPARK-9241), der Plan einer Abfrage eine einzige eindeutige Aggregation hat wurde in eine robustere Version geändert. Um zu dem Plan zurückzukehren, der von Spark 1.5 erstellt wurde, setzen Sie bitte spark.sql.specializeSingleDistinctAggPlanning auf true. (SPARK-12077)
Referenz: Approximate Algorithms in Apache Spark: HyperLogLog and Quantiles.
Nun, es kommt darauf an. Wenn Sie einen echten großen Cluster haben, können Sie Ihre Daten partitionieren. Danach können Sie eine Schleife erstellen, die jede Spalte zählen kann. Diese Zählungen werden in paralellel funktionieren. Um es klar zu stellen, wenn Sie einen Cluster mit 1000 Arbeitern haben, können Sie Ihre Daten auf 200 partitionieren. Dann können Sie zu jeder Zeit 5 Spalten zählen. Aber dein Problem ist nicht so trivial. –