Wie würde ich nach list.contains() filtern? Dies ist mein derzeitiger Code, ich habe eine Main-Klasse, die Eingaben von Befehlszeilenargumenten erhält und gemäß dieser Eingabe den entsprechenden Dispatcher ausführt. In diesem Fall ist es ein RecommendationDispatcher Klasse, die alle es ist Magie im Konstruktor tut - trainiert ein Modell und generiert Empfehlungen für verschiedene Benutzer, die eingegeben werden:Funken mit Scala: RDD durch Werte filtern, die in einer anderen Liste enthalten sind
import org.apache.commons.lang.StringUtils.indexOfAny
import java.io.{BufferedWriter, File, FileWriter}
import java.text.DecimalFormat
import Util.javaHash
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.apache.spark.{SparkConf, SparkContext}
class RecommendDispatcher(master:String, inputFile:String, outputFile:String, userList: List[String]) extends java.io.Serializable {
val format : DecimalFormat = new DecimalFormat("0.#####");
val file = new File(outputFile)
val bw = new BufferedWriter(new FileWriter(file))
val conf = new SparkConf().setAppName("Movies").setMaster(master)
val sparkContext = new SparkContext(conf)
val sqlContext = new org.apache.spark.sql.SQLContext(sparkContext)
val baseRdd = sparkContext.textFile(inputFile)
val movieIds = baseRdd.map(line => line.split("\\s+")(1)).distinct().map(id => (javaHash(id), id))
val userIds = baseRdd.map(line => line.split("\\s+")(3)).distinct()
.filter(x => userList.contains(x))
.map(id => (javaHash(id), id))
val ratings = baseRdd.map(line => line.split("\\s+"))
.map(tokens => (tokens(3),tokens(1), tokens(tokens.indexOf("review/score:")+1).toDouble))
.map(x => Rating(javaHash(x._1),javaHash(x._2),x._3))
// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)
val users = userIds.collect()
var mids = movieIds.collect()
usrs.foreach(u => {
bw.write("Recommendations for " + u + ":\n")
var ranked = List[(Double, Int)]()
mids.foreach(x => {
val movieId = x._1
val prediction = (model.predict(u._1, movieId), movieId)
ranked = ranked :+ prediction
})
//Sort in descending order
ranked = ranked.sortBy(x => -1 * x._1)
ranked.foreach(x => bw.write(x._1 + " ; " + x._2 + "\n"))
})
bw.close()
}
Und diese Ausnahme wird auf der „.filter“ Linie geworfen:
Exception in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable
Was ist der Typ von 'userList'? –
It's List [String] – Ethan
@Ethan Task nicht serialisierbare Ausnahmen sind das Ergebnis der Schließung "Leckage". Der beste Weg, diese Art von Fehlern zu debuggen, ist die Verwendung des vollständigen Code-Kontexts. Ich würde vorschlagen, dass Sie Ihren gesamten Code in ein Databricks Community Edition-Notizbuch schreiben und dann den Link hier teilen. Sie können sich unter https://accounts.cloud.databricks.com/registration.html#signup/community anmelden. – Sim