2015-11-10 7 views
7

Ich möchte einen Funken-Job (Funken v1.5.1) über einige generierte S3-Pfade mit Avro-Dateien ausführen. Ich lade sie mit:Wie man Funken erlaubt, um fehlende Eingabedateien zu ignorieren?

val avros = paths.map(p => sqlContext.read.avro(p)) 

Einige der Pfade werden jedoch nicht existieren. Wie kann ich Funken sprühen, um diese leeren Pfade zu ignorieren? Zuvor habe ich this answer verwendet, aber ich bin mir nicht sicher, wie man das mit der neuen Dataframe-API benutzt.

Hinweis: Ich suche idealerweise nach einem ähnlichen Ansatz für die verknüpfte Antwort, die Eingabepfade nur optional macht. Ich möchte nicht explizit auf die Existenz von Pfaden in S3 prüfen müssen (da das mühsam ist und die Entwicklung schwierig machen könnte), aber ich denke, das ist mein Fallback, wenn es keinen sauberen Weg gibt, dies jetzt zu implementieren.

Antwort

9

Ich würde die Scala Try Typ verwenden, um die Möglichkeit des Fehlers beim Lesen eines Verzeichnisses von AVRO-Dateien zu behandeln. Mit ‚Try‘ können wir die Möglichkeit des Scheiterns explizit in unserem Code machen, und es in funktioneller Weise handhaben:

object Main extends App { 

    import scala.util.{Success, Try} 
    import org.apache.spark.{SparkConf, SparkContext} 
    import com.databricks.spark.avro._ 

    val sc = new SparkContext(new SparkConf().setMaster("local[*]").setAppName("example")) 
    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    //the first path exists, the second one doesn't 
    val paths = List("/data/1", "/data/2") 

    //Wrap the attempt to read the paths in a Try, then use collect to filter 
    //and map with a single partial function. 
    val avros = 
    paths 
     .map(p => Try(sqlContext.read.avro(p))) 
     .collect{ 
     case Success(df) => df 
     } 
    //Do whatever you want with your list of dataframes 
    avros.foreach{ df => 
    println(df.collect()) 
    } 
    sc.stop() 
} 
+0

Von den Funken docs: 'sammeln()' kann der Fahrer führt aus dem Speicher laufen, though, da 'collect()' die gesamte RDD auf einen einzelnen Rechner holt. Gibt es eine Lösung ohne '' collect() '? Dies ist für einen sehr großen Datensatz. – jbrown

+2

Das ist wahr, wenn 'collect()' auf einer RDD aufgerufen wird. Wo ich 'collect (...)' das erste Mal mit einer Teilfunktion anrufe, steht es auf einer Liste von RDDs, es ist die Collect-Funktion auf der Liste, nicht auf irgendeiner RDD. Dies ist äquivalent zu einem 'map' und einem' filter'. Ich benutze 'collect()' noch einmal am Ende innerhalb der 'foreach' am Ende, aber das ist nur ein Beispiel für den Betrieb der Liste der RDDs, ich erwarte nicht, dass Sie das in Ihrer eigenen Anwendung tun würden, aber ich brauchte ein einfaches Ende, um zu sehen, dass der Ansatz korrekt funktioniert hatte. – mattinbits

+0

Oh OK. Ich werde es versuchen und sehen, ob es dann funktioniert. Ich dachte, das erste 'collect' würde die RDDs auswerten und alle Daten an den Treiberknoten senden. – jbrown

Verwandte Themen