2016-05-20 15 views
1

Ich muss eine RDD durch erste Buchstaben (A-Z) teilen und schreiben Sie die Dateien in Verzeichnisse jeweils. Die einfache Lösung besteht darin, die RDD für jeden Buchstaben zu filtern, aber dies erfordert 26 Durchgänge. Es gibt eine Antwort auf eine ähnliche Frage zum Schreiben in Textdateien here, aber ich kann nicht herausfinden, wie dies für Avro-Dateien zu tun.Spark Avro schreiben RDD zu mehreren Verzeichnissen per Schlüssel

Hat es jemand geschafft?

Antwort

0

Ich hoffe, Sie als meine eine bessere Antwort bekommen ...

Ich habe in einer ähnlichen Situation selbst gewesen, außer mit „ORC“ statt Avro. Ich habe im Grunde die Hände hochgeworfen und habe die ORC-Dateiklassen direkt aufgerufen, um die Dateien selbst zu schreiben.

In meinem Fall würde mein Ansatz die Partitionierung der Daten über "partitionBy" in 26 Partitionen beinhalten, eine für jeden ersten Buchstaben A-Z. Rufen Sie dann "mapPartitionsWithIndex" auf, und übergeben Sie eine Funktion, die die i-te Partition im entsprechenden Pfad an eine Avro-Datei ausgibt. Um schließlich Spark dazu zu bringen, tatsächlich etwas zu tun, muss mapPartitionsWithIndex beispielsweise eine Liste zurückgeben, die den einzelnen booleschen Wert "true" enthält. Rufen Sie dann "count" auf der RDD auf, die von mapPartitionsWithIndex zurückgegeben wird, um Spark zum Starten der Show zu erhalten.

fand ich ein Beispiel hier eine Avro-Datei zu schreiben: http://www.myhadoopexamples.com/2015/06/19/merging-small-files-into-avro-file-2/

+0

Dank! Das Schreiben von Partitionen scheint zu funktionieren, zumindest bei einem kleinen Datensatz im lokalen Modus. –

+0

Folgefrage. Mit diesem Ansatz erstellt Spark nur 26 Partitionen? Und damit hätte jedes Verzeichnis eine einzige Datei? Ich bin besorgt, dass die Dateien sehr groß werden und dass, wenn die Mehrheit der Daten unter dem Buchstaben A stehen würde, dann wird dieser Executor mit nicht genügend Arbeitsspeicher fehlschlagen. –

+0

Die in "mapPartitionsWithIndex" übergebene Funktion kann die Größe der Ausgabedatei begrenzen. Diese Funktion wird jedes Element in der Partition durchlaufen. Es könnte die Anzahl der Datensätze, die es in eine Ausgabedatei geschrieben hat, verfolgen und diese schließen und eine neue öffnen, wenn sie ein Maximum erreicht hat. –

1

Sie multipleoutputformat diese

Es zu tun, verwenden können, ist ein zweistufiger Aufgabe: -

  1. Zuerst benötigen Sie den Mehrfaches Ausgabeformat für avro. Unten ist der Code dafür:

    package avro 
    
    import org.apache.hadoop.mapred.lib.MultipleOutputFormat 
    import org.apache.hadoop.fs.FileSystem 
    import org.apache.hadoop.mapred.JobConf 
    import org.apache.hadoop.util.Progressable 
    import org.apache.avro.mapred.AvroOutputFormat 
    import org.apache.avro.mapred.AvroWrapper 
    import org.apache.hadoop.io.NullWritable 
    import org.apache.spark.rdd.RDD 
    import org.apache.hadoop.mapred.RecordWriter 
    
    class MultipleAvroFileOutputFormat[K] extends MultipleOutputFormat[AvroWrapper[K], NullWritable] { 
    val outputFormat = new AvroOutputFormat[K] 
    
    override def generateFileNameForKeyValue(key: AvroWrapper[K], value: NullWritable, name: String) = { 
    val name = key.datum().asInstanceOf[String].substring(0, 1) 
    name + "/" + name 
    } 
    
    override def getBaseRecordWriter(fs: FileSystem, 
    job: JobConf, 
    name: String, 
    arg3: Progressable) = { 
    outputFormat.getRecordWriter(fs, job, name, arg3).asInstanceOf[RecordWriter[AvroWrapper[K], NullWritable]] 
    } 
    
    } 
    
  2. In Ihrem Treiber-Code müssen Sie erwähnen, dass Sie die oben angegebenen Ausgabeformat verwenden möchten. Sie müssen auch das Ausgabeschema für Avro-Daten erwähnen. Im Folgenden finden Sie Beispieltreibercode, die eine RDD von Zeichenfolge in Avro-Format mit Schema speichert { "type": "string"}

    package avro 
    
    import org.apache.spark.SparkConf 
    import org.apache.spark.SparkContext 
    import org.apache.hadoop.io.NullWritable 
    import org.apache.spark._ 
    import org.apache.spark.SparkContext._ 
    import org.apache.hadoop.mapred.JobConf 
    import org.apache.avro.mapred.AvroJob 
    import org.apache.avro.mapred.AvroWrapper 
    object AvroDemo { 
    def main(args: Array[String]): Unit = { 
    val conf = new SparkConf 
    conf.setAppName(args(0)); 
    conf.setMaster("local[2]"); 
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer") 
    conf.registerKryoClasses(Array(classOf[AvroWrapper[String]])) 
    val sc = new SparkContext(conf);  
    val input = sc.parallelize(Seq("one", "two", "three", "four"), 1); 
    val pairRDD = input.map(x => (new AvroWrapper(x), null)); 
    val job = new JobConf(sc.hadoopConfiguration) 
    val schema = "{\"type\":\"string\"}" 
    job.set(AvroJob.OUTPUT_SCHEMA, schema) //set schema for avro output 
    pairRDD.partitionBy(new HashPartitioner(26)).saveAsHadoopFile(args(1), classOf[AvroWrapper[String]], classOf[NullWritable], classOf[MultipleAvroFileOutputFormat[String]], job, None); 
    sc.stop() 
    } 
    }