2016-08-15 7 views
2

Mit Apache Spark 2.0 mit Pyspark, habe ich einen DataFrame mit 1000 Zeilen von Daten und möchte diesen DataFrame in zwei separate DataFrames teilen/teilen; ein zufälliges Saatgut wird nicht ausreichen, da ich beabsichtige, diese Spaltung zu wiederholen:Aufteilen von Datenrahmen in Apache Spark

  • Der erste Datenrahmen
  • Der zweite Datenrahmen sollten die restlichen 250 Zeilen

Hinweis enthalten die ersten 750 Zeilen enthalten sollte Methode mehrere Male, und möchten die Kontrolle darüber haben, welche Daten für den ersten und zweiten DataFrame verwendet werden.

Ich habe gefunden, die Take (n) -Methode nützlich sein, um das erste Ergebnis zu generieren.
Aber ich finde nicht den richtigen Weg (oder irgendeinen anderen Weg), um den zweiten DataFrame zu bekommen.

Alle Hinweise in die richtige Richtung würden sehr geschätzt werden.

Vielen Dank im Voraus.

Update: Ich habe jetzt eine Lösung durch Sortieren und Anwenden Take (n) wieder gefunden. Das fühlt sich immer noch wie eine suboptimale Lösung aber:

# First DataFrame, simply take the first 750 rows 
part1 = spark.createDataFrame(df.take(750)) 
# Second DataFrame, sort by key descending, then take 250 rows 
part2 = spark.createDataFrame(df.rdd.sortByKey(False).toDF().take(250)) 
# Then reverse the order again, to maintain the original order 
part2 = part2.rdd.sortByKey(True).toDF() 
# Then rename the columns as they have been reset to "_1" and "_2" by the sorting process 
part2 = part2.withColumnRenamed("_1", "label").withColumnRenamed("_2", "features") 

Antwort

3

Sie haben Recht mit nehmen in Frage zu stellen, weil sie die Daten an den Fahrer zieht und dann createDataFrame umverteilt es über den Cluster. Dies ist ineffizient und kann fehlschlagen, wenn der Treiber nicht über genügend Speicher zum Speichern der Daten verfügt.

Hier ist eine Lösung, die eine Zeile Indexspalte und Scheiben auf das schafft:

from pyspark.sql.functions import monotonicallyIncreasingId 

idxDf = df.withColumn("idx", monotonicallyIncreasingId()) 
part1 = idxDf.filter('idx < 750') 
part2 = idxDf.filter('idx >= 750') 
Verwandte Themen