2016-09-19 4 views
0

Ich versuche, eine Reihe von Transformationen auf Protokolldaten mit Scala durchzuführen, und ich habe Schwierigkeiten mit dem Abgleich Tupel. Ich habe einen Datenrahmen mit Benutzer-IDs, URLs und Daten. Ich kann den Datenrahmen zu einer RDD Karte und durch Schlüssel mit dieser Karte reduzieren:Scala Spark Map Typ passendes Problem

val countsRDD = usersUrlsDays.map { case Row(date:java.sql.Date, user_id:Long, url:String) => Tuple2(Tuple2(user_id, url), 1) }.rdd.reduceByKey(_+_) 

Das gibt mir eine RDD von ((user_id, URL), count):

scala> countsRDD.take(1) 
res9: Array[((Long, String), Int)]  
scala> countsRDD.take(1)(0) 
res10: ((Long, String), Int) 

Jetzt möchte ich dass durch uRL zu invertieren zu ergeben:

(url, [(user_id, count), ...]) 

ich habe dies versucht:

val urlIndex = countsRDD.map{ case Row(((user_id:Long, url:String), count:Int)) => Tuple2(url, List(Tuple2(user_id, count))) }.reduceByKey(_++_) 

Dies erzeugt Fehler Spiel, aber:

scala.MatchError: ... (of class scala.Tuple2) 

ich viele, viele verschiedene Permutationen von diesen zwei Karte mit explizit und implizit Typen nennt versucht haben, und diese scheint am weitesten gekommen um mich zu haben. Ich hoffe, dass hier jemand helfen kann, mich in die richtige Richtung zu lenken.

Antwort

2

So etwas sollte funktionieren:

countsRDD 
    .map{ case ((user_id, url), count) => (url, (user_id, count)) } 
    .groupByKey 
  • countsRDD ist RDD[((String, String), Int)] nicht RDD[Row].
  • Es ist nicht erforderlich, TupleN zu verwenden. Tuple Literale funktionieren gut.
  • Da countsRDD statisch typisiert ist (anders als RDD[Row]), müssen Sie keine Typen angeben.
  • Verwenden Sie nicht reduceByKey für Listenverkettung. Es ist der schlechteste mögliche Ansatz, den Sie verwenden können, und ignoriert die Komplexität der Berechnungen, den Müllkoelektor und den gesunden Menschenverstand. Wenn Sie wirklich brauchen gruppierte Daten verwenden Operation, die dafür ausgelegt ist.