HalloException in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable
Ich verwende Scala 2.11.8 und Funken 1.6.1. wenn ich Funktion innerhalb Karte nenne, wirft es die folgende Ausnahme:
"Exception in thread "main" org.apache.spark.SparkException: Task not serializable"
Sie unter meinem vollständigen Code finden
package moviestream.recommender
import java.io
import java.io.Serializable
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
import org.jblas.DoubleMatrix
class FeatureExtraction{
val conf = new SparkConf().setMaster("local[2]").setAppName("Recommendation")
val sc = new SparkContext(conf)
val rawData = sc.textFile("data/u.data")
val rawRatings = rawData.map(_.split("\t").take(3))
//create rating object from rawratings
val ratings = rawRatings.map{case Array(user,movie,rating) => Rating(user.toInt,movie.toInt,rating.toDouble)}
//user Spark ALS library to train our model
// Build the recommendation model using ALS
val model = ALS.train(ratings,50,10,0.01)
//val model = ALS.trainImplicit(ratings,50,10,0.01,0.1) //last parameter is alpha
val predictedRating = model.predict(789,123)
//top ten recommended movies for user id 789, where k= number of recommended(10) 789=userid
val topKRecs = model.recommendProducts(789,10)
val movies = sc.textFile("data/u.item")
val titles = movies.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap()
//how many movies this user has rated
val moviesForUser = ratings.keyBy(_.user).lookup(789)
//we will take the 10 movies with the highest ratings ction using the field of the object.
//moviesForUser.sortBy(-_.rating).take(10).map(rating=>(titles(rating.product),rating.rating)).foreach(println)
//let’s take a look at the top 10 recommendations for this user and see what the titles
//topKRecs.map(rating=>(titles(rating.product),rating.rating)).foreach(println)
// we will then need to create a DoubleMatrix object
val itemId = 567
val itemFactor = model.productFeatures.lookup(itemId).head
val itemVector = new DoubleMatrix(itemFactor)
//we are ready to apply our similarity metric to each item
/*val sims = model.productFeatures.map{ case (id, factor) =>
val factorVector = new DoubleMatrix(factor)
val sim = cosineSimilarity(factorVector, itemVector)
(id, sim)
}*/
//we can compute the top 10 most similar items by sorting out the similarity score for each item
//val sortedSims = sims.top(10)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>similarity})
//we can sense check our item-to-item similarity
//val sortedSims2 = sims.top(11)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>simintellij idea debugilarity})
//sortedSims2.slice(1,11).map{case (id,sim)=>(titles(id),sim)}.foreach(println)
//Finally,we can print the 10 items with the highest computed similarity metric to our given item:
//println("Result = "+titles(123))
def cosineSimilarity(vect1:DoubleMatrix,vect2:DoubleMatrix): Double = {
vect1.dot(vect2)/(vect1.norm1()*vect2.norm2())
}
val actualRating = moviesForUser.take(1)(0)
val predictedRatings = model.predict(789,actualRating.product)
//println(predictedRatings)
val squaredError = math.pow(predictedRatings - actualRating.rating,2.0)
val usersProducts = ratings.map{case Rating(user,product,rating) => (user,product)}
val predictions = model.predict(usersProducts).map{case Rating(user,product,rating)
=>((user,product),rating)}
val ratingsAndPredictions = ratings.map{case Rating(user,product,rating)=>((user,product),rating)}
.join(predictions)
val MSE = ratingsAndPredictions.map{case ((user,product),(actual,predicted))
=> math.pow((actual-predicted),2)}.reduce(_ + _)/ratingsAndPredictions.count()
//println("Mean Squared Error = " + MSE)
val RMSE = math.sqrt(MSE)
println("Root Mean Squared Error = "+ RMSE)
def avgPrecisionK(actual:Seq[Int],predicted:Seq[Int],k:Int):Double = {
val predk = predicted.take(k)
var score = 0.0
var numHits = 0.0
for((p,i)<- predk.zipWithIndex){
if(actual.contains(p)){
numHits += 1.0
score += numHits/(i.toDouble+1.0)
}
}
if(actual.isEmpty) {
1.0
}
else{
score/scala.math.min(actual.size,k).toDouble
}
}
val actualMovies = moviesForUser.map(_.product)
val predictedMovies = topKRecs.map(_.product)
//predictedMovies.foreach(println)
val apk10 = avgPrecisionK(actualMovies,predictedMovies,10)
//println(apk10)
//Locality Sensitive Hashing
val itemFactors = model.productFeatures.map{case (id,factor)=>factor}.collect()
val itemMatrix = new DoubleMatrix(itemFactors)
//println(itemMatrix.rows,itemMatrix.columns)
val imBroadcast = sc.broadcast(itemMatrix)
//println(imBroadcast)
val allRecs = model.userFeatures.map{case (userId,array)=>
val userVector = new DoubleMatrix(array)
val scores = imBroadcast.value.mmul(userVector)
val sortedWithId = scores.data.zipWithIndex.sortBy(- _._1)
val recommendedIds = sortedWithId.map(_._2 +1).toSeq
(userId,recommendedIds)
}
println(allRecs)
}
Codeschnipsel zu setzen ist cool, aber dies fehlt die Modellerstellung usw. Die Fehlermeldung ist generisch unter Berücksichtigung des Kontexts. Auf allRecs wurde keine Aktion ausgelöst. Spark ist faul. * Fazit: Diese Frage ist aus vielen Gründen off-topic *. – eliasah
^stimme mit @eliasah überein. Beachten Sie auch, dass Spark 1.6 für scala 2.11 manuell kompiliert werden muss. Sie können diese Einheit zwar testen, aber Sie können sie nicht auf einem Cluster bereitstellen, es sei denn, Sie haben eine manuell kompilierte Version von spark. – marios
@Humayoo könnten Sie mehr Details über die Klasse geben, die RDD enthält, die Sie tun, indem Sie model.userFeatures? – mauriciojost