2016-09-02 6 views
0

Der Business Case ist, dass wir eine große Parkett-Datei in kleine Spalten durch eine Spalte als Partition teilen möchten. Wir haben getestet mit dataframe.partition ("xxx"). Write (...). Es dauerte etwa 1 Stunde mit 100K Einträge von Datensätzen. Also werden wir map reduce verwenden, um verschiedene Parkett-Dateien in verschiedenen Ordnern zu erzeugen. Beispielcode:Unterstützt Funken mehrere Ausgabedatei mit Parkett-Format

import org.apache.hadoop.io.NullWritable 

import org.apache.spark._ 
import org.apache.spark.SparkContext._ 

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat 

class RDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] { 
    override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String = 
    key.asInstanceOf[String]+"/aa" 
} 

object Split { 
    def main(args: Array[String]) { 
    val conf = new SparkConf().setAppName("SplitTest") 
    val sc = new SparkContext(conf) 
    sc.parallelize(List(("w", "www"), ("b", "blog"), ("c", "com"), ("w", "bt"))) 
     .map(value => (value._1, value._2 + "Test")) 
     .partitionBy(new HashPartitioner(3))//.saveAsNewAPIHadoopFile(path, keyClass, valueClass, outputFormatClass, conf) 
     .saveAsHadoopFile(args(0), classOf[String], classOf[String], 
     classOf[RDDMultipleTextOutputFormat]) 
    sc.stop() 
    } 
} 

Das obige Beispiel generiert nur eine Textdatei, wie Sie eine Parkettdatei mit multipleoutputformat generieren?

Antwort

1

Funken unterstützt Parkettpartitionierungs da 1.4.0 (1.5 + Syntax):

df.write.partitionBy("some") 

und Bucketing da (2.0.0):

df.write.bucketBy("some") 

mit optionaler sortBy Klausel.