2016-05-09 7 views
1

Ich habe eine große Menge von Daten, die ich Clustering durchführen möchte. Der Catch ist, ich möchte kein Clustering für das ganze Set, sondern ein Clustering für jeden Benutzer. Im Wesentlichen würde ich zuerst eine groupby-Benutzer-ID erstellen und dann KMeans ausführen.Wie führe ich RDD-Operationen nach einer groupby in Spark aus?

Das Problem ist, wenn Sie eine Groupby tun, würde jede Zuordnung außerhalb des Spark Controller-Kontexts sein, so dass jeder Versuch, RDDs zu erstellen, fehlschlagen würde. Sparks KMeans lib in mllib benötigt eine RDD (damit sie parallelisiert werden kann).

Ich sehe zwei Problemumgehungen, aber ich hatte gehofft, dass es eine bessere Lösung gab.

1) Manuelles Durchlaufen aller Tausenden von Benutzern in der Steuerung (vielleicht Millionen, wenn die Dinge groß werden) und Ausführen von KMeans für jede von ihnen.

2) Führen Sie groupby im Controller aus, und führen Sie dann in map einen nicht parallelen Kmeans aus, der von einer externen Bibliothek bereitgestellt wird.

Bitte sagen Sie mir, es gibt einen anderen Weg, ich hätte lieber alles || wie möglich.

Antwort

1

Edit: Ich wusste nicht, es war pyspark im Moment der Antwort. Allerdings werde ich es als eine Idee, die angepasst werden kann

Ich hatte ein ähnliches Problem und ich war in der Lage, die Leistung zu verbessern, aber es war immer noch nicht die ideale Lösung für mich. Vielleicht könnte es für dich funktionieren.

Die Idee war, die RDD in vielen kleineren RDDs (eine neue für jede Benutzer-ID) zu brechen, sie in einem Array zu speichern und dann die Verarbeitungsfunktion (Clustering in Ihrem Fall) für jede "Sub-RDD" aufzurufen. Der vorgeschlagene Code wird unten (Erklärung in den Kommentaren) gegeben:

// A case class just to use as example 
case class MyClass(userId: Long, value: Long, ...) 

// A Scala local array with the user IDs (Could be another iterator, such as List or Array): 
val userList: Seq[Long] = rdd.map{ _.userId }.distinct.collect.toSeq // Just a suggestion! 

// Now we can create the new rdds: 
val rddsList: Seq[RDD[MyClass]] = userList.map { 
    userId => rdd.filter({ item: MyClass => item.userId == userId }) 
}.toSeq 

// Finally, we call the function we want for each RDD, saving the results in a new list. 
// Note the ".par" call, which is used to start the expensive execution for multiple RDDs at the same time 
val results = rddsList.par.map { 
    r => myFunction(r) 
} 

Ich weiß, das als erste Option in etwa gleich ist, sondern durch die .par Call, ich war in der Lage, die Leistung zu verbessern.

Dieser Aufruf transformiert das Objekt rddsList in ein Objekt ParSeq. Dieses neue Scala-Objekt ermöglicht eine parallele Berechnung, sodass die Kartenfunktion im Idealfall myFunction(r) für mehrere RDDs gleichzeitig aufruft, was die Leistung verbessern kann.

Weitere Informationen zu parallelen Sammlungen finden Sie unter Scala Documentation.

+0

Was ist der Typ von userList? Array? Ich versuche, die "Par" -Methode zu finden –

+0

userList ist ein lokaler Scala-Iterator (Array, Liste, Seq, ...) –

+0

Hmm, so ist Teil der Spark API? Wie hängt es mit einem nativen Scala-Typ zusammen? Das nächste, was ich in den Dokumenten finden kann, ist var rdd = sc.parallelize (data); was nicht dasselbe tut, was du hier sagst. Können Sie auf eine Doc-Seite darüber zeigen? –

Verwandte Themen