2017-02-14 4 views
0

Ich habe eine RDD von Produktbewertungen mit dem MLlib Rating-Objekt, das nur ein Tupel von (int userId, int productId, doppelte Bewertung) ist. Ich möchte jedes Element aus der RDD entfernen, das eine Überprüfung eines Produkts mit zu geringen Bewertungen darstellt.Eine RDD basierend auf der Anzahl der Vorkommen filtern

Zum Beispiel könnten die RDD sein:

Rating(35, 1, 5.0) 
Rating(18, 1, 4.0) 
Rating(29, 2, 3.0) 
Rating(12, 2, 2.0) 
Rating(65, 3, 1.0) 

und wenn ich, dass jedes Produkt mit weniger als 2 Bewertungen entfernen gefiltert, wäre es nur die letzte Bewertung herauszufiltern und die ersten vier zurückgeben. (Ich möchte mit einem viel höheren Wert als 2 filtern, aber nur zum Beispiel).

Zur Zeit hat mich diesen Code, der eine Folge des Produkt-IDs in der Reihenfolge der Anzahl der Bewertungen gibt, aber ich war nicht sicher, einen Weg von der Haupt RDD zu filtern, basierend auf, dass und wie es scheint ineffizient sowieso:

val mostRated = ratings.map(_._2.product) 
         .countByValue 
         .toSeq 
         .sortBy(- _._2) 
         .map(_._1) 

Antwort

0

Sie können Gruppe der rdd von ProductId und dann auf dem Filter basiert, wenn die Länge der Gruppe größer als der Schwellenwert ist (1 hier). Verwenden Sie flatMap die Ergebnisse aus den gruppierten rdd zu extrahieren:

case class Rating(UserId: Int, ProductId: Int, Rating: Double) 

val ratings = sc.parallelize(Seq(Rating(35, 1, 5.0), 
    Rating(18, 1, 4.0), 
    Rating(29, 2, 3.0), 
    Rating(12, 2, 2.0), 
    Rating(65, 3, 1.0))) 

val prodMinCounts = ratings.groupBy(_.ProductId). 
          filter(_._2.toSeq.length > 1). 
          flatMap(_._2) 
prodMinCounts.collect 
// res14: Array[Rating] = Array(Rating(35,1,5.0), Rating(18,1,4.0), Rating(29,2,3.0), Rating(12,2,2.0)) 
Verwandte Themen