2016-08-07 9 views
6

Angesichts eines Spark DataFrame , ich möchte den maximalen Wert in einer bestimmten numerischen Spalte 'values' finden, und erhalten Sie die Zeile (n), wo dieser Wert erreicht wurde. Ich kann dies natürlich tun:argmax in Spark DataFrames: wie die Zeile mit dem maximalen Wert abgerufen wird

# it doesn't matter if I use scala or python, 
# since I hope I get this done with DataFrame API 
import pyspark.sql.functions as F 
max_value = df.select(F.max('values')).collect()[0][0] 
df.filter(df.values == max_value).show() 

aber dies ist ineffizient, da es zwei Durchgänge durch df erfordert.

pandas.Series/DataFrame und numpy.array haben argmax/idxmax Methoden, die dies effizient zu tun (in einem Durchgang). Dies gilt auch für Standard-Python (die integrierte Funktion max akzeptiert einen Schlüsselparameter, damit der Index des höchsten Werts gefunden werden kann).

Was ist der richtige Ansatz in Spark? Beachten Sie, dass es mir nichts ausmacht, ob ich alle Zeilen, in denen der Maximalwert erreicht wird, oder nur eine beliebige (nicht leere!) Teilmenge dieser Zeilen bekomme.

+0

In der Regel gibt es keine bessere Lösung ist, die sprachübergreifende und auf beliebige Daten arbeiten können. – zero323

+0

@ zero323 Warum ist es unmöglich, den RDD-Code in der Antwort unten in einer DataFrame-API zu verpacken, indem man ihn in Scala umwandelt und richtige Metadaten für Catalyst hinzufügt, um daran zu arbeiten? – max

+0

Es ist möglich, aber es bricht eindeutig die Annahme, dass _it es egal ist, wenn Sie Scala oder Python verwenden_ Sie können es auch mit Ordnable Datentypen mit SQL allein tun, aber dies ist ein Sonderfall nicht allgemeine Lösung. – zero323

Antwort

10

Wenn Schema is Orderable (Schema enthält nur atomics/Arrays von atomics/rekursiv bestellbaren structs) Sie einfach Aggregationen verwenden können:

Python:

df.select(F.max(
    F.struct("values", *(x for x in df.columns if x != "values")) 
)).first() 

Scala:

df.select(max(struct(
    $"values" +: df.columns.collect {case x if x!= "values" => col(x)}: _* 
))).first 

Ansonsten können Sie über Dataset (Scala nur) reduzieren, aber es erfordert zusätzliche Deserialisierung:

type T = ??? 

df.reduce((a, b) => if (a.getAs[T]("values") > b.getAs[T]("values")) a else b) 
+0

Ein bisschen schwierig, muss ich über diese "struct" -Methode lesen –

+0

Haben Sie etwas dagegen, eine Erklärung/Definition von Orderable Schema zu verknüpfen? Google-Suche fand nur diese Antwort :) – max

+0

https://github.com/apache/spark/blob/d6dc12ef0146ae409834c78737c116050961f350/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/ordering. scala # L89-L96 – zero323

2

Vielleicht ist es eine unvollständige Antwort, aber Sie können DataFrame 's interne RDD verwenden, wenden Sie die max Methode und erhalten Sie die maximale Aufzeichnung mit einem bestimmten Schlüssel.

a = sc.parallelize([ 
    ("a", 1, 100), 
    ("b", 2, 120), 
    ("c", 10, 1000), 
    ("d", 14, 1000) 
    ]).toDF(["name", "id", "salary"]) 

a.rdd.max(key=lambda x: x["salary"]) # Row(name=u'c', id=10, salary=1000) 
+1

Kann ich annehmen, 1 Durchlauf mit RDD API (Scala, um Python Overhead zu vermeiden) ist schneller als 2 Durchläufe mit DataFrame API garantiert? Oder gibt es einige Optimierungen, die Catalyst hier machen könnte? – max

Verwandte Themen