ERROR TaskSetManager: Total size of serialized results of XXXX tasks (2.0 GB) is bigger than spark.driver.maxResultSize (2.0 GB)
Ziel: Erhalten Empfehlung für alle Nutzer unter Verwendung des Modells und sich mit den Testdaten jedes Benutzers überschneiden und ein Überlappungsverhältnis erzeugen.
Ich habe ein Empfehlungsmodell mit Funken MLLIB erstellen. Ich bewerte das Überlappungsverhältnis von Testdaten pro Benutzer und empfohlene Elemente pro Benutzer und erzeuge das mittlere Überlappungsverhältnis.
def overlapRatio(model: MatrixFactorizationModel, test_data: org.apache.spark.rdd.RDD[Rating]): Double = {
val testData: RDD[(Int, Iterable[Int])] = test_data.map(r => (r.user, r.product)).groupByKey
val n = testData.count
val recommendations: RDD[(Int, Array[Int])] = model.recommendProductsForUsers(20)
.mapValues(_.map(r => r.product))
val overlaps = testData.join(recommendations).map(x => {
val moviesPerUserInRecs = x._2._2.toSet
val moviesPerUserInTest = x._2._1.toSet
val localHitRatio = moviesPerUserInRecs.intersect(moviesPerUserInTest)
if(localHitRatio.size > 0)
1
else
0
}).filter(x => x != 0).count
var r = 0.0
if (overlaps != 0)
r = overlaps/n
return r
}
Aber das Problem hier ist, dass es über maxResultSize
Fehler werfen endet. In meiner Spark-Konfiguration habe ich folgendes getan, um die maxResultSize
zu erhöhen.
val conf = new SparkConf()
conf.set("spark.driver.maxResultSize", "6g")
Aber das hat das Problem nicht lösen, ging ich in die Menge fast nahe, dass ich den Treiber Speicher noch das Problem zuteilen nicht lösen bekommen haben. Während der Code ausgeführt wird, habe ich meine Funkenarbeit im Auge behalten und was ich gesehen habe, ist ein bisschen rätselhaft.
[Stage 281:==> (47807 + 100)/1000000]15/12/01 12:27:03 ERROR TaskSetManager: Total size of serialized results of 47809 tasks (6.0 GB) is bigger than spark.driver.maxResultSize (6.0 GB)
Bei über Stufe Code ausgeführt wird MatrixFactorization Code in Funken mllib recommendForAll
um line 277
(nicht ganz sicher, die Zeilennummer).
private def recommendForAll(
rank: Int,
srcFeatures: RDD[(Int, Array[Double])],
dstFeatures: RDD[(Int, Array[Double])],
num: Int): RDD[(Int, Array[(Int, Double)])] = {
val srcBlocks = blockify(rank, srcFeatures)
val dstBlocks = blockify(rank, dstFeatures)
val ratings = srcBlocks.cartesian(dstBlocks).flatMap {
case ((srcIds, srcFactors), (dstIds, dstFactors)) =>
val m = srcIds.length
val n = dstIds.length
val ratings = srcFactors.transpose.multiply(dstFactors)
val output = new Array[(Int, (Int, Double))](m * n)
var k = 0
ratings.foreachActive { (i, j, r) =>
output(k) = (srcIds(i), (dstIds(j), r))
k += 1
}
output.toSeq
}
ratings.topByKey(num)(Ordering.by(_._2))
}
recommendForAll
Methode von recommendProductsForUsers
Methode aufgerufen.
Aber sieht aus, als ob die Methode 1M Aufgaben ausdreht. Daten, die gefüttert werden, kommen von 2000 Teildateien, daher bin ich verwirrt, wie es begann, 1M Aufgaben auszuspucken, und ich denke, das könnte das Problem sein.
Meine Frage ist, wie kann ich dieses Problem tatsächlich lösen. Ohne diesen Ansatz ist es sehr schwer zu berechnen overlap ratio
oder [email protected]
. Dies ist auf Funken 1,5 (cloudera 5.5)