2016-12-01 5 views
7

Ich lese durch theoretische Unterschiede zwischen Karte und mapPartitions, & bin sehr klar, wenn sie in verschiedenen Situationen zu verwenden.Spark RDD-Karte vs mapPartitions

Aber mein Problem, das unten beschrieben wird, basiert mehr auf GC-Aktivität & Speicher (RAM). Bitte lesen Sie unten für das Problem: -

=> Ich schrieb eine Map-Funktion, um Zeile in String zu konvertieren. Eine Eingabe von RDD [org.apache.spark.sql.Row] würde also RDD [String] zugeordnet werden. Aber mit diesem Ansatz würde Kartenobjekt für jede Zeile einer RDD erstellt werden. Somit kann die Erzeugung einer solch großen Anzahl von Objekten die GC-Aktivität erhöhen.

=> Um oben zu lösen, dachte ich über die Verwendung von mapPartitions nach. Diese Anzahl von Objekten entspricht also der Anzahl der Partitionen. mapPartitions gibt Iterator als Eingabe und akzeptiert die Rückgabe und java.lang.Iterable. Aber die meisten der Iterable wie Array, List usw. sind im Speicher. Also, wenn ich eine riesige Menge an Daten habe, dann würde das Erstellen eines Iterable auf diese Weise zu nicht genügend Arbeitsspeicher führen? oder Gibt es eine andere Sammlung (Java oder Scala), die hier verwendet werden sollte (um auf Diskette zu übertragen, falls der Speicher zu füllen beginnt)? Oder sollten wir nur mapPartitions verwenden, wenn RDD vollständig im Speicher ist?

Vielen Dank im Voraus. Jede Hilfe würde sehr geschätzt werden.

Antwort

1

Wenn Sie denken über JavaRDD.mapPartitions es dauert FlatMapFunction (oder eine Variante wie DoubleFlatMapFunction), die zurück Iterator nicht Iterable erwartet wird. Wenn die Sammlung unterlegt ist, müssen Sie sich keine Sorgen machen.

RDD.mapPartitions übernimmt eine Funktion von Iterator bis Iterator.

I allgemein, wenn Sie Referenzdaten verwenden, können Sie mapPartitions durch map ersetzen und statisches Element verwenden, um Daten zu speichern. Dies wird den gleichen Footprint haben und einfacher zu schreiben sein.

+0

Eine statische Variable ist nicht sinnvoll. als Ihr Kartenobjekt wird jedes Mal erstellt. Wenn man auch Paralleism auf einer einzelnen Maschine erreichen kann, dann hätte ich eine synchronisierte statische Variable haben sollen. Außerdem, FlatMapFunction lässt Sie überschreiben "öffentliche Iterable Anruf (Iterator itr)" –

+1

Auch können Sie bitte lassen Sie mich wissen, die Sammlung, die faul ist oder auf Festplatte im Falle Speicher (wenn es maximale Speicher-Puffer-Grenze zu erreichen) verschütten? –