2017-05-31 2 views
1

Ich habe ein Spark-Java-Programm, bei dem ein groupByKey mit einem mapValues-Schritt ausgeführt wird und ein PairRDD mit Wert als Iterable aller eingegebenen rdd-Werte zurückgibt. Ich habe gelesen, dass das Ersetzen von reduceByKey an der Stelle von groupByKey mit mapValues ​​einen Leistungsgewinn bringt, aber ich weiß nicht, wie ich reduceByKey auf mein Problem hier anwenden kann.Wie ersetzt man den groupByKey mit reduceByKey, um in Spark Java als Iterable-Wert zurückzugeben?

Speziell ich habe die ein Eingangspaar RDD, die Wert mit Typ Tuple5 hat. Nach den Umwandlungen groupByKey und mapValues ​​muss ich ein Schlüssel-Wert-Paar RDD erhalten, bei dem der Wert ein Iterable der Eingabewerte sein muss.

JavaPairRDD<Long,Tuple5<...>> inputRDD; 
... 
... 
... 
JavaPairRDD<Long, Iterable<Tuple5<...>>> groupedRDD = inputRDD 
    .groupByKey() 
    .mapValues(
      new Function<Iterable<Tuple5<...>>,Iterable<Tuple5<...>>>() { 

       @Override 
       public Iterable<Tuple5<...>> call(
         Iterable<Tuple5<...>> v1) 
         throws Exception { 

        /* 
        Some steps here..        
        */ 

        return mappedValue; 
       } 
      }); 

Gibt es eine Möglichkeit, durch die ich die obige Transformation reduceByKey mit bekommen konnte?

+0

Was sind 'Einige Schritte hier '? Du brauchst eine Logik, um es zu reduzieren. – philantrovert

+0

In der Funktion 'mapValues' sortiere ich jeden Wert basierend auf einem Schlüssel in' Tuple5'. Ich dachte, dass es hier nicht relevant ist, deshalb habe ich sie nicht aufgenommen. – Vishnu

+0

_Ich habe gelesen, dass das Ersetzen von reduceByKey an der Stelle von groupByKey mit mapValues ​​zu einer Leistungssteigerung führt - Sie haben falsch gelesen. – zero323

Antwort

1

Ich habe Scala auf Spark verwendet, also wird dies nicht die genaue Antwort sein, die Sie bevorzugen könnten. Der Hauptunterschied bei der Codierung zwischen groupByKey/mapValues und reduceByKey kann von diesen article angepasst unter Verwendung eines triviales Beispiel ersichtlich:

val words = Array("one", "two", "two", "three", "three", "three") 
val wordPairsRDD = sc.parallelize(words).map(word => (word, 1)) 

val wordCountsWithGroup = wordPairsRDD. 
    groupByKey. 
    mapValues(_.sum) 
wordCountsWithGroup.collect 
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

val wordCountsWithReduce = wordPairsRDD. 
    reduceByKey(_ + _) 
wordCountsWithReduce.collect 
res2: Array[(String, Int)] = Array((two,2), (one,1), (three,3)) 

In diesem Beispiel, wo x => x.sum (dh _.sum) in mapValues verwendet wird, wird es sein (acc, x) => acc + x (dh _ + _) in reduceByKey. Die Funktionssignaturen sind sehr unterschiedlich. In mapValues verarbeiten Sie eine Sammlung der gruppierten Werte, während Sie in reduceByKey eine Reduzierung durchführen.

+0

Um eine gruppierte Liste einer RDD-Paar zu erhalten, muss ich immer 'groupKey' verwenden, da 'reduceByKey' für Aggregatoperationen wie sum bestimmt ist. Also in meinem Fall ist 'reduceByKey' nicht möglich, oder? – Vishnu

+0

Nachdem ich Ihre Beschreibung im Kommentarbereich der Frage noch einmal gelesen habe, würde ich sagen, dass "groupByKey" wahrscheinlich der richtige Weg ist, da ich denke, dass Reduktion kein richtiges Werkzeug für die Aufgabe ist. –

Verwandte Themen