2017-03-20 2 views
1

Ich lese eine JSON-Datei eines sozialen Netzwerks in Funken. Ich bekomme von diesen einen Datenrahmen, den ich explodiere, um Paare zu bekommen. Dieser Prozess funktioniert perfekt. Später möchte ich dies in RDD (für die Verwendung mit GraphX) konvertieren, aber die RDD-Erstellung dauert sehr lange.Funken Dataframe Konvertierung in RDD dauert eine lange Zeit

val social_network = spark.read.json(my/path) // 200MB 
val exploded_network = social_network. 
    withColumn("follower", explode($"followers")). 
    withColumn("id_follower", ($"follower").cast("long")). 
    withColumn("id_account", ($"account").cast("long")). 
    withColumn("relationship", lit(1)). 
    select("id_follower", "id_account", "relationship") 
val E1 = exploded_network.as[(VertexId, VertexId, Int)] 
val E2 = E1.rdd 

für Um zu überprüfen, wie der Prozess ausgeführt wird, zähle ich bei jedem Schritt

scala> exploded_network.count 
res0: Long = 18205814 // 3 seconds 

scala> E1.count 
res1: Long = 18205814 // 3 seconds 

scala> E2.count // 5.4 minutes 
res2: Long = 18205814 

Warum ist RDD Umwandlung 100x zu nehmen?

Antwort

0

In Spark ist ein DataFrame eine verteilte Sammlung von Daten, die in benannte Spalten (Tabellenformat) organisiert sind. Es ist konzeptionell äquivalent zu einer Tabelle in einer relationalen Datenbank oder einem Datenrahmen in R/Python, aber mit umfangreicheren Optimierungen. Und aufgrund seines Tabellenformats verfügt es über Metadaten, die es Spark ermöglichen, eine Anzahl von Optimierungen im Hintergrund auszuführen. Die DataFrame-API verwendet die erweiterten Optimierungen von Spark wie die Tungsten-Ausführungs-Engine und den Catalyst Optimizer, um die Daten besser zu verarbeiten.

Während in einer RDD, RDDs nicht das Schema der gegebenen Datensatz ableiten und erfordert der Benutzer, ein Schema zur Verfügung stellen. Auch Rdd kann Funken Optimierer wie Catalyst Optimizer und Tungsten Execution Engine (wie oben erwähnt) nicht nutzen.

So DataFrame haben viel bessere Leistung als RDDs. In Ihrem Fall, wenn Sie eine RDD anstelle von Datenrahmen verwenden müssen, würde ich empfehlen, den Datenrahmen zwischenzuspeichern, bevor Sie in rdd konvertieren. Das sollte deine rdd-Leistung verbessern.

val E1 = exploded_network.cache() 
val E2 = E1.rdd 

Hoffe das hilft.

Verwandte Themen