-2

Ich habe einen Datensatz test1.txt. Es enthält Daten wie untenWie aggregiert man Daten in Spark mit Scala?

2::1::3 
1::1::2 
1::2::2 
2::1::5 
2::1::4 
3::1::2 
3::1::1 
3::2::2 

Ich habe Datenrahmen mit dem folgenden Code erstellt.

case class Test(userId: Int, movieId: Int, rating: Float) 
def pRating(str: String): Rating = { 
val fields = str.split("::") 
assert(fields.size == 3) 
Test(fields(0).toInt, fields(1).toInt, fields(2).toFloat) 
} 

val ratings = spark.read.textFile("C:/Users/test/Desktop/test1.txt").map(pRating).toDF() 
2,1,3 
1,1,2 
1,2,2 
2,1,5 
2,1,4 
3,1,2 
3,1,1 
3,2,2 

Aber ich möchte die Ausgabe wie folgt drucken i.e. Entfernen von doppelten Kombinationen und anstelle von field(2) value sum of values1,1, 2.0.

1,1,2.0 
1,2,2.0 
2,1,12.0 
3,1,3.0 
3,2,2.0 

Bitte helfen Sie mir dabei, wie dies erreicht werden kann.

+1

dataframe.groupBy ("column1", "column2"). Sum ("column3") sollte funktionieren – Fabich

+0

Thanks funktioniert –

Antwort

0
ratings.groupBy("userId","movieId").sum(rating) 
+0

Dieser Code erfüllt nicht die Anforderung, doppelte Zeilen zu entfernen. Du brauchst 'distinct' vor' groupBy'. – Sim

3

Um Duplikate zu löschen, verwenden Sie df.distinct. Um Sie zuerst zu aggregieren groupBy und dann agg. Putting das alles zusammen:

case class Rating(userId: Int, movieId: Int, rating: Float) 

def pRating(str: String): Rating = { 
    val fields = str.split("::") 
    assert(fields.size == 3) 
    Rating(fields(0).toInt, fields(1).toInt, fields(2).toFloat) 
} 

val ratings = spark.read.textFile("C:/Users/test/Desktop/test1.txt").map(pRating) 
val totals = ratings.distinct 
    .groupBy('userId, 'movieId) 
    .agg(sum('rating).as("rating")) 
    .as[Rating] 

Ich bin nicht sicher, dass Sie das Endergebnis als Dataset[Rating] wollen würden und ob die distinct und sum Logik ist genau so, wie Sie es als Beispiel in der Frage wollen würden, ist nicht ganz klar, aber, hoffentlich, das wird dir geben, was du brauchst.

Verwandte Themen