2016-11-03 6 views
3

Ich benutze Spark 1.6.1 und Schreiben in HDFS. In einigen Fällen scheint es, als ob die ganze Arbeit von einem Thread erledigt wird. Warum das?Slow Parquet Schreiben in HDFS mit Spark

Auch ich brauche Parkett.enable.summary-Metadaten, um die Parkett-Dateien zu Impala zu registrieren.

Df.write().partitionBy("COLUMN").parquet(outputFileLocation); 

Es scheint auch, als ob das alles in einer CPU eines Executors passiert.

16/11/03 14:59:20 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 14:59:20 INFO mapred.SparkHadoopMapRedUtil: No need to commit output of task because needsTaskCommit=false: attempt_201611031459_0154_m_000029_0 
16/11/03 15:17:56 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 41.9 GB to disk (3 times so far) 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 0 blocks 
16/11/03 15:21:05 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 1 ms 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Using user defined output committer class org.apache.parquet.hadoop.ParquetOutputCommitter 
16/11/03 15:21:05 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:21:05 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:21:05 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

Dann wieder: -

16/11/03 15:21:05 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:21:05 INFO datasources.DynamicPartitionWriterContainer: Maximum partitions reached, falling back on sorting. 
16/11/03 15:32:37 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (0 time so far) 
16/11/03 15:45:47 INFO sort.UnsafeExternalSorter: Thread 545 spilling sort data of 31.8 GB to disk (1 time so far) 
16/11/03 15:48:44 INFO datasources.DynamicPartitionWriterContainer: Sorting complete. Writing out partition files one at a time. 
16/11/03 15:48:44 INFO codec.CodecConfig: Compression: GZIP 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet block size to 134217728 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Parquet dictionary page size to 1048576 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Dictionary is on 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Validation is off 
16/11/03 15:48:44 INFO hadoop.ParquetOutputFormat: Writer version is: PARQUET_1_0 
16/11/03 15:48:44 INFO parquet.CatalystWriteSupport: Initialized Parquet WriteSupport with Catalyst schema: 

Das Schema

Etwa 200 der folgenden Zeilen immer wieder 20-mal oder so.

16/11/03 15:48:44 INFO compress.CodecPool: Got brand-new compressor [.gz] 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: mem size 135,903,551 > 134,217,728: flushing 1,040,100 records to disk. 
16/11/03 15:49:50 INFO hadoop.InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 89,688,651 

Etwa 200 der folgenden Zeilen

16/11/03 15:49:51 INFO hadoop.ColumnChunkPageWriteStore: written 413,231B for [a17bbfb1_2808_11e6_a4e6_77b5e8f92a4f] BINARY: 1,040,100 values, 1,138,534B raw, 412,919B comp, 8 pages, encodings: [RLE, BIT_PACKED, PLAIN_DICTIONARY], dic { 356 entries, 2,848B raw, 356B comp} 

Dann endlich: -

16/11/03 16:15:41 INFO output.FileOutputCommitter: Saved output of task 'attempt_201611031521_0154_m_000040_0' to hdfs://PATH/_temporary/0/task_201611031521_0154_m_000040 
16/11/03 16:15:41 INFO mapred.SparkHadoopMapRedUtil: attempt_201611031521_0154_m_000040_0: Committed 
16/11/03 16:15:41 INFO executor.Executor: Finished task 40.0 in stage 154.0 (TID 8545). 3757 bytes result sent to driver 

Update: parquet.enable.summary-Metadaten auf false gesetzt.
Reduzierte Partitionen 21.

Df.write().mode(SaveMode.Append).partitionBy("COL").parquet(outputFileLocation); 

Es tat Geschwindigkeit verbessern, aber dauert noch eine Stunde in Anspruch.

Update: - Der Grund für das meiste des Problems ist mehrere linke äußere Verbindung mit sehr kleinen Daten materialisiert kurz vor dem Schreiben. Die Überfüllungen geschehen aufgrund des Append-Modus, der die Datei geöffnet hält. In diesem Modus ist die Standardeinstellung auf 5 offene Dateien beschränkt. Sie können dies mit der Eigenschaft "spark.sql.sources.maxConcurrentWrites"

+0

Haben Sie versucht, repartition während des Lesens der Datei in Ihrem Code zu verwenden? –

+0

Standardmäßig weist Spark einen Executor pro Worker zu. Sie können die Anzahl der Executoren angeben, die Sie verwenden möchten. Welchen Meister hast du benutzt (lokal, Garn)? – ahars

+0

spark-submit '' --master '' Garn-Cluster '' --driver-memory '' 8G '' - Treiber-Cores '' 3 '' --executor-memory '' 8G '' --driver- cores '' 3 '' --executor-cores '' 3 '' --num-executors '' 4 ' – morfious902002

Antwort

0

erhöhen. Nach einigen Optimierungen im Code vor Erreichen des Schreibteils haben wir bessere Schreibzeiten. Vorher konnten wir keine Neupartitionierung durchführen, da die Shuffles mehr als 4-5 GB betrugen. Nach vorherigen Änderungen habe ich den Code von Coalesce in Repartition geändert, der die Daten über alle Executoren verteilt hat, indem jeder CPU in Executoren die gleiche Menge an Daten zum Schreiben gegeben wurde. Wenn Sie also feststellen, dass die von Ihren Jobs erstellten Parkettdateien in der Größe variieren, versuchen Sie, Ihren Dataframe vor dem Schreiben neu zu partitionieren.

Auch kann dies auch mit Schreibleistung helfen: -

sc.hadoopConfiguration.set("parquet.enable.dictionary", "false") 
Verwandte Themen