2017-12-16 2 views
1

Darunter sehen Sie eine vereinfachte Version von dem, was ich versuche zu tun. Ich lade ein Dataframe von 150 parquets (> 10 TB) in S3 gespeichert, dann gebe ich diesem Datenfeld eine ID-Spalte mit func.monotonically_increasing_id(). Danach speichere ich ein paar Abweichungen von diesem dataframe. Die Funktion, die ich anwenden sind ein wenig komplizierter, als ich hier vorstellen, aber ich hoffe, dass dies den Punkt überWerden die gleichen IDs immer über denselben Logikplan ausgegeben?

bekommt
DF_loaded = spark.read.parquet(/some/path/*/') 
DF_with_IDs = DF_loaded.withColumn('id',func.monotonically_increasing_id()) 
#creating parquet_1  
DF_with_IDs.where(col('a').isNotNull()).write.parquet('/path/parquet_1/') 
#creating parquet_2 
DF_with_IDs.where(col('b').isNotNull()).write.parquet('/path/parquet_2/') 

jetzt bemerkte ich, dass Funken nach dem Erstellen parquet_1 lädt wieder alle Daten von S3 parquet_2 zu erstellen. Jetzt mache ich mir Sorgen, dass die IDs parquet_1 nicht denen von parquet_2 entsprechen. Die gleiche Zeile hat unterschiedliche IDs in beiden parquets. Denn so weit ist, ich verstehe es die Logik Plan Funke kommt mit sieht wie folgt auf:

#parquet_1 
load_data -> give_ids -> make_selection -> write_parquet 
#parquet_2 
load_data -> give_ids -> make_selection -> write_parquet 

So sind die gleichen mit den gleichen Reihen in beiden parquets gegeben IDs?

Antwort

1

Solange:

  • verwenden Sie eine aktuelle Version von Funken (SPARK-13473, SPARK-14241).
  • Es gibt keine Konfigurationsänderung zwischen Aktionen (Änderungen in einer Konfiguration können sich auf die Anzahl der Partitionen und folglich auf die IDs auswirken).

monotonically_increasing_id sollte stabil sein. Beachten Sie, dass dies Prädikat-Pushdown deaktiviert.

rdd.zipWithindex.toDF sollte stabil unabhängig von der Konfiguration sein, so könnte es vorzuziehen sein.

+0

Ich bin auf 2.2. so ist es gelöst – Thagor

Verwandte Themen