2016-03-22 19 views
2

Ich fange gerade an zu lernen, Funken mit Scala zu benutzen. Das Problem, an dem ich arbeite, erfordert, dass ich eine Datei lese, jede Zeile auf ein bestimmtes Zeichen aufspalte, dann die Zeilen filtere, bei denen eine der Spalten mit einem Prädikat übereinstimmt und schließlich eine Spalte lösche. Also ist die grundlegende, naive Implementierung eine Karte, dann ein Filter und dann eine andere Karte.Scala sammeln ineffizient in Spark?

Das bedeutete 3 mal durch die Sammlung gehen und das schien mir ziemlich unvernünftig. Also habe ich versucht, sie durch ein Collect zu ersetzen (das Collect, das eine Teilfunktion als Argument übernimmt). Und zu meiner Überraschung ließ es das Ganze viel langsamer laufen. Ich habe es in regelmäßigen Scala-Sammlungen versucht; Wie erwartet, ist letzteres viel schneller.

Also warum ist das? Meine Idee ist, dass die Karte und der Filter und die Karte nicht sequentiell angewendet werden, sondern in eine Operation gemischt werden; Mit anderen Worten, wenn eine Aktion bewertet wird, wird jedes Element der Liste überprüft und die ausstehenden Operationen werden ausgeführt. Ist das richtig ? Aber warum tut das Collect so schlecht?

EDIT: ein Codebeispiel zu zeigen, was ich tun möchte:

Die naive Art und Weise:

sc.textFile(...).map(l => { 
    val s = l.split(" ") 
    (s(0), s(1)) 
}).filter(_._2.contains("hello")).map(_._1) 

Die collect Weg:

sc.textFile(...).collect { 
    case s if(s.split(" ")(0).contains("hello")) => s(0) 
} 
+0

Ich versuche zu verstehen, was genau Sie auf "nicht-naive" Art und Weise gespart haben. Welche Ressource wird Ihrer Meinung nach im zweiten Ansatz weniger genutzt? Sie sammeln Ihre RDD, die die Bewegung der gesamten Daten auf eine einzelne Maschine erzwingt, was normalerweise mit einer Leistungseinbuße oder OOM-Ausnahmen einhergeht. –

+0

Nun, ich glaube nicht, dass ich das tue. Collect mit einer partiellen Funktion als Argument "gibt eine RDD zurück, die alle übereinstimmenden Werte enthält, indem f angewendet wird." Also von dem, was ich verstehe, sollte es sich wie Karte verhalten, außer dass es eine Teilfunktion ist. Auch wenn es nicht schneller ist, sollte es nicht viel langsamer sein. – Nico

+0

Ohne Bezug auf die Frage, Ihre Beispiele machen verschiedene Dinge. "Naive way" gibt die ersten Teile der Split-Strings zurück, während "collect way" die ersten Zeichen der Eingabe-Strings zurückgibt. – Aivean

Antwort

4

Die Antwort liegt in der Umsetzung von collect:

/** 
* Return an RDD that contains all matching values by applying `f`. 
*/ 
def collect[U: ClassTag](f: PartialFunction[T, U]): RDD[U] = withScope { 
    val cleanF = sc.clean(f) 
    filter(cleanF.isDefinedAt).map(cleanF) 
} 

Wie Sie sehen können, ist es die gleiche Sequenz von filter ->map, aber in Ihrem Fall weniger effizient.

In scala beide isDefinedAt und apply Methoden von PartialFunction bewerten if Teil.

Also, in Ihrem "sammeln" -Beispiel wird split zweimal für jedes Eingabeelement durchgeführt.

+2

Und hier war ich, dachte Collect war etwas anderes als Karte und Filter. Vielen Dank für Ihre Antwort. – Nico