2017-05-18 5 views
1

Wie kann ich im Folgenden das Tupel mit dem höchsten Wert erhalten?Den maximalen Wert in Spark RDD finden

Array[(String, Int)] = Array((a,30),(b,50),(c,20)) 

In diesem Beispiel ist das Ergebnis, das ich würde (b,50)

+0

Sie fragen über das Finden von Maximum in einer RDD während ein Beispiel mit 'Array [(String, Int)]' zeigen. Ich vermisse die Verbindung zwischen Sparks RDD API und Scala. Wie groß ist das Input-Array? –

Antwort

3

Sie reduce() verwenden:

val max_tuple = rdd.reduce((acc,value) => { 
    if(acc._2 < value._2) value else acc}) 
//max_tuple: (String, Int) = (b,50) 

Daten

val rdd = sc.parallelize(Array(("a",30),("b",50),("c",20))) 
1

Wenn die Elemente Sie könnten einfach immer Tupel von zwei Elementen sein will:

Array((a,30),(b,50),(c,20)).maxBy(_._2) 

Wie in den docs angegeben.

+0

Gibt es neben maxBy noch eine andere Möglichkeit? – ZeroDarkThirty

+0

Was ist falsch an der Verwendung von 'maxBy'? Es sieht wie der einfachste, geradlinigste Weg aus. – Dani

+0

Ich bekomme den folgenden Fehler, wenn ich MaxBy verwende: 'error: Wert maxBy ist kein Mitglied von org.apache.spark.rdd.RDD [(String, int)]' – ZeroDarkThirty

1

Wenn Sie neu zu entfachen, sollte ich Ihnen sagen, dass Sie verwenden, um Dataframe so viel wie möglich s, sie Haben Sie viele Vorteile im Vergleich mit RDD s, mit Dataframe s können Sie das Maximum wie folgt erhalten:

import spark.implicits._ 
import org.apache.spark.sql.functions.max 
val df = Seq(("a",30),("b",50),("c",20)).toDF("x", "y") 
val x = df.sort($"y".desc).first() 

Das sollte funktionieren, es funktioniert für mich zumindest. hoffe das hilft dir

+0

Wenn Sie in Datenframes konvertieren möchten, verwenden Sie besser die Aggregatfunktion 'max()'. – mtoto

+0

das Problem damit, dass es nur den Wert einer Spalte zurückgibt und wir wollen die ganze Zeile –

0

reduce() gibt für mich falsches Ergebnis zurück. Es gibt einige andere Optionen:

val maxTemp2 = rdd.max()(Ordering[Int].on(x=>x._2)) 
val maxTemp3 = rdd.sortBy[Int](x=>x._2).take(1)(0) 

Daten

val rdd = sc.parallelize(Array(("a",30),("b",50),("c",20))) 
0

rdd.reduceByKey ((a, b) => a + b) .collect.maxBy (_._ 2)

können wir MaxBy auf sammeln wie diese verwenden