2016-06-11 6 views
2

Wie würde ich nach list.contains() filtern? Dies ist mein derzeitiger Code, ich habe eine Main-Klasse, die Eingaben von Befehlszeilenargumenten erhält und gemäß dieser Eingabe den entsprechenden Dispatcher ausführt. In diesem Fall ist es ein RecommendationDispatcher Klasse, die alle es ist Magie im Konstruktor tut - trainiert ein Modell und generiert Empfehlungen für verschiedene Benutzer, die eingegeben werden:Funken mit Scala: RDD durch Werte filtern, die in einer anderen Liste enthalten sind

import org.apache.commons.lang.StringUtils.indexOfAny 
import java.io.{BufferedWriter, File, FileWriter} 
import java.text.DecimalFormat 
import Util.javaHash 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.Rating 


import org.apache.spark.{SparkConf, SparkContext} 

class RecommendDispatcher(master:String, inputFile:String, outputFile:String, userList: List[String]) extends java.io.Serializable { 

    val format : DecimalFormat = new DecimalFormat("0.#####"); 
    val file = new File(outputFile) 
    val bw = new BufferedWriter(new FileWriter(file)) 

    val conf = new SparkConf().setAppName("Movies").setMaster(master) 
    val sparkContext = new SparkContext(conf) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext) 
    val baseRdd = sparkContext.textFile(inputFile) 




    val movieIds = baseRdd.map(line => line.split("\\s+")(1)).distinct().map(id => (javaHash(id), id)) 

    val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct() 
             .filter(x => userList.contains(x)) 
             .map(id => (javaHash(id), id)) 


    val ratings = baseRdd.map(line => line.split("\\s+")) 
    .map(tokens => (tokens(3),tokens(1), tokens(tokens.indexOf("review/score:")+1).toDouble)) 
     .map(x => Rating(javaHash(x._1),javaHash(x._2),x._3)) 

    // Build the recommendation model using ALS 
    val rank = 10 
    val numIterations = 10 
    val model = ALS.train(ratings, rank, numIterations, 0.01) 

    val users = userIds.collect() 
    var mids = movieIds.collect() 

    usrs.foreach(u => { 
     bw.write("Recommendations for " + u + ":\n") 
     var ranked = List[(Double, Int)]() 
     mids.foreach(x => { 
     val movieId = x._1 
     val prediction = (model.predict(u._1, movieId), movieId) 
     ranked = ranked :+ prediction 
     }) 
     //Sort in descending order 
     ranked = ranked.sortBy(x => -1 * x._1) 
     ranked.foreach(x => bw.write(x._1 + " ; " + x._2 + "\n")) 
    }) 

    bw.close() 

} 

Und diese Ausnahme wird auf der „.filter“ Linie geworfen:

Exception in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable

+0

Was ist der Typ von 'userList'? –

+0

It's List [String] – Ethan

+0

@Ethan Task nicht serialisierbare Ausnahmen sind das Ergebnis der Schließung "Leckage". Der beste Weg, diese Art von Fehlern zu debuggen, ist die Verwendung des vollständigen Code-Kontexts. Ich würde vorschlagen, dass Sie Ihren gesamten Code in ein Databricks Community Edition-Notizbuch schreiben und dann den Link hier teilen. Sie können sich unter https://accounts.cloud.databricks.com/registration.html#signup/community anmelden. – Sim

Antwort

2

ich denke, dass ein guter Ansatz ist Ihr userList in konvertieren zu einem broadcast variable.

val broadcastUserList= sc.broadcast(userList) 
val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct() 
             .filter(x => broadcastUserList.value.contains(x)) 
             .map(id => (javaHash(id), id)) 
+0

Scheint keinen Effekt zu haben, das Ergebnis ist das gleiche. – Ethan

+0

Gibt einen Hashwert für den Eingabezeichenfolgenwert zurück: id – Ethan

+0

Wie geht das? @Ethan –

0

ich Sim bin zu raten, ist recht über diese seine closure „Leckage“ und den Beispielcode Du stark vereinfacht vorgesehen ist.

Wenn Ihr Haupt sieht wie folgt aus:

object test 
{ 
    def main(args: Array[String]): Unit = 
    { 
    val sc = ... 
    val rdd1 = ... 
    val userList = ... 
    val rdd2 = rdd1.filter { list.contains(_) } 
    } 
} 

Dann keine Serialisierung Fehler auftritt. "userList", das serialisierbar ist, hat kein Problem, zu den Executoren serialisiert zu werden ...

Die Probleme beginnen, wenn Sie anfangen, Ihr "großes" Haupt in verschiedene Klassen zu modellieren. Hier

ist ein Beispiel dafür, wie die Dinge vielleicht falsch:

class FilterLogic 
{ 
    val userList = List(1) 
    def filterRDD(rdd : RDD[ Int ]) : RDD[ Int ] = 
    { 
    rdd.filter { list.contains(_) } 
    } 
} 

object Test 
{ 
    def main(args: Array[String]): Unit = 
    { 
    val sc = ... 
    val rdd1 = ... 
    val rdd2 = new FilterLogic().filterRDD(rdd1)// This will result in a serialization error!!! 
    } 
} 

Jetzt, wo userlist ist ein Wert, der Logic-Klasse, wenn es um den Testamentsvollstrecker serialisiert über werden muss, fordert sie die gesamte Verpackung Logic Klasse auch serialisiert werden (Warum? Denn in Scala ist userList eigentlich ein Getter in der Logic-Klasse).

Ein paar Möglichkeiten, dieses Problem zu lösen:

1) userlist kann innerhalb der filterRDD Funktion erstellt werden, ist es nicht ein val von Logic dann ist (funktioniert aber Grenzen Code-Sharing/Modellierung)

1.1) ähnliche Idee, wie so eine temporäre val in der der filterRDD Funktion verwendet:

val list_ = list ; rdd.filter { list_.contains(_) } 

funktioniert, aber so hässlich, es ist fast schmerzhaft ...

2) Logik-Klasse kann Serializable gemacht werden (manchmal ist es vielleicht nicht möglich, es serialisierbar zu machen)

Schließlich kann die Verwendung eines Broadcasts seine Vorteile haben (oder nicht haben), aber es hängt nicht mit dem Serialisierungsfehler zusammen.

0

Ich habe versucht, die RecommendDispatcher-Klasse zu serialisieren, habe aber immer noch die gleiche Ausnahme. Also entschied ich mich dafür, den Code in die Main-Klasse zu legen und das löste mein Problem.

Verwandte Themen