Wenn Sie eine RDD aus einer Textdatei zu erstellen, möchten Sie wahrscheinlich die Daten in einem Fall Klasse abzubilden, so könnte man die Eingangsquelle in diesem Stadium hinzu:
case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
Person(inputPath, tokens(0), tokens(1).trim().toInt)
}
rdd.collect().foreach(println)
Wenn Sie nicht wollen, mix "Geschäftsdaten" mit Meta-Daten:
case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)
// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
rdd.collect().foreach(println)
und wenn Sie die RDD zu einem Datenrahmen zu fördern:
import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")
können Sie es wie
sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()
aktualisieren
Sie abfragen, um die Dateien in HDFS mit org.apache.hadoop.fs.FileSystem.listFiles()
rekursiv lesen kann.
in einem Wert eine Liste von Dateinamen Bei files
(Standard-Kollektion Scala enthält org.apache.hadoop.fs.LocatedFileStatus
), können Sie eine RDD für jede Datei erstellen:
val rdds = files.map { f =>
val md = InputSourceMetaData(f.getPath.toString, f.getLen)
sc.textFile(md.path).map {
l =>
val tokens = l.split(",")
PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
}
}
Jetzt können Sie reduce
die Liste des RDDs zu einem einzigen : Die Funktion für reduce
concats alle RDDs zu einem einzigen:
val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)
Dies funktioniert, aber ich kann/führt auch mit großen Dateien nicht testen, ob diese verteilt.
Warum wollen Sie/das brauchen? –
Jeder Datensatz muss zeigen, welche Datei es ursprünglich ist ... einfacher, Dinge zu debuggen, wenn Sie den gesamten Pfad kennen (wie eine fehlerhafte Eingabedatei) – mcmcmc