2016-08-28 7 views
3

In Spark, was ist der beste Weg, Dateigröße der Ausgabedatei zu steuern. Zum Beispiel können wir in log4j die maximale Dateigröße angeben, nach der die Datei rotiert.Wie kontrollieren Sie die Größe der Ausgabedatei?

Ich bin auf der Suche nach einer ähnlichen Lösung für Parkettfeilen. Gibt es beim Schreiben einer Datei eine maximale Dateigröße?

Ich habe einige Workarounds, aber keine ist gut. Wenn ich Dateien auf 64 MB beschränken möchte, besteht eine Option darin, die Daten neu zu partitionieren und in den temporären Speicherort zu schreiben. Und fügen Sie dann die Dateien zusammen, indem Sie die Dateigröße im temporären Speicherort verwenden. Aber es ist schwierig, die richtige Dateigröße zu bekommen.

+0

Nur neugierig zu wissen, was ist der Anwendungsfall der gleichen Größe in Ausgabedateien. – sachinjain024

+0

Der Versuch, die Dateigröße konsistent zu halten. Wenn ich zum Beispiel Dateien in eine andere Partition schreibe, sind manche Partitionsdateien 10 mal größer. df.repartition (35) .write.mode (SaveMode.Overwrite) .partitionBy (Liste: _ *). Parket ("tmp5") – user447359

Antwort

10

Es ist unmöglich für Spark, die Größe von Parquet-Dateien zu steuern, da der DataFrame im Speicher vor dem Schreiben auf Festplatten codiert und komprimiert werden muss. Bevor dieser Prozess abgeschlossen ist, kann die tatsächliche Dateigröße auf dem Datenträger nicht geschätzt werden.

Also meine Lösung ist:

  • die Datenrahmen zu HDFS schreiben, df.write.parquet(path)
  • die Verzeichnisgröße Erhalten und die Anzahl der Dateien

    val fs = FileSystem.get(sc.hadoopConfiguration) 
    val dirSize = fs.getContentSummary(path).getLength 
    val fileNum = dirSize/(512 * 1024 * 1024) // let's say 512 MB per file 
    
  • Lesen Sie das Verzeichnis berechnen und re- schreiben an HDFS

    val df = sqlContext.read.parquet(path) 
    df.coalesce(fileNum).write.parquet(another_path) 
    

    Verwenden Sie das Original NICHT df, sonst löst es Ihren Job zweimal aus.

  • Löschen Sie das alte Verzeichnis und benennen Sie das neue Verzeichnis zurück

    fs.delete(new Path(path), true) 
    fs.rename(new Path(newPath), new Path(path)) 
    

Diese Lösung hat den Nachteil, dass sie die Daten zweimal schreiben muss, die Scheibe IO verdoppelt, aber jetzt ist das die einzige Lösung.

-1

Hier ist meine Lösung, und es macht Spaß für mich.

val repartition_num = 20 
val hqc = new org.apache.spark.sql.hive.HiveContext(sc) 
val t1 = hqc.sql("select * from customer") 

// 20 parquet files will be generated in hdfs dir 
// JUST control your file with partition number 
t1.repartition(repartition_num).saveAsParquetFile(parquet_dir) 

Und das ist das Ergebnis:

> hadoop fs -ls /tpch-parquet/customer/*.parquet | wc -l 
20 
+0

-1. Dies beantwortet nicht die Frage des OP (Größe der Kontrolldatei), sondern eine völlig andere Frage (Kontrolle der Anzahl der Dateien) – synhershko

1

Wie andere erwähnt haben Sie nicht explizit eine Zielgröße pro Datei treffen kann. Sie können jedoch alle Ausgabedateien ungefähr so ​​viele Zeilen enthalten. Wenn Sie im Durchschnitt wissen, wie Ihre Komprimierungsrate aussieht, können Sie mit der gleichmäßigen Verteilung von Zeilen auf Ausgabedateien bis zu max_rows konsistente Größen für Ihr Ziel erzielen.

Dies ist einfacher gesagt als getan, wenn Sie eine Partition machen, bevor Sie schreiben. Hier einige Pseudo-Code, wie wir es tun:

-- #3 distribute partitionC's rows based on partitions plus random integer that pertains to file number 
select * from dataframe_table as t4 
inner join 

    -- #2 calculate the number of output files per partition 
    ((select t1.partitionA, t1.partitionB, cast(t2.partition_num_rows/max_rows as int) + 1 as partition_num_files from dataframe_table) as t1 
     inner join 

     -- #1 determine number of rows in output partition 
     (select partitionA, partitionB, count(*) as partition_num_rows from dataframe_table group by (partitionA, partitionB)) as t2 
     on t1.partitionA = t2.partitionA and t1.partitionB = t2.partitionB) as t3 

on t3.partitionA = t4.partitionA and t3.partitionB=t4.partitionB 
distribute by (t4.partitionA, t4.partitionC, floor(rand() * t3.partition_num_files)) sort by (partitionC, sortfield) 

ich eine Art auf die Partition hier enthalten, weil in unserem Anwendungsfall diese drastisch Kompression verbessert, während nur minimal Leistung zu beeinträchtigen.

Und wenn Ihre Ergebnisse aus Schritt 1 und 2 ausreichend klein sind, kann Spark möglicherweise Broadcast beitreten, um sie zu beschleunigen.

Verwandte Themen