2014-12-12 6 views
20

Ich habe ein kleines Scala-Programm, das auf einem einzelnen Knoten läuft. Allerdings skaliere ich es so, dass es auf mehreren Knoten ausgeführt wird. Dies ist mein erster solcher Versuch. Ich versuche nur zu verstehen, wie die RDDs in Spark arbeiten, also basiert diese Frage auf Theorie und ist möglicherweise nicht 100% korrekt.Spark RDDs - wie funktionieren sie?

Sagen wir, ich schaffen eine RDD: val rdd = sc.textFile(file)

Jetzt, nachdem ich getan habe, dass, bedeutet das, dass die Datei auf file nun über den Knoten unterteilt ist (vorausgesetzt, alle Knoten Zugriff auf den Dateipfad) ?

Zweitens mag ich die Anzahl der Objekte in der RDD (einfach genug), aber zählen, ich brauche diese Zahl in einer Berechnung zu verwenden, die auf Objekte in der RDD angewandt werden muss - ein Pseudo-Code Beispiel:

rdd.map(x => x/rdd.size) 

sagen lassen gibt es 100 Objekte in rdd, und sagen, dass es 10 Knoten, also eine Anzahl von 10 Objekten pro Knoten (vorausgesetzt, dies ist, wie die RDD Konzept funktioniert), wenn ich jetzt die Methode aufrufen, ist jeder Knoten gehen die Berechnung mit rdd.size als 10 oder 100 durchführen? Da die RDD insgesamt die Größe 100 hat, aber lokal auf jedem Knoten nur 10 ist. Muss ich vor der Berechnung eine Broadcast-Variable erstellen? Diese Frage hängt mit der folgenden Frage zusammen.

Schließlich, wenn ich eine Umwandlung in die RDD, z. rdd.map(_.split("-")), und dann wollte ich die neue size der RDD, muss ich eine Aktion auf der RDD, wie count() ausführen, so dass alle Informationen an den Treiberknoten zurückgeschickt werden?

+1

'Diese Frage bezieht sich auf die folgende Frage.' -> ?? – gsamaras

+0

Ich denke du meintest 'rdd.flatMap (_. Split (" - "))' – lovasoa

Antwort

6

Normalerweise wird die Datei (oder Teile der Datei, wenn sie zu groß ist) auf N Knoten im Cluster repliziert (standardmäßig N = 3 auf HDFS). Es ist nicht beabsichtigt, jede Datei zwischen allen verfügbaren Knoten zu teilen.

Aber für Sie (d. H. Der Client) sollte die Arbeit mit Datei mit Spark transparent sein - Sie sollten keinen Unterschied in rdd.size sehen, egal wie viele Knoten es teilt und/oder repliziert. Es gibt Methoden (zumindest in Hadoop), um herauszufinden, auf welchen Knoten (Teilen der Datei) sich im Moment befinden kann. In einfachen Fällen müssen Sie diese Funktionalität jedoch höchstwahrscheinlich nicht verwenden.

UPDATE: ein Artikel, der Interna RDD: https://cs.stanford.edu/~matei/papers/2012/nsdi_spark.pdf

+0

Danke für die Antwort. Also, für eine Berechnung wie: 'rdd.filter (...). Map (x => x * rdd.count)' ist der 'filter' Schritt auf jedem Knoten ausgeführt, bevor irgendein Knoten den 'map' Schritt ausführen kann? Weil der "map" -Schritt offensichtlich von dem "Filter" -Schritt abhängt, der bereits an jedem Knoten durchgeführt wird, da die "map" "rdd.count" enthält. Danke noch einmal. – monster

+0

Natürlich, weil 'map' auf' filter' aufgebaut ist (lesen Sie das "lineage" -Konzept im Artikel). – Ashalynd

+0

Danke für die Informationen, es ist ein gutes Buch, aber ich frage mich jetzt, was ist der Zweck einer Broadcast-Variable? Danke nochmal, danke! – monster

18
val rdd = sc.textFile(file) 

bedeutet das, dass die Datei nun über den Knoten partitioniert ist?

Die Datei bleibt wo sie war. Die Elemente der resultierenden RDD[String] sind die Zeilen der Datei. Die RDD ist partitioniert, um der natürlichen Partitionierung des zugrunde liegenden Dateisystems zu entsprechen. Die Anzahl der Partitionen hängt nicht von der Anzahl der Knoten ab, die Sie haben.

Es ist wichtig zu verstehen, dass wenn diese Zeile ausgeführt wird nicht die Datei (en) lesen. Die RDD ist ein faules Objekt und wird nur etwas tun, wenn es muss. Das ist großartig, weil es unnötigen Speicherverbrauch vermeidet.

Zum Beispiel, wenn Sie val errors = rdd.filter(line => line.startsWith("error")) schreiben, passiert immer noch nichts.Wenn Sie dann val errorCount = errors.count schreiben, müssen Sie jetzt die Reihenfolge der Operationen ausführen, da das Ergebnis count eine ganze Zahl ist. Was jeder Worker-Kern (Executor-Thread) dann parallel tut, ist eine Datei (oder ein Teil einer Datei) zu lesen, durch seine Zeilen zu iterieren und die Zeilen zu zählen, die mit "error" beginnen. Puffern und GC beiseite, nur eine einzelne Zeile pro Kern wird im Speicher zu einer Zeit sein. Dies ermöglicht es, mit sehr großen Daten zu arbeiten, ohne viel Speicher zu verwenden.

Ich mag die Anzahl der Objekte in der RDD jedoch zählen, ich brauche diese Zahl in einer Berechnung zu verwenden, die auf Objekte in der RDD angewandt werden muss - ein Pseudo-Code Beispiel:

rdd.map(x => x/rdd.size) 

Es gibt keine rdd.size Methode. Es gibt rdd.count, die die Anzahl der Elemente in der RDD zählt. rdd.map(x => x/rdd.count) wird nicht funktionieren. Der Code wird versuchen, die rdd Variable an alle Arbeiter zu senden und wird mit einem NotSerializableException fehlschlagen. Was Sie tun können ist:

val count = rdd.count 
val normalized = rdd.map(x => x/count) 

Dies funktioniert, weil count ein Int und serialisiert werden kann.

Wenn ich eine Umwandlung in die RDD, z. rdd.map(_.split("-")), und dann wollte ich die neue Größe der RDD, muss ich eine Aktion auf der RDD, wie count(), so dass alle Informationen an den Treiberknoten zurück gesendet werden?

map ändert nicht die Anzahl der Elemente. Ich weiß nicht, was du mit "Größe" meinst. Aber ja, Sie müssen eine Aktion wie count ausführen, um etwas aus der RDD zu bekommen. Sie sehen, dass überhaupt keine Arbeit ausgeführt wird, bis Sie eine Aktion ausführen. (Wenn Sie count ausführen, wird nur die Anzahl der Partitionen zurück an den Treiber gesendet, natürlich nicht "alle Informationen".)

+0

Ich habe ein [tag: Python] Beispiel basierend auf Ihrer Antwort in [der Dokumentation] (http://stackoverflow.com/documentation/apache-spark/833/introduction-to-apache-spark#t=20160817171702245426), wenn Sie mögen es, Sie können es in Ihre Antwort aufnehmen! – gsamaras

+0

Dies sollte die akzeptierte Antwort sein. Es beantwortet alle Teile vollständig und korrekt. – tejaskhot

Verwandte Themen