2016-04-29 15 views
-1

Ich haben ein RDD mit folgenden Struktur:Spark-Scala: GroupByKey und Art

val rdd = RDD[ (category: String, product: String, score: Double) ] 

Mein Ziel ist es, group die Daten auf Basis der Kategorie, und dann für jede Kategorie sort w.r.t. Punktzahl von Tuple 2 (product, score). Derzeit ist mein Code:

val result = rdd.groupByKey.mapValues(v => v.toList.sortBy(-_._2)) 

Dies erweist sich als sehr teure Operation für die Daten, die ich habe. Ich bin auf der Suche nach Verbesserung der Leistung mit alternativen Ansatz.

+0

Warum ist es so wichtig, zu sortieren? –

+0

Es könnte hilfreich sein, wenn Sie grobe Größen angeben könnten - wie viele Artikel in der Original-RDD, wie viele Kategorien, wie viele Artikel pro Kategorie im Durchschnitt. Wie lange dauert das, auf welcher Art von Hardware? Wie schnell brauchst du es? – DNA

+0

Wie planen Sie die sortierten Daten zu konsumieren? Haben Sie vor, durch alle zu iterieren, wollen Sie nur die beste finden? – marios

Antwort

3

schwierig, ohne zu wissen, Ihre Daten-Set zu beantworten, aber die documentation hat einige Hinweise re: groupByKey Leistung:

Hinweis: Dieser Vorgang ist sehr teuer sein kann. Wenn Sie eine Gruppierung in bestellen, um eine Aggregation (z. B. eine Summe oder einen Durchschnitt) über jeden Schlüssel durchzuführen, bietet die Verwendung von PairRDDFunctions.aggregateByKey oder PairRDDFunctions.reduceByKey eine wesentlich bessere Leistung.

Es hängt also davon ab, was Sie mit den sortierten Listen machen wollen. Wenn Sie die gesamte Liste benötigen, kann es schwierig sein, groupByKey zu verbessern. Wenn Sie eine Art von Aggregation durchführen, sind die oben beschriebenen alternativen Operationen möglicherweise besser (aggregateByKey, reduceByKey).

Abhängig von der Größe Ihrer Listen, kann effizienter sein, eine alternative Sammlung (z. B. veränderbares Array) vor dem Sortieren zu verwenden.

Bearbeiten: Wenn Sie eine relativ kleine Anzahl von Kategorien haben, können Sie versuchen, die ursprüngliche RDD wiederholt zu filtern und jede gefilterte RDD zu sortieren. Obwohl eine ähnliche Menge an Arbeit insgesamt durchgeführt wird, kann es zu einem bestimmten Zeitpunkt weniger Speicher verbrauchen.

Bearbeiten 2: Wenn Speichermangel ein Problem ist, können Sie Ihre Kategorien und Produkte möglicherweise als Integer-IDs und nicht als Zeichenfolgen darstellen und nur später nachschlagen. Auf diese Weise könnte Ihre Haupt-RDD viel kleiner sein.

+0

ja, ich muss die ganze Liste behalten. Dies entspricht einem Geschäftsfall, bei dem ich für jede Kategorie die Produkte anhand ihrer Ränge auflisten muss. – Mohitt

0

Ist Ihre RDD gerecht auf Kategorien verteilt? Je nachdem, wie hoch der Skew-Faktor ist, können Probleme auftreten. so etwas wie dies versuchen, wenn Sie nicht zu viele Schlüsselwerte haben:

val rdd: RDD[(String, String, Double)] = sc.parallelize(Seq(("someCategory","a",1.0),("someCategory","b",3.0),("someCategory2","c",4.0))) 

rdd.keyBy(_._1).countByKey().foreach(println) 
+0

ja, die Verteilung ist nicht schlecht verzerrt. – Mohitt

Verwandte Themen