19

Ich versuche herauszufinden, wie man den größten Wert in einer Spark-Dataframe-Spalte am besten erhält.Der beste Weg, um den maximalen Wert in einer Spark-Datenframesspalte zu erhalten

Betrachten Sie das folgende Beispiel:

df = spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
df.show() 

Welche schafft:

+---+---+ 
| A| B| 
+---+---+ 
|1.0|4.0| 
|2.0|5.0| 
|3.0|6.0| 
+---+---+ 

Mein Ziel ist es, den größten Wert in Spalte A zu finden (durch Inspektion, ist dies 3,0). Mit PySpark, sind hier vier Ansätze, die ich denken kann:

# Method 1: Use describe() 
float(df.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 

# Method 2: Use SQL 
df.registerTempTable("df_table") 
spark.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 

# Method 3: Use groupby() 
df.groupby().max('A').collect()[0].asDict()['max(A)'] 

# Method 4: Convert to RDD 
df.select("A").rdd.max()[0] 

Jede der oben gibt die richtige Antwort, aber in Abwesenheit eines Spark Profilierungswerkzeuges kann ich nicht sagen, was am besten ist.

Irgendwelche Ideen aus Intuition oder Empirismus, welche der oben genannten Methoden hinsichtlich Spark-Laufzeit oder Ressourcennutzung am effizientesten sind, oder ob es eine direktere Methode als die oben genannten gibt?

+5

Die Methoden 2 und 3 sind äquivalent und verwenden identische physische und optimierte logische Pläne. Methode 4 wendet Reduzieren mit max auf rdd an. Es kann langsamer sein als direkt auf einem DataFrame zu arbeiten. Methode 1 entspricht mehr oder weniger 2 und 3. – zero323

+1

@ zero323 Was ist mit 'df.select (max (" A ")). Collect() [0] .asDict() ['max (A)']'? Sieht äquivalent zu Methode 2 aus, ist jedoch kompakter und auch intuitiver als Methode 3. – desertnaut

+0

- Am langsamsten ist die Methode 4, weil Sie die Umwandlung von DF in RDD für die gesamte Spalte durchführen und dann den Maximalwert extrahieren. –

Antwort

15
>df1.show() 
+-----+--------------------+--------+----------+-----------+ 
|floor|   timestamp|  uid|   x|   y| 
+-----+--------------------+--------+----------+-----------+ 
| 1|2014-07-19T16:00:...|600dfbe2| 103.79211|71.50419418| 
| 1|2014-07-19T16:00:...|5e7b40e1| 110.33613|100.6828393| 
| 1|2014-07-19T16:00:...|285d22e4|110.066315|86.48873585| 
| 1|2014-07-19T16:00:...|74d917a1| 103.78499|71.45633073| 

>row1 = df1.agg({"x": "max"}).collect()[0] 
>print row1 
Row(max(x)=110.33613) 
>print row1["max(x)"] 
110.33613 

Die Antwort ist fast die gleiche wie method3. aber scheint die "asDict()" in method3

entfernt werden kann
+0

kann jemand erklären warum collect() [0] benötigt wird? – jibiel

+2

@jibiel 'collect()' gibt eine Liste (in diesem Fall mit einem einzelnen Element) zurück, so dass Sie auf das erste (einzige) Element in der Liste zugreifen müssen –

2

Falls einige fragt sich, wie es zu tun mit Scala, hier gehen Sie (unter Verwendung von Spark-2.0 +.):

scala> df.createOrReplaceTempView("TEMP_DF") 
scala> val myMax = spark.sql("SELECT MAX(x) as maxval FROM TEMP_DF"). 
    collect()(0).getInt(0) 
scala> print(myMax) 
117 
6

Max-Wert für ein insbesondere Spalte eines Datenrahmens kann erreicht werden durch Einsatz -

your_max_value = df.agg({"your-column": "max"}).collect()[0][0]

0

Bemerkung: Funken soll auf Big Data arbeiten - Distributed Computing. Die Größe des Beispiels DataFrame ist sehr klein, so dass die Reihenfolge der realen Beispiele in Bezug auf das kleine Beispiel geändert werden kann.

Slowest: Method_1, weil .describe ("A") berechnet min, max, Mittelwert, stddev und (5 Berechnungen über die gesamte Spalte) zählen

Medium: Method_4, weil .rdd (DF zu RDD-Transformation) verlangsamt den Prozess.

Schneller: Method_3 ~ Method_2 ~ method_5, weil die Logik sehr ähnlich ist, so Sparks Catalyst Optimizer folgt sehr ähnliche Logik mit minimalen Anzahl von Operationen (max einer bestimmten Spalte zu erhalten, sammeln ein Single-Wert-Datenframe); (.asDict() fügt ein wenig mehr Zeit 3,2 bis 5 Vergleich)

import pandas as pd 
import time 

time_dict = {} 

dfff = self.spark.createDataFrame([(1., 4.), (2., 5.), (3., 6.)], ["A", "B"]) 
#-- For bigger/realistic dataframe just uncomment the following 3 lines 
#lst = list(np.random.normal(0.0, 100.0, 100000)) 
#pdf = pd.DataFrame({'A': lst, 'B': lst, 'C': lst, 'D': lst}) 
#dfff = self.sqlContext.createDataFrame(pdf) 

tic1 = int(round(time.time() * 1000)) 
# Method 1: Use describe() 
max_val = float(dfff.describe("A").filter("summary = 'max'").select("A").collect()[0].asDict()['A']) 
tac1 = int(round(time.time() * 1000)) 
time_dict['m1']= tac1 - tic1 
print (max_val) 

tic2 = int(round(time.time() * 1000)) 
# Method 2: Use SQL 
dfff.registerTempTable("df_table") 
max_val = self.sqlContext.sql("SELECT MAX(A) as maxval FROM df_table").collect()[0].asDict()['maxval'] 
tac2 = int(round(time.time() * 1000)) 
time_dict['m2']= tac2 - tic2 
print (max_val) 

tic3 = int(round(time.time() * 1000)) 
# Method 3: Use groupby() 
max_val = dfff.groupby().max('A').collect()[0].asDict()['max(A)'] 
tac3 = int(round(time.time() * 1000)) 
time_dict['m3']= tac3 - tic3 
print (max_val) 

tic4 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.select("A").rdd.max()[0] 
tac4 = int(round(time.time() * 1000)) 
time_dict['m4']= tac4 - tic4 
print (max_val) 

tic5 = int(round(time.time() * 1000)) 
# Method 4: Convert to RDD 
max_val = dfff.agg({"A": "max"}).collect()[0][0] 
tac5 = int(round(time.time() * 1000)) 
time_dict['m5']= tac5 - tic5 
print (max_val) 

print time_dict 

Ergebnis an einem Rand-Knoten eines Clusters in Millisekunden (ms):

kleine DF (ms): {'m1': 7096, 'm2': 205, 'm3': 165, 'm4': 211, 'm5': 180}

größer DF (ms): {'m1': 10260, 'm2 ': 452,' m3 ': 465,' m4 ': 916,' m5 ': 373}

Verwandte Themen