2015-10-23 4 views
5

Ich bin neu in Spark und versuche, eine Spalte in jeder Eingabezeile mit dem Dateinamen einzufügen, aus dem es stammt.Wie fügt man jeder Zeile in Spark den Namen der Quelldatei hinzu?

Ich habe andere gesehen, die eine ähnliche Frage gestellt haben, aber alle ihre Antworten haben wholeTextFile verwendet, aber ich versuche dies für größere CSV-Dateien (gelesen mit der Spark-CSV-Bibliothek), JSON-Dateien und Parquet-Dateien (nicht nur kleine Textdateien).

kann ich die spark-shell verwenden, um eine Liste der Dateinamen zu erhalten:

val df = sqlContext.read.parquet("/blah/dir") 
val names = df.select(inputFileName()) 
names.show 

aber das ist ein Datenrahmen. Ich bin nicht sicher, wie man es als eine Spalte zu jeder Zeile hinzufügen (und wenn dieses Ergebnis ist die gleiche wie die Ausgangsdaten entweder, obwohl ich nehme an, dass es immer ist) und wie dies als eine allgemeine Lösung für alle Eingabearten zu tun .

+0

Warum wollen Sie/das brauchen? –

+1

Jeder Datensatz muss zeigen, welche Datei es ursprünglich ist ... einfacher, Dinge zu debuggen, wenn Sie den gesamten Pfad kennen (wie eine fehlerhafte Eingabedatei) – mcmcmc

Antwort

2

Wenn Sie eine RDD aus einer Textdatei zu erstellen, möchten Sie wahrscheinlich die Daten in einem Fall Klasse abzubilden, so könnte man die Eingangsquelle in diesem Stadium hinzu:

case class Person(inputPath: String, name: String, age: Int) 
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt" 
val rdd = sc.textFile(inputPath).map { 
    l => 
     val tokens = l.split(",") 
     Person(inputPath, tokens(0), tokens(1).trim().toInt) 
    } 
rdd.collect().foreach(println) 

Wenn Sie nicht wollen, mix "Geschäftsdaten" mit Meta-Daten:

case class InputSourceMetaData(path: String, size: Long) 
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData) 

// Fake the size, for demo purposes only 
val md = InputSourceMetaData(inputPath, size = -1L) 
val rdd = sc.textFile(inputPath).map { 
    l => 
    val tokens = l.split(",") 
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
} 
rdd.collect().foreach(println) 

und wenn Sie die RDD zu einem Datenrahmen zu fördern:

import sqlContext.implicits._ 
val df = rdd.toDF() 
df.registerTempTable("x") 

können Sie es wie

sqlContext.sql("select name, metadata from x").show() 
sqlContext.sql("select name, metadata.path from x").show() 
sqlContext.sql("select name, metadata.path, metadata.size from x").show() 

aktualisieren

Sie abfragen, um die Dateien in HDFS mit org.apache.hadoop.fs.FileSystem.listFiles() rekursiv lesen kann.

in einem Wert eine Liste von Dateinamen Bei files (Standard-Kollektion Scala enthält org.apache.hadoop.fs.LocatedFileStatus), können Sie eine RDD für jede Datei erstellen:

val rdds = files.map { f => 
    val md = InputSourceMetaData(f.getPath.toString, f.getLen) 

    sc.textFile(md.path).map { 
    l => 
     val tokens = l.split(",") 
     PersonWithMd(tokens(0), tokens(1).trim().toInt, md) 
    } 
} 

Jetzt können Sie reduce die Liste des RDDs zu einem einzigen : Die Funktion für reduce concats alle RDDs zu einem einzigen:

val rdd = rdds.reduce(_ ++ _) 
rdd.collect().foreach(println) 

Dies funktioniert, aber ich kann/führt auch mit großen Dateien nicht testen, ob diese verteilt.

+0

Ich weiß das definitiv zu schätzen, aber das einzige Problem ist, dass Sie das angeben müssen vollständiger Pfad und Dateiname der Eingabedatei Ich gebe nur das Eingabeverzeichnis an und ziehe alle Eingabedateien darin hinein. – mcmcmc

+0

Welche Funktion verwenden Sie gerade? Ist es 'ganzeTextFiles()'? – Beryllium

+0

Für CSV-Dateien verwende ich die Databricks/Spark-CSV-Bibliothek 'sqlContext.read.format (" com.databricks.spark.csv "). Load ("/path/dir/")'. Für Parkett-Dateien verwenden Sie 'sqlContext.read.parquet ("/path/parketdir/"). – mcmcmc

Verwandte Themen