2016-11-23 3 views
1

Meine Frage ist über Alternativen/Optimierung zu groupBy() Betrieb auf RDD. Ich habe Millionen von Message Instanzen, die basierend auf einer ID gruppiert werden müssen. Ich benutze groupBy(), um das Ziel zu erreichen, aber ich weiß, dass es teuer ist und es sehr viel Verarbeitungszeit erfordert.Wie optimiert man groupBy() Operation auf Spark RDD

Also habe ich versucht reduceByKey(func) und combineByKey() als Alternative, die in einigen der Ressourcen vorgeschlagen wurde, dass ich ging durch, aber es ist nicht eine Anpassung an mein Szenario, in dem Meine Forderung ist die Gruppierung nur. Aber meine Datengröße ist riesig, wo groupBy verbraucht mehr als 70% meiner Arbeitszeit. So sind alle Vorschläge oder Lösungen geschätzt.

Meine Eingangsdaten

Seq (neue Nachricht ("1", "abc", "Request"), neue Nachricht ("1", "cba", "Response"), neue Nachricht ("2", "def", "Request"), neue Nachricht ("2", "fed", "Response"), neue Nachricht ("3", "ghi", "Request"), neu Message ("3", "IHG", "Response"))

aus Gründen der Einfachheit halber habe ich Eingabedaten in Seq gegeben

def groupMessages(sourceRdd: RDD[Message]): Unit { 

    val messageIdRdd = sourceRdd.mapPartitions(partitionData => { 
     partitionData.map(row => (row.uniqueID(), row)) 

    }) 

    /* 
    Group messages based on its message id. This functionality is to combine request response 
    together as single transaction based on message id. 
    */ 
    val groupbyRdd = messageIdRdd.groupBy(_._1) 
} 

Erwartetes Ergebnis

(2, CompactBuffer ((2, Nachricht (2, def, Request)), (2, Message (2, gefüttert, Response)))) (3, CompactBuffer ((3, Nachricht (3, ghi, Anfrage)), (3, Nachricht (3, ihg, Antwort)))) (1, CompactBuffer ((1, Nachricht (1, abc, Anfrage)), (1, Nachricht (1, cba, Antwort))))

+0

was meinst du mit 'group rows'? Bitte illustrieren Sie mit Beispieldaten und erwarteter Ausgabe. – mtoto

+0

@ mtoto danke für deine antwort. Ich habe meinen Beitrag aktualisiert. hoffe, dass die angegebenen Details klar genug sind. – BDR

+2

"Meine Anforderung ist nur Gruppierung". Gruppierung ist von Natur aus eine teure Operation in einem großen Datensatz.Es mag keine Wunderwaffe geben –

Antwort

0

Sie sollten groupByKey() statt groupBy() verwenden, damit die Ausführung im Cluster erfolgt.

val myRDD = sc.parallelize(List(("1", "abc", "Request"), ("1", "cba", "Response"), ("2", "def", "Request"), ("2", "fed", "Response"), ("3", "ghi", "Request"), ("3", "ihg", "Response"))) 

Ihre groupBy() ist:

scala> myRDD.map(x=>x).groupBy(_._1).take(10).foreach(println) 
(2,CompactBuffer((2,def,Request), (2,fed,Response))) 
(3,CompactBuffer((3,ghi,Request), (3,ihg,Response))) 
(1,CompactBuffer((1,abc,Request), (1,cba,Response)))) 

groupByKey() Code wäre:

scala> myRDD.map(x=>(x._1,x)).groupByKey().take(10).foreach(println) 
(2,CompactBuffer((2,def,Request), (2,fed,Response))) 
(3,CompactBuffer((3,ghi,Request), (3,ihg,Response))) 
(1,CompactBuffer((1,abc,Request), (1,cba,Response)) 

Hoffe, es hilft.

+0

ist irgendein Leistungsunterschied zwischen groupByKey() und groupBy()? – BDR

+0

Ja für riesige Datenmengen. groupBy(): Die Verarbeitung erfolgt im Treiber/Client-Programm. groupByKey() passiert bei Mappern (im Cluster), denken Sie daran, dies ist auch teuer, aber Sie sagten, Sie haben keine andere Wahl, als Gruppierung zu verwenden, weil dies viel Mischen (Bewegung von Daten an die Schlüsselposition) beinhaltet. – KiranM

+0

der Punkt, den Sie gemacht haben, ist nicht korrekt. Ich habe dazu eine Detailanalyse gemacht. https://balajireddyblog.blogspot.in/2016/12/groupby-vs-groupbykey-in-this-posti.html – BDR

-2

Wenn y Unsere Anforderung ist nur Gruppierung Sie können für groupByKey() gehen. Wenn Sie eine Aggregation (Summe, Durchschnitt, usw.) durchführen wollen, können Sie entweder reduceByKey() oder combinedByKey() verwenden. Der Hauptunterschied zwischen reduceByKey() und combinedByKey() ist der Rückgabetyp. Der Rückgabetyp reduceByKey() sollte mit dem Werttyp übereinstimmen, in combinedByKey() jedoch möglicherweise anders. Da Ihre Antwort so aussieht, als ob Sie nur an der Gruppierung interessiert sind, können Sie sich für groupByKey() entscheiden.

+0

ja. Meine Anforderung ist nur Gruppierung. Aber meine Datenmenge ist riesig, wo groupBy mehr als 70% meiner Arbeitszeit verbraucht. Also suche ich nach einer besseren Lösung. – BDR

+0

Wie viele Partitionen haben Sie? –

+0

Gerade jetzt 20+. Aber die Zahlen werden in naher Zukunft steigen. – BDR