2016-10-20 2 views
7

Ich benutze Spark 1.6.0 und Scala.Wie kann ein Datenrahmen als komprimierte (gezippte) CSV-Datei gespeichert werden?

Ich möchte einen DataFrame als komprimiertes CSV-Format speichern. Hier

ist, was ich bisher haben (davon ausgehen, ich habe schon df und sc als SparkContext):

//set the conf to the codec I want 
sc.getConf.set("spark.hadoop.mapred.output.compress", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "true") 
sc.getConf.set("spark.hadoop.mapred.output.compression.codec", "org.apache.hadoop.io.compress.GzipCodec") 
sc.getConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK") 

df.write 
    .format("com.databricks.spark.csv") 
    .save(my_directory) 

Der Ausgang nicht in gz Format ist.

+0

In Verbindung stehende Frage über RDDs: http://stackoverflow.com/questions/32231650/spark-rdd-saveastextfile-gzip –

Antwort

4

Auf dem Funken csv Github: https://github.com/databricks/spark-csv

One lesen:

codec: Kompressions-Codec zu verwenden, wenn eine Datei zu speichern. Sollte der vollständig qualifizierte Name einer Klasse sein, die org.apache.hadoop.io.compress.CompressionCodec implementiert, oder eines der Groß-/Kleinschreibung, die die Groß-/Kleinschreibung nicht beachtet (bzip2, gzip, lz4 und snappy). Standardmäßig wird keine Komprimierung verwendet, wenn kein Codec angegeben ist.

In Ihrem Fall sollte diese Arbeit: df.write.format("com.databricks.spark.csv").codec("gzip")\ .save('my_directory/my_file.gzip')

10

Dieser Code funktioniert für Spark 2.1, wo .codec nicht verfügbar.

df.write 
    .format("com.databricks.spark.csv") 
    .option("codec", "org.apache.hadoop.io.compress.GzipCodec") 
    .save(my_directory) 

für Spark 2.2 können Sie die df.write.csv(...,codec="gzip") Option hier beschrieben: https://spark.apache.org/docs/latest/api/python/pyspark.sql.html?highlight=codec

+1

Während dieser Code die Frage möglicherweise beantworten kann und zusätzlichen Kontext betreffend zur Verfügung stellt warum und/oder wie dieser Code die Frage beantwortet verbessert seinen langfristigen Wert. – manniL

+0

Bei Verwendung des "json" -Formats wird die Komprimierung nicht übernommen – Disha

+0

Es sieht so aus, als ob das Schlüsselwortargument in 'compression' geändert wurde. https://spark.apache.org/docs/latest/api/python/pypark.sql.html?highlight=codec#pypspark.sql.DataFrameWriter.csv – volker238

4

Mit Funken 2.0+, das ist ein wenig einfacher geworden ist:

df.write.csv("path", compression="gzip") 

Sie brauchen nicht die externes Databricks CSV-Paket nicht mehr.

Der csv() Schreiber unterstützt eine Reihe von praktischen Optionen. Beispiel:

  • sep: Zum Festlegen des Trennzeichens.
  • quote: Ob und wie Werte angegeben werden.
  • header: Ob eine Kopfzeile enthalten sein soll.

Es gibt auch eine Reihe von anderen Kompressions-Codecs können Sie, zusätzlich zu gzip verwenden:

  • bzip2
  • lz4
  • snappy
  • deflate

Die vollständige Spark-Dokumentation für die csv() Schriftsteller sind hier: Python/Scala

+0

Danke für die Verknüpfung mit csv writer docs, und nicht nur einen Databricks Antworten! –

+0

@LaurensKoppenol - Nun, um fair zu sein, die CSF-Unterstützung, die ursprünglich zu Spark hinzugefügt wurde, begann ursprünglich als externes Databricks CSV-Paket [verlinkt mit] (https://github.com/databricks/spark-csv) in der akzeptierten Antwort. :) Dieses Paket ist für jeden Spark-Benutzer verfügbar, aber ab Spark 2.0 wird es nicht mehr benötigt. –

1

die CSV mit Header-Datei zu schreiben und die Teil-000-Datei

DF.coalesce(1).write.format("com.databricks.spark.csv").mode("overwrite") 
.option("header","true") 
.option("codec",org.apache.hadoop.io.compress.GzipCodec").save(tempLocationFileName) 

copyRename(tempLocationFileName, finalLocationFileName) 

def copyRename(srcPath: String, dstPath: String): Unit = { 
    val hadoopConfig = new Configuration() 
    val hdfs = FileSystem.get(hadoopConfig) 
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath), true, hadoopConfig, null) 
    // the "true" setting deletes the source files once they are merged into the new output 
} 

.csv.gzip umbenennen Wenn Sie nicht tun brauche den Header und setze ihn dann auf false und du musst die Koaleszenz auch nicht machen. Es wird auch schneller zu schreiben sein.

Verwandte Themen