3

In SparkSQL, verwende ich DF.wirte.mode (SaveMode.Append) .json (xxxx), aber diese Methode, diese Dateien wieenter image description hereWie kann ich (Spark1.6) saveAsTextFile zum Anhängen bestehender Datei machen?

der Dateiname ist zu komplex und zufällig bekommen, kann ich nicht verwenden api zu bekommen.So möchte ich saveAstextfile verwenden, da der Dateiname nicht komplex und regulär ist, aber ich weiß nicht, wie man Datei im selben Verzeichnis anhängen? Schätzen Sie für Ihre Zeit.

Antwort

2

auf Funken arbeitete 1.5, ich denke, das richtige Nutzung ist ..

dataframe.write().mode(SaveMode.Append).format(FILE_FORMAT).**partitionBy**("parameter1", "parameter2").save(path); 
0

Als Funke verwendet HDFS, dies ist die typische Ausgabe ist es produziert. Sie können FileUtil verwenden, um die Dateien in einem zusammenzuführen. Es ist eine effiziente Lösung, da es keinen Funken benötigt, um ganze Daten in einem einzigen Speicher zu sammeln, indem es in 1 partitioniert wird. Dies ist der Ansatz, dem ich folge.

import org.apache.hadoop.fs.{FileSystem, FileUtil, Path} 

val hadoopConf = sqlContext.sparkContext.hadoopConfiguration 
val hdfs = FileSystem.get(hadoopConf) 
val mergedPath = "merged-" + filePath + ".json" 
val merged = new Path(mergedPath) 
if (hdfs.exists(merged)) { 
    hdfs.delete(merged, true) 
} 
df.wirte.mode(SaveMode.Append).json(filePath) 

FileUtil.copyMerge(hdfs, path, hdfs, merged, false, hadoopConf, null) 

Sie können die einzelne Datei unter Verwendung der Position mergedPath lesen. Ich hoffe es hilft.

+0

Danke, und ich mag, dass erreichen, zum Beispiel in HDFS, ich habe 3 Dateien wie Teil 00000, Teil-00001, Teil-00002, meine Nachfrage ist diese Dateien werden zu Teil machen -00000, Kann ich copyMerge in existierende Datei verwenden? – yjxyjx

+0

Ich bin nicht sehr klar über Ihre Frage. Wenn Sie fragen, ob Sie part-00000, part-00001, part-00002 in part-00000 zusammenführen können, ist dies der obige Code. Sie müssen den zusammengeführten Pfad nur so formulieren, wie Sie möchten. Ist das wonach Sie suchen? – NehaM

+0

Es gibt die Funktion 'coalesce' in Spark, um alles in einer einzigen Datei zusammenzuführen. – AKSW

2

Sie können diese Methode versuchen, die ich von irgendwo finde. Process Spark Streaming rdd and store to single HDFS file

import org.apache.hadoop.fs.{ FileSystem, FileUtil, Path } 

def saveAsTextFileAndMerge[T](hdfsServer: String, fileName: String, rdd: RDD[T]) = { 
    val sourceFile = hdfsServer + "/tmp/" 
    rdd.saveAsTextFile(sourceFile) 
    val dstPath = hdfsServer + "/final/" 
    merge(sourceFile, dstPath, fileName) 
} 

def merge(srcPath: String, dstPath: String, fileName: String): Unit = { 
    val hadoopConfig = new Configuration() 
    val hdfs = FileSystem.get(hadoopConfig) 
    val destinationPath = new Path(dstPath) 
    if (!hdfs.exists(destinationPath)) { 
    hdfs.mkdirs(destinationPath) 
    } 
    FileUtil.copyMerge(hdfs, new Path(srcPath), hdfs, new Path(dstPath + "/" + fileName), false, hadoopConfig, null) 
} 
Verwandte Themen