2017-11-21 3 views
0

Ich möchte die Anzahl der Zeilen in einer RDD auf halbem Weg zwischen den Start- und Endtransformationen protokollieren. Mein Code sieht derzeit wie folgt aus:Sollten Sie eine RDD immer vor einer Zwischenzählung zwischenspeichern?

val transformation1 = firstTransformation(inputdata).cache // Is this cache recommended or can I remove it? 
log("Transformation1 count: " + tranformation1.count) 
val tranformation2 = secondTransformation(transformation1).cache 
val finalX = transformation2.filter(row => row.contains("x")) 
val finalY = tranformation2.filter(row => row.contains("y")) 

Mein Problem ist, dass transformation1 ein großer RDD ist und viel Speicher in Anspruch nimmt (im Speicher passt aber verursacht Probleme Speicher später). Ich weiß jedoch, dass es normalerweise empfohlen wird, dass es zwischengespeichert wird, da ich 2 verschiedene Operationen auf Transformation1 (.count() und secondTransformation()) ausführe.

Diese Art von Szenario ist wahrscheinlich sehr häufig, also, was ist der empfohlene Weg, damit umzugehen? Sollten Sie eine RDD immer vor einer Zwischenzählung zwischenspeichern, oder kann ich die .cache() auf transformation1 entfernen?

Antwort

1

Wenn Sie Probleme mit dem Speicher haben, sollten Sie so bald wie möglich aus dem Telefonbuch austragen und auf der Festplatte bleiben.

val transformation1 = firstTransformation(inputdata).persist(StorageLevel.DISK_ONLY) // Is this cache recommended or can I remove it? 
log("Transformation1 count: " + tranformation1.count) 
val tranformation2 = secondTransformation(transformation1).persist(StorageLevel.DISK_ONLY) 
val finalX = transformation2.filter(row => row.contains("x")) 
val finalY = tranformation2.filter(row => row.contains("y")) 
// All the actions are done 
transformation1.unpersist() 
transformation2.unpersist() 

wenn Sie unpersist vor den Speicherprobleme können geschehen, es wäre besser, wenn Sie statt beharren auf der Festplatte zwischenzuspeichern

+0

Wenn unpersisting Spark Abstürze und sagt 'Exception in thread„main“org.apache .spark.rpc.RpcTimeoutException: Terminüberschreitungen nach [120 Sekunden]. Diese Zeitüberschreitung wird von spark.rpc.askTimeout' –

+0

gesteuert, das ist ein anderes Problem, werfen Sie einen Blick auf https://stackoverflow.com/questions/41123846/scala-spark-dataframes-join-java-util-concurrent-timeoutexception-futures -t – Mikel

Verwandte Themen