2017-11-01 4 views
2

Ich versuche, einen Datenrahmen als CSV-Datei in meinem lokalen Laufwerk zu speichern. Aber wenn ich das mache, bekomme ich einen Ordner und innerhalb dieser Partition wurden Dateien geschrieben. Gibt es einen Vorschlag, dies zu überwinden?Wie schreibt man Daten als einzelne (normale) CSV-Datei in Spark?

Meine Anforderung: Um eine normale CSV-Datei mit dem tatsächlichen Namen im Code erhalten.

Code Snippet: dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/dataframe.csv")

Antwort

1

TL: DR Sie versuchen, sequentielle zu erzwingen, in-Core-Konzepte auf einem enviornment verteilen. Es kann nicht gut enden.

Spark bietet kein Dienstprogramm wie dieses. Um in der Lage zu sein, einen in einer halb verteilten Weise zu erstellen, müssten Sie ein mehrstufiges, quellenabhängiges Protokoll implementieren:

  • Sie schreiben Header.
  • Sie schreiben Datendateien für jede Partition.
  • Sie führen die Dateien zusammen und geben einen neuen Namen ein.
  • Da dies begrenzte Anwendungen hat, ist nur nützlich für kleinere Dateien, und kann sehr teuer mit einigen Quellen (wie Objekt speichert) nichts dergleichen ist in Spark implementiert.

    Sie können natürlich Daten sammeln, Standard-CSV-Parser (Univoicity, Apache Commons) verwenden und dann in den Speicher Ihrer Wahl legen. Dies ist sequenziell und erfordert mehrere Datenübertragungen.

    0

    Es gibt keinen automatischen Weg, dies zu tun. Ich sehe zwei Lösungen

    • Wenn das lokale Verzeichnis auf allen Vollstrecker montiert ist: die Datei schreiben, wie Sie taten, aber dann bewegen/benennen Sie die part-*csv Datei auf den gewünschten Namen
    • Oder wenn das Verzeichnis nicht verfügbar ist auf allen Vollstrecker: mit Klar scala

    Aber beide Lösungen Art Parallelität zerstören und damit das Ziel der Funken sammeln die Datenrahmen an den Treiber und dann die Datei erstellen.

    0

    Es ist nicht möglich, aber Sie können wie folgt Somethings tun:

    dataframe.coalesce(1).write.mode("overwrite").format("com.databricks.spark.csv").option("header", "true").csv("E:/data/") 
    
    import org.apache.hadoop.fs._ 
    val fs = FileSystem.get(sc.hadoopConfiguration) 
    val filePath = "E:/data/" 
    val fileName = fs.globStatus(new Path(filePath+"part*"))(0).getPath.getName 
    fs.rename(new Path(filePath+fileName), new Path(filePath+"dataframe.csv")) 
    
    Verwandte Themen