2016-03-18 8 views
1

Ich verarbeite einen sehr großen Datensatz mit Funken. Die Daten werden als Avro-Dateien gespeichert. Die Daten sind ebenfalls in einer Verzeichnisstruktur organisiert (/ input/yyyy/MM/dd/HH /). So zum Beispiel werden die Avro-Dateien für heute in/Eingang/2016/03/18/00 zu/Eingang/2016/03/18/23Spark Erstellen von 100s leerer Avro-Dateien

Nun, wenn ich die letzten 2 Jahre Daten verarbeiten, gibt es viele viele avro-Dateien, die verarbeitet werden.

Der Datenverarbeitungscode ist wie folgt

val inputRDD = sc.load("/input", "com.databricks.spark.avro").rdd 
val outputRDD = inputRDD.map(foo).filter(_.isDefined).flatMap(x => x).join(anotherRDD).map { 
    case (a, (b, (c, d))) => (a, (b, c, d)) 
}.join(yetAnotherRDD).filter { 
    case (a, ((b, c, d), (e, f))) => Math.abs(a - b) <= 2000 
}.map { 
    case (a, ((b, c, d), (e, f))) => Row(a, d) 
} 
val outputDF = sc.createDataframe(outputRDD, outputSchema) 
outputDF.save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

Nun, wenn ich innen Ausgang mit Hue gehen. Ich sehe 181 Seiten und auf jeder Seite sehe ich viele leere Avro-Dateien.

Nicht alle Dateien sind leer ... aber es gibt so viele leere Dateien.

Was ist, wenn ich keine leeren Dateien möchte. (ohne auf "collect" zurückgreifen zu müssen)

Antwort

1

Jede Eingabedatei erzeugt mindestens eine RDD (wenn eine Datei groß ist, könnte ich in mehreren Eingabesequenzen gelesen werden und mehrere RDDs erstellen). In Ihrer Anwendung führen Sie Filter auf diesen RDD aus, daher ist es möglich, dass einige RDDs leer sind, weil alle ihre Zeilen herausgefiltert wurden. Wenn Sie Ihren Dataframe speichern, wird jede RDD in einer anderen PART-Datei gespeichert, sodass eine leere RDD eine leere RDD-Datei generiert. Um dies zu umgehen, verwenden Sie .coalesce(n), wodurch die Anzahl der RDDs verringert wird. versuchen So etwas wie diese auf Ihre letzte Zeile:

outputDF.coalesce(200).save(s"/output/${datePath(date)}", "com.databricks.spark.avro") 

Die Zahl in coalesce zu verwenden, ist auf die Größe der Daten abhängig. Wenn Sie zu viele RDDs haben, wird aufgrund des Kommunikationsaufwands mit dem Treiber viel Leistung verloren gehen; Wenn Sie zu wenige RDDs haben, werden Sie möglicherweise nicht alle verfügbaren Executoren verwenden, was ebenfalls zu einer schlechteren als der optimalen Leistung führt.

+0

Ich habe überprüft, aber Funken 1.3.0 hat nicht die Colease-Funktion. Ich schätze, sie haben es später zu den Datenrahmen hinzugefügt. –

+1

Sie können '.repartition (200)' in diesem Fall verwenden. Beachten Sie, dass '.coalesce (200)' bei späteren Spark-Versionen besser funktioniert. –

Verwandte Themen