2015-12-02 13 views
7

Fehler:SparkError: Gesamtgröße der serialisierten Ergebnisse von XXXX Aufgaben (2,0 GB) ist größer als spark.driver.maxResultSize (2,0 GB)

ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB) 

Ziel: Erhalten Empfehlung für alle Nutzer unter Verwendung des Modells und sich mit den Testdaten jedes Benutzers überschneiden und ein Überlappungsverhältnis erzeugen.

Ich habe ein Empfehlungsmodell mit Funken MLLIB erstellen. Ich bewerte das Überlappungsverhältnis von Testdaten pro Benutzer und empfohlene Elemente pro Benutzer und erzeuge das mittlere Überlappungsverhältnis.

def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = { 

    val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey 
    val n = testData.count 

    val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20) 
     .mapValues(_.map(r => r.product)) 

    val overlaps = testData.join(recommendations).map(x => { 
     val moviesPerUserInRecs = x._2._2.toSet 
     val moviesPerUserInTest = x._2._1.toSet 
     val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest) 
     if(localHitRatio.size > 0) 
     1 
     else 
     0 
    }).filter(x => x != 0).count 

    var r = 0.0 
    if (overlaps != 0) 
     r = overlaps/n 

    return r 

    } 

Aber das Problem hier ist, dass es über maxResultSize Fehler werfen endet. In meiner Spark-Konfiguration habe ich folgendes getan, um die maxResultSize zu erhöhen.

val conf = new SparkConf() 
conf.set("spark.driver.maxResultSize", "6g") 

Aber das hat das Problem nicht lösen, ging ich in die Menge fast nahe, dass ich den Treiber Speicher noch das Problem zuteilen nicht lösen bekommen haben. Während der Code ausgeführt wird, habe ich meine Funkenarbeit im Auge behalten und was ich gesehen habe, ist ein bisschen rätselhaft.

[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB) 

Bei über Stufe Code ausgeführt wird MatrixFactorization Code in Funken mllib recommendForAll um line 277 (nicht ganz sicher, die Zeilennummer).

private def recommendForAll(
     rank: Int, 
     srcFeatures: RDD[(Int, Array[Double])], 
     dstFeatures: RDD[(Int, Array[Double])], 
     num: Int): RDD[(Int, Array[(Int, Double)])] = { 
    val srcBlocks = blockify(rank, srcFeatures) 
    val dstBlocks = blockify(rank, dstFeatures) 
    val ratings = srcBlocks.cartesian(dstBlocks).flatMap { 
     case ((srcIds, srcFactors), (dstIds, dstFactors)) => 
     val m = srcIds.length 
     val n = dstIds.length 
     val ratings = srcFactors.transpose.multiply(dstFactors) 
     val output = new Array[(Int, (Int, Double))](m * n) 
     var k = 0 
     ratings.foreachActive { (i, j, r) => 
      output(k) = (srcIds(i), (dstIds(j), r)) 
      k += 1 
     } 
     output.toSeq 
    } 
    ratings.topByKey(num)(Ordering.by(_._2)) 
    } 

recommendForAll Methode von recommendProductsForUsers Methode aufgerufen.

Aber sieht aus, als ob die Methode 1M Aufgaben ausdreht. Daten, die gefüttert werden, kommen von 2000 Teildateien, daher bin ich verwirrt, wie es begann, 1M Aufgaben auszuspucken, und ich denke, das könnte das Problem sein.

Meine Frage ist, wie kann ich dieses Problem tatsächlich lösen. Ohne diesen Ansatz ist es sehr schwer zu berechnen overlap ratio oder [email protected]. Dies ist auf Funken 1,5 (cloudera 5.5)

Antwort

0

das 2GB Problem zum Spark Gemeinschaft ist nicht neu: https://issues.apache.org/jira/browse/SPARK-6235

Re/die Partitionsgröße als 2 GB größer, versuchen neu zu partitionieren (myRdd.repartition(parallelism)) Ihre RDD auf eine größere Anzahl von Partitionen (w/r/t/Ihre aktuelle Ebene der Parallelität), wodurch die Größe jeder einzelnen Partition reduziert wird.

Re/die Anzahl der Aufgaben zentriert (daher Partitionen erstellt), meine Hypothese ist, dass es aus der API-Aufruf srcBlocks.cartesian(dstBlocks), die eine Ausgabe RDD aus (z = srcBlocks Anzahl der Partitionen * Anzahl der Partitionen dstBlocks produziert) Partitionen.

In diesem Fall könnten Sie die Verwendung der API myRdd.coalesce(parallelism) anstatt der repartition-API in Betracht ziehen, um das Mischen zu vermeiden (und Probleme mit der Serialisierung von Partitionen).

Verwandte Themen