2017-07-17 1 views
1

Ich erstelle einen neuen Datenrahmen mit einer Handvoll Datensätze von einem Join.Zählen auf Spark Dataframe ist extrem langsam

val joined_df = first_df.join(second_df, first_df.col("key") === 
second_df.col("key") && second_df.col("key").isNull, "left_outer") 
joined_df.repartition(1) 
joined_df.cache() 
joined_df.count() 

Alles ist schnell (unter einer Sekunde) außer dem Count-Vorgang. Die RDD-Konvertierung beginnt und dauert buchstäblich Stunden. Gibt es eine Möglichkeit, die Dinge zu beschleunigen?

INFO MemoryStore: Block rdd_63_140 stored as values in memory (estimated size 16.0 B, free 829.3 MB) 
INFO BlockManagerInfo: Added rdd_63_140 in memory on 192.168.8.52:36413 (size: 16.0 B, free: 829.8 MB) 
INFO Executor: Finished task 140.0 in stage 10.0 (TID 544). 4232 bytes result sent to driver 
INFO TaskSetManager: Starting task 142.0 in stage 10.0 (TID 545, localhost, executor driver, partition 142, PROCESS_LOCAL, 6284 bytes) 
INFO Executor: Running task 142.0 in stage 10.0 (TID 545) 
INFO TaskSetManager: Finished task 140.0 in stage 10.0 (TID 544) in 16 ms on localhost (executor driver) (136/200) 
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks 
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 
INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 200 blocks 
INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms 
+1

Evrything ist schnell unter 1 Sekunde, weil keine Operation stattfindet, bis Sie "count" erreichen. Lesen Sie mehr über [Lazy Evaluation in Spark hier] (https://spark.apache.org/docs/latest/rdd-programming-guide.html#rdd-operations) – philantrovert

Antwort

5

Alles ist schnell (unter einer Sekunde), mit Ausnahme der Zähloperation.

Diese wie folgt gerechtfertigt ist: alle Vorgänge vor den count werden, sind genannt Transformationen und diese Art von Funken Operationen faul heißt es keine Berechnung nicht tun, bevor eine Aktion (count in Ihrem Beispiel) aufrufen.

Das zweite Problem ist in der repartition(1):

im Auge behalten, dass Sie alle die Parallelität durch Funken angeboten verlieren werde und Sie Berechnung wird in einem Testamentsvollstrecker ausgeführt werden (Kern, wenn Ihr im Standalone-Modus sind), so Sie müssen diesen Schritt entfernen oder zu einer Zahl propositional zu der Anzahl Ihrer CPU-Kerne (Standalone-Modus) oder die Anzahl der Executoren (Cluster-Modus) ändern.

Die RDD-Konvertierung beginnt und dauert buchstäblich Stunden.

Wenn ich das richtig verstanden würde verdeckte Sie die DataFrame auf ein RDD, das ist wirklich eine schlechte Praxis in Funken und Sie sollten eine solche Umwandlung wie möglich zu vermeiden, wie Sie können. Dies ist, weil die Daten in DataFrame und Dataset sind codiert mit spezielle Funken Encoder (es heißt tungstant, wenn ich gut daran erinnert), die viel weniger Speicher als die JVM Serialisierung Encoder, so eine Umwandlung bedeuten, dass Funken den Typ ändern wird Ihre Daten von seinem eigenen (die viel weniger Speicher nehmen und funken optimieren viele Kommutierungen durch nur die codierten Daten zu arbeiten und nicht die Daten zu arbeiten mit Serialisierung und dann zu deserialisieren) zu dem JVM-Datentyp und dies warum DataFrame s und Dataset s sind sehr leistungsfähig als RDD s

Hoffe, dass diese Hilfe Sie

+1

Warum ist die Konvertierung von DataFrame zu RDD eine schlechte Praxis? ? Sind RDDs nicht die zugrunde liegenden Instanzen in Dataframes und Datasets? – philantrovert

+3

Dies ist, weil die Daten in 'DataFrame' und' Dataset' mit speziellen Funken Encoder (es heißt ** tungstant **, wenn ich mich gut daran erinnert), die ** viel weniger Speicher ** als die JVM Serialisierung Encoder, also Eine solche Umwandlung bedeutet, dass Spark die Art Ihrer Daten von seiner eigenen ändert (die viel weniger Speicher benötigt und Funken eine Menge Kommutierungen optimieren lässt, indem sie nur die codierten Daten bearbeitet und die Daten nicht serialisiert, um damit zu arbeiten und sie dann zu deserialisieren) der JVM-Datentyp und deshalb 'DataFrame's und' Datasets' sind sehr mächtig als 'RDD's. –

+0

Cool, danke Mann! – philantrovert

1

Wie andere erwähnt haben, die Vorgänge vor count sind „faul“ und nur eine Transformation, anstatt tatsächlich Kraft eine Berechnung registrieren.

Wenn Sie count aufrufen, wird die Berechnung ausgelöst. Dies ist, wenn Spark liest Ihre Daten, führt alle zuvor registrierten Transformationen und berechnet das Ergebnis, das Sie angefordert haben (in diesem Fall eine count).

Die Umwandlung RDD Tritte in und buchstäblich Stunden in Anspruch nimmt

I der Begriff „Umwandlung“ ist vielleicht ein bisschen ungenau denken zu vervollständigen.Was tatsächlich passiert, ist, dass die von Ihnen registrierten DataFrame Transformationen in Operationen übersetzt werden, und diese werden auf die RDD angewendet, die Ihrer DataFrame zugrunde liegt. Es gibt keine Konvertierung per se in dem Code, den Sie hier angegeben haben.

Nebenbei ist es möglich, eine DataFrame zu einem über die DataFrame.rdd Eigenschaft explizit zu konvertieren. Wie in this answer erwähnt, ist dies in der Regel eine schlechte Idee, da Sie einige der Vorteile (in Leistung und API) von gut strukturierten Daten verlieren.