2017-02-03 6 views
2

Innerhalb des angegebenen Verzeichnisses habe ich viele verschiedene Ordner und in jedem Ordner habe ich Hadoop-Dateien (part_001, etc.).Wie rekursiv Hadoop-Dateien aus dem Verzeichnis mit Spark lesen?

directory 
    -> folder1 
     -> part_001... 
     -> part_002... 
    -> folder2 
     -> part_001... 
    ... 

das Verzeichnis gegeben, wie kann ich lesen rekursiv den Inhalt aller Ordner in diesem Verzeichnis und laden Sie diese Inhalte in einem einzigen RDD in Spark-Scala mit?

Ich fand dieses, aber es geht nicht rekursiv in Unterordnern (I import org.apache.hadoop.mapreduce.lib.input bin mit):

var job: Job = null 
    try { 
    job = Job.getInstance() 
    FileInputFormat.setInputPaths(job, new Path("s3n://" + bucketNameData + "/" + directoryS3)) 
    FileInputFormat.setInputDirRecursive(job, true) 
    } catch { 
    case ioe: IOException => ioe.printStackTrace(); System.exit(1); 
    } 
    val sourceData = sc.newAPIHadoopRDD(job.getConfiguration(), classOf[TextInputFormat], classOf[LongWritable], classOf[Text]).values 

Ich fand auch dieses web-page die SequenceFile verwendet, aber ich verstehe nicht, wieder, wie man Anwenden auf meinen Fall?

+0

Sie mit einem einfachen Wildcard versucht haben? Wenn die Verzeichnisstruktur konsistent ist, sollte es wie ein Charme funktionieren – Chobeat

+0

siehe http://stackoverflow.com/a/27843858/647053 –

+0

@Chobeat: Meinst du, dass die Antwort von dbustosp ('var rdd = sc.textFile (" Pfad/*/* ")') wird direkt tun, was ich erklärt habe, ohne den gesamten Code, den ich gepostet habe, zu schreiben? – user7379562

Antwort

5

Wenn Sie Funken verwenden, können Sie dies mit wilcards wie folgt tun:

scala>sc.textFile("path/*/*") 

sc die SparkContext die, wenn Sie Funken Shell verwenden ist standardmäßig initialisiert oder wenn Sie erstellen Ihre eigenes Programm sollte einen SparkContext selbst instanziieren müssen.

mit folgenden Flagge Vorsicht:

scala> sc.hadoopConfiguration.get ("mapreduce.input.fileinputformat.input.dir.recursive") RES6: String = null

Yo sollte dieses Flag auf true gesetzt:

sc.hadoopConfiguration.set ("mapreduce.input.fileinputformat.input.dir.recursive", "true")

+0

Also meinst du, dass ich das einfach machen kann? 'val myRDD = sc.textFile (" Pfad/*/* ")'? Sie müssen 'setInputDirRecursive' nicht verwenden? Bekomme ich RDD von String? (Ich brauche RDD von String) – user7379562

+0

Ja, genau richtig. Dies wird standardmäßig als String geladen, und wenn Sie Platzhalter verwenden, verwenden Sie dieses Flag nicht. – dbustosp

+0

Ok, noch eine Sache, die ich falsch verstehe: Wenn die Daten in Hadoop-Dateien JSON-Format haben, bekomme ich RDD von JSON-Strings, nachdem ich 'sc.textFile (...)' gemacht habe, oder? Dann, um es in DataFrame zu konvertieren funktioniert dieser Ansatz ?: 'val rddFromHadoop = sc.textFile (" Pfad/*/* ") import sqlContext.implicits._ var df = rddFromHadoop.toDF()'. Oder soll ich "rddFromHadoop" auf "RDD [Map [String, String]]' analysieren, bevor ich 'toDF()'? Entschuldigung, für diese zusätzliche Frage. Es ist nur wichtig für mich zu verstehen, dass mein Gesamtansatz funktioniert, wenn ich 'textFile' benutze und hadoopish-Dateien in RDD lese. – user7379562

1

habe ich gefunden, dass die Parameter auf diese Weise eingestellt werden müssen:

.set("spark.hive.mapred.supports.subdirectories","true") 
.set("spark.hadoop.mapreduce.input.fileinputformat.input.dir.recursive","true") 
Verwandte Themen