2016-04-03 7 views
1

HalloException in thread "main" org.apache.spark.SparkException: Aufgabe nicht serializable

Ich verwende Scala 2.11.8 und Funken 1.6.1. wenn ich Funktion innerhalb Karte nenne, wirft es die folgende Ausnahme:

"Exception in thread "main" org.apache.spark.SparkException: Task not serializable" 

Sie unter meinem vollständigen Code finden

package moviestream.recommender 
import java.io 
import java.io.Serializable 

import org.apache.spark.{SparkConf, SparkContext} 
import org.apache.spark.mllib.recommendation.ALS 
import org.apache.spark.mllib.recommendation.Rating 
import org.jblas.DoubleMatrix 


class FeatureExtraction{ 

    val conf = new SparkConf().setMaster("local[2]").setAppName("Recommendation") 
    val sc = new SparkContext(conf) 
    val rawData = sc.textFile("data/u.data") 
    val rawRatings = rawData.map(_.split("\t").take(3)) 

    //create rating object from rawratings 
    val ratings = rawRatings.map{case Array(user,movie,rating) => Rating(user.toInt,movie.toInt,rating.toDouble)} 
    //user Spark ALS library to train our model 
    // Build the recommendation model using ALS 

    val model = ALS.train(ratings,50,10,0.01) 
    //val model = ALS.trainImplicit(ratings,50,10,0.01,0.1) //last parameter is alpha 
    val predictedRating = model.predict(789,123) 
    //top ten recommended movies for user id 789, where k= number of recommended(10) 789=userid 
    val topKRecs = model.recommendProducts(789,10) 
    val movies = sc.textFile("data/u.item") 
    val titles = movies.map(line=>line.split("\\|").take(2)).map(array=>(array(0).toInt,array(1))).collectAsMap() 
    //how many movies this user has rated 
    val moviesForUser = ratings.keyBy(_.user).lookup(789) 
    //we will take the 10 movies with the highest ratings ction using the  field of the  object. 
    //moviesForUser.sortBy(-_.rating).take(10).map(rating=>(titles(rating.product),rating.rating)).foreach(println) 
    //let’s take a look at the top 10 recommendations for this user and see what the titles 
    //topKRecs.map(rating=>(titles(rating.product),rating.rating)).foreach(println) 
    // we will then need to create a DoubleMatrix object 

    val itemId = 567 
    val itemFactor = model.productFeatures.lookup(itemId).head 
    val itemVector = new DoubleMatrix(itemFactor) 


    //we are ready to apply our similarity metric to each item 
    /*val sims = model.productFeatures.map{ case (id, factor) => 
    val factorVector = new DoubleMatrix(factor) 
    val sim = cosineSimilarity(factorVector, itemVector) 
    (id, sim) 
    }*/ 

    //we can compute the top 10 most similar items by sorting out the similarity score for each item 
    //val sortedSims = sims.top(10)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>similarity}) 

    //we can sense check our item-to-item similarity 
    //val sortedSims2 = sims.top(11)(Ordering.by[(Int,Double),Double]{case(id,similarity)=>simintellij idea debugilarity}) 
    //sortedSims2.slice(1,11).map{case (id,sim)=>(titles(id),sim)}.foreach(println) 
    //Finally,we can print the 10 items with the highest computed similarity metric to our given item: 
    //println("Result = "+titles(123)) 

    def cosineSimilarity(vect1:DoubleMatrix,vect2:DoubleMatrix): Double = { 
    vect1.dot(vect2)/(vect1.norm1()*vect2.norm2()) 
    } 

    val actualRating = moviesForUser.take(1)(0) 
    val predictedRatings = model.predict(789,actualRating.product) 
    //println(predictedRatings) 
    val squaredError = math.pow(predictedRatings - actualRating.rating,2.0) 

    val usersProducts = ratings.map{case Rating(user,product,rating) => (user,product)} 
    val predictions = model.predict(usersProducts).map{case Rating(user,product,rating) 
          =>((user,product),rating)} 
    val ratingsAndPredictions = ratings.map{case Rating(user,product,rating)=>((user,product),rating)} 
            .join(predictions) 
    val MSE = ratingsAndPredictions.map{case ((user,product),(actual,predicted)) 
      => math.pow((actual-predicted),2)}.reduce(_ + _)/ratingsAndPredictions.count() 
    //println("Mean Squared Error = " + MSE) 
    val RMSE = math.sqrt(MSE) 
    println("Root Mean Squared Error = "+ RMSE) 
    def avgPrecisionK(actual:Seq[Int],predicted:Seq[Int],k:Int):Double = { 
    val predk = predicted.take(k) 
    var score = 0.0 
    var numHits = 0.0 
    for((p,i)<- predk.zipWithIndex){ 
     if(actual.contains(p)){ 
     numHits += 1.0 
     score += numHits/(i.toDouble+1.0) 
     } 
    } 
    if(actual.isEmpty) { 
     1.0 
    } 
    else{ 
     score/scala.math.min(actual.size,k).toDouble 
    } 


    } 

    val actualMovies = moviesForUser.map(_.product) 
    val predictedMovies = topKRecs.map(_.product) 
//predictedMovies.foreach(println) 
    val apk10 = avgPrecisionK(actualMovies,predictedMovies,10) 
    //println(apk10) 
    //Locality Sensitive Hashing 
    val itemFactors = model.productFeatures.map{case (id,factor)=>factor}.collect() 
    val itemMatrix = new DoubleMatrix(itemFactors) 
    //println(itemMatrix.rows,itemMatrix.columns) 
    val imBroadcast = sc.broadcast(itemMatrix) 
    //println(imBroadcast) 
    val allRecs = model.userFeatures.map{case (userId,array)=> 
       val userVector = new DoubleMatrix(array) 
       val scores = imBroadcast.value.mmul(userVector) 
       val sortedWithId = scores.data.zipWithIndex.sortBy(- _._1) 
       val recommendedIds = sortedWithId.map(_._2 +1).toSeq 
       (userId,recommendedIds) 
    } 
    println(allRecs) 


} 
+1

Codeschnipsel zu setzen ist cool, aber dies fehlt die Modellerstellung usw. Die Fehlermeldung ist generisch unter Berücksichtigung des Kontexts. Auf allRecs wurde keine Aktion ausgelöst. Spark ist faul. * Fazit: Diese Frage ist aus vielen Gründen off-topic *. – eliasah

+1

^stimme mit @eliasah überein. Beachten Sie auch, dass Spark 1.6 für scala 2.11 manuell kompiliert werden muss. Sie können diese Einheit zwar testen, aber Sie können sie nicht auf einem Cluster bereitstellen, es sei denn, Sie haben eine manuell kompilierte Version von spark. – marios

+0

@Humayoo könnten Sie mehr Details über die Klasse geben, die RDD enthält, die Sie tun, indem Sie model.userFeatures? – mauriciojost

Antwort

1

Wie oben in den Kommentaren erwähnt, die Frage zu breit ist. Aber eine Vermutung könnte helfen. Sie verwenden innerhalb map der ausgestrahlte Wert imBroadcast. Ich nehme an, dass es Funktionen enthält, die im selben Bereich wie sparkContext deklariert sind, oder? Verschieben Sie sie dann zu einem separaten Objekt.

Verwandte Themen