2017-02-06 3 views
2

ich eine Verzeichnisstruktur auf S3 wie folgt aussehen:Funken lesen mehrere Verzeichnisse in mutiple Datenrahmen

foo 
    |-base 
    |-2017 
     |-01 
      |-04 
       |-part1.orc, part2.orc .... 
    |-A 
    |-2017 
     |-01 
      |-04 
       |-part1.orc, part2.orc .... 
    |-B 
    |-2017 
     |-01 
      |-04 
       |-part1.orc, part2.orc .... 

Was bedeutet, dass für die Verzeichnis foo ich mehrere Output-Tabellen haben, base, A, B, usw. in einem bestimmten Pfad basierend auf dem Zeitstempel eines Jobs.

Ich möchte left join sie alle, basierend auf einem Zeitstempel und dem Hauptverzeichnis, in diesem Fall foo. Dies würde bedeuten, dass in jeder Ausgabetabelle base, A, B usw. neue separate Eingabetabellen gelesen werden, auf die left join angewendet werden kann. Alle mit der base Tabelle als Ausgangspunkt

so etwas (nicht funktionierenden Code!)

val dfs: Seq[DataFrame] = spark.read.orc("foo/*/2017/01/04/*") 
val base: DataFrame = spark.read.orc("foo/base/2017/01/04/*") 

val result = dfs.foldLeft(base)((l, r) => l.join(r, 'id, "left")) 

jemand mich in der richtigen Richtung zeigen kann, wie man diese Sequenz von Datenrahmen zu bekommen? Es kann sogar sinnvoll sein, die Lesevorgänge als faul oder sequenziell zu betrachten und daher nur die Tabelle A oder B zu lesen, wenn der Join angewendet wird, um die Speicheranforderungen zu reduzieren.

Hinweis: Die Verzeichnisstruktur ist nicht endgültig, dh sie kann sich ändern, wenn dies zur Lösung passt.

+0

hive Partitionsstruktur aussieht und Sie Ork verwenden Datum partitionierte Dateien. Warum können diese nicht in Hive gemappt werden und 'hiveContext.sql' für jedes Datum verwenden und dann beitreten –

+0

Wir laufen nicht Hive, nur Spark standalone – Tim

Antwort

0

Hier ist eine Straight-Forward-Lösung zu dem, was (glaube ich) Sie versuchen, ohne Verwendung von zusätzlichen Funktionen wie Hive oder build-in Partitionierung Fähigkeiten zu tun,:

import spark.implicits._ 

// load base 
val baseDF = spark.read.orc("foo/base/2017/01/04").as("base") 

// create or use existing Hadoop FileSystem - this should use the actual config and path 
val fs = FileSystem.get(new URI("."), new Configuration()) 

// find all other subfolders under foo/ 
val otherFolderPaths = fs.listStatus(new Path("foo/"), new PathFilter { 
    override def accept(path: Path): Boolean = path.getName != "base" 
}).map(_.getPath) 

// use foldLeft to join all, using the DF aliases to find the right "id" column 
val result = otherFolderPaths.foldLeft(baseDF) { (df, path) => 
    df.join(spark.read.orc(s"$path/2017/01/04").as(path.getName), $"base.id" === $"${path.getName}.id" , "left") } 
1

Von dem, was ich verstehe, Funken verwendet die zugrunde liegende Hadoop-API zum Einlesen der Datendatei. Das vererbte Verhalten besteht also darin, alles, was Sie angeben, in einem einzigen RDD/Datenrahmen zu lesen.

Um das zu erreichen, was Sie wollen, können Sie zunächst eine Liste von Verzeichnissen mit bekommen:

import org.apache.hadoop.conf.Configuration 
    import org.apache.hadoop.fs.{ FileSystem, Path } 

    val path = "foo/" 

    val hadoopConf = new Configuration() 
    val fs = FileSystem.get(hadoopConf) 
    val paths: Array[String] = fs.listStatus(new Path(path)). 
     filter(_.isDirectory). 
     map(_.getPath.toString) 

Dann sie in getrennte Datenrahmen laden:

val dfs: Array[DataFrame] = paths. 
     map(path => spark.read.orc(path + "/2017/01/04/*"))