2016-03-30 9 views
0

Ich möchte herausfinden, wie sc.textfile im Detail funktioniert.
Ich habe die Textdatei Quellcode in SparkContext.scala gefunden, aber sie enthalten so viele Informationen über Scheduler, Bühne und Aufgabe eingereicht. Was ich will, ist, wie sc.textfile liest Dateien von hdfs und wie sc.textfile Wildcard verwendet, um mehrere Dateien übereinstimmen.
Wo finde ich den Quellcode?Wie funktioniert Spark sc.textfile im Detail?

Antwort

0

die Rechenfunktion in core \ src \ main \ scala \ org \ Apache \ Funke \ rdd \ HadoopRDD.scala

hier sind einige Code in der Funktion unter

var reader: RecordReader[K, V] = null 
    val inputFormat = getInputFormat(jobConf) 
    HadoopRDD.addLocalConfiguration(new SimpleDateFormat("yyyyMMddHHmm").format(createTime), 
    context.stageId, theSplit.index, context.attemptNumber, jobConf) 
    reader = inputFormat.getRecordReader(split.inputSplit.value, jobConf, Reporter.NULL) 

    // Register an on-task-completion callback to close the input stream. 
    context.addTaskCompletionListener{ context => closeIfNeeded() } 
    val key: K = reader.createKey() 
    val value: V = reader.createValue() 
1

Text-Datei ein Verfahren zur Herstellung einer org.apache.spark.SparkContext Klasse ist, dass eine Textdatei von HDFS liest, ein lokales Dateisystem (verfügbar auf allen Knoten) oder alle Hadoop-unterstützte Dateisystem-URI und Rückgabe als RDD von Strings.

Sc.textFile(path,minpartions) 

@param Pfad Pfad zur Textdatei auf einem @param minPartitions Dateisystem unterstützt vorgeschlagen minimale Anzahl von Partitionen für die resultierende RDD @return RDD der Zeilen der Textdatei

Es verwendet intern hadoopRDD (Eine RDD, die die Kernfunktionalität zum Lesen von in Hadoop gespeicherten Daten bereitstellt)

Hadoop Rdd sieht so aus

HadoopRDD(
     sc, //Sparkcontext 
     confBroadcast, //A general Hadoop Configuration, or a subclass of it 
     Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates.  inputFormatClass, 
     keyClass, 
     valueClass, 
     minPartitions) 

Im Text-Datei Methode, die wir einen hadoopRDD mit etwas fest codierten Wert schaffen nennen:

HadoopRDD(
     sc, //Sparkcontext 
     confBroadcast, //A general Hadoop Configuration, or a subclass of it 
     Some(setInputPathsFunc),//Optional closure used to initialize any JobConf that HadoopRDD creates. 
     classOf[TextInputFormat], 
     classOf[LongWritable], 
     classOf[Text], 
     minPartitions) 

Aufgrund dieser hart codierte Werte, die wir nur in der Lage sind Text-Dateien zu lesen, so dass, wenn wir eine andere Art lesen wollen der Datei verwenden wir HadoopRdd.

Verwandte Themen