2016-06-14 16 views
3

Ich möchte die Laufzeit einer Spark-Anwendung optimieren, indem Sie eine große CSV-Datei abhängig von ihren Eigenschaften in verschiedene Partitionen aufteilen.Apache Spark: Verwenden von Ordnerstrukturen zum Reduzieren der Laufzeit von Analysen

z. Ich habe eine Spalte mit Kunden-IDs (ganze Zahl, a), eine Spalte mit Daten (Monat + Jahr, zB 01.2015, b) und eine Spalte mit Produkt-IDs (ganze Zahl, c) (und mehr Spalten mit produktspezifischen Daten, nicht benötigt) für die Partitionierung).

Ich möchte eine Ordnerstruktur wie /customer/a/date/b/product/c erstellen. Wenn ein Benutzer Informationen zu Produkten von Kunde X, die im Januar 2016 verkauft wurden, wissen möchte, kann er die ingespeicherte Datei laden und analysieren.

Gibt es eine Möglichkeit, solche Ordnerstrukturen über Wildcards zu laden? Es sollte auch möglich sein, alle Kunden oder Produkte eines bestimmten Zeitbereichs, z. 01.2015 bis 09.2015. Ist es möglich, Wildcards wie /customer/*/date/*.2015/product/c zu verwenden? Oder wie könnte ein Problem wie dieses gelöst werden?

Ich möchte die Daten einmal partitionieren und später die spezifischen Dateien in der Analyse laden, um die Laufzeit für diese Jobs zu reduzieren (ignoriert die zusätzliche Arbeit für die Partitionierung).

LÖSUNG: mit Parkettarbeitsdateien

ich meine Spark-Anwendung geändert meine Daten zu speichern, Dateien Parkett, jetzt funktioniert alles einwandfrei und ich kann die Daten vorab auswählen, indem Ordner-Struktur. Hier ist mein Code-Schnipsel:

JavaRDD<Article> goodRdd = ... 

SQLContext sqlContext = new SQLContext(sc); 

List<StructField> fields = new ArrayList<StructField>(); 
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false)); 
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false)); 

StructType schema = DataTypes.createStructType(fields); 

JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() { 
    public Row call(Article article) throws Exception { 
     return RowFactory.create(article.getKeyStore(), article.getTextArticle()); 
    } 
}); 

DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema); 

// WRITE PARQUET FILES 
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/"); 

// READ PARQUET FILES 
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/"); 

System.out.println("READ : " + read.count()); 

WICHTIG

Sie nicht mit einem Tisch versuchen, mit nur einer Spalte! Sie erhalten Ausnahmen, wenn Sie versuchen, die partitionBy Methode aufzurufen!

+0

kippe Sie hive Tabelle für hdfs Pfad erstellen? Die Hive-Tabelle unterstützt sowohl die dynamische Partitionierung als auch die statische Partitionierung. Mit Datenrahmen können Sie die Daten beliebig abfragen. –

+1

@RamPrasadG Sie müssen keine Hive-Tabellen erstellen. Spark kann das gut machen. Wie auch immer, vielleicht werde ich diese Frage stattdessen beantworten;) –

+0

@GlennieHellesIndholt: Das bedeutet, dass Spark Pfade wie "/ customer/*/date/*/products/123" interpretieren kann? –

Antwort

9

So in Spark können Sie partitionierte Daten in der Art, wie Sie suchen, speichern und lesen. Anstatt jedoch den Pfad zu schaffen, wie Sie haben /customer/a/date/b/product/c Funken diese Konvention /customer=a/date=b/product=c verwenden, wenn Sie Daten speichern mit:

df.write.partitionBy("customer", "date", "product").parquet("/my/base/path/") 

Wenn Sie in den Daten lesen müssen, müssen Sie die basepath-option wie folgt angeben:

sqlContext.read.option("basePath", "/my/base/path/").parquet("/my/base/path/customer=*/date=*.2015/product=*/") 

und alles folgende /my/base/path/ wird von Spark als Spalten interpretiert. In dem hier angegebenen Beispiel fügt Spark die drei Spalten customer, date und product dem Datenrahmen hinzu. Beachten Sie, dass Sie Platzhalter für beliebige Spalten verwenden können, wie Sie möchten.

Beim Einlesen von Daten in einem bestimmten Zeitbereich sollten Sie beachten, dass Spark Prädikat-Push-Down verwendet, sodass nur Daten in den Speicher geladen werden, die den Kriterien entsprechen (wie durch eine Filterumwandlung festgelegt). Wenn Sie den Bereich jedoch explizit angeben möchten, können Sie eine Liste mit Pfadnamen erstellen und diese dann an die Lesefunktion übergeben. Wie folgt aus:

val pathsInMyRange = List("/my/path/customer=*/date=01.2015/product=*", 
          "/my/path/customer=*/date=02.2015/product=*", 
          "/my/path/customer=*/date=03.2015/product=*"..., 
          "/my/path/customer=*/date=09.2015/product=*") 

sqlContext.read.option("basePath", "/my/base/path/").parquet(pathsInMyRange:_*) 

Wie dem auch sei, ich hoffe, das hilft :)

+0

danke! Sieht gut aus, habe es einfach ausprobiert - es funktioniert ohne die Partitionierung ... wenn ich "df.write.partitionBy" verwende, erhalte ich eine Exception, siehe den oben bearbeiteten Code. –

+0

Es funktioniert jetzt! Danke für deine Antwort, @glennie-helles-sindholt! Die Exception ist aufgetreten, weil ich versucht habe, eine Tabelle mit nur einer Spalte zu partitionieren (unrealistischer Testfall), also brauchen Sie hier mindestens zwei Spalten, damit es funktioniert! –

Verwandte Themen