Ich möchte die Laufzeit einer Spark-Anwendung optimieren, indem Sie eine große CSV-Datei abhängig von ihren Eigenschaften in verschiedene Partitionen aufteilen.Apache Spark: Verwenden von Ordnerstrukturen zum Reduzieren der Laufzeit von Analysen
z. Ich habe eine Spalte mit Kunden-IDs (ganze Zahl, a), eine Spalte mit Daten (Monat + Jahr, zB 01.2015, b) und eine Spalte mit Produkt-IDs (ganze Zahl, c) (und mehr Spalten mit produktspezifischen Daten, nicht benötigt) für die Partitionierung).
Ich möchte eine Ordnerstruktur wie /customer/a/date/b/product/c
erstellen. Wenn ein Benutzer Informationen zu Produkten von Kunde X, die im Januar 2016 verkauft wurden, wissen möchte, kann er die ingespeicherte Datei laden und analysieren.
Gibt es eine Möglichkeit, solche Ordnerstrukturen über Wildcards zu laden? Es sollte auch möglich sein, alle Kunden oder Produkte eines bestimmten Zeitbereichs, z. 01.2015 bis 09.2015. Ist es möglich, Wildcards wie /customer/*/date/*.2015/product/c
zu verwenden? Oder wie könnte ein Problem wie dieses gelöst werden?
Ich möchte die Daten einmal partitionieren und später die spezifischen Dateien in der Analyse laden, um die Laufzeit für diese Jobs zu reduzieren (ignoriert die zusätzliche Arbeit für die Partitionierung).
LÖSUNG: mit Parkettarbeitsdateien
ich meine Spark-Anwendung geändert meine Daten zu speichern, Dateien Parkett, jetzt funktioniert alles einwandfrei und ich kann die Daten vorab auswählen, indem Ordner-Struktur. Hier ist mein Code-Schnipsel:
JavaRDD<Article> goodRdd = ...
SQLContext sqlContext = new SQLContext(sc);
List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("keyStore", DataTypes.IntegerType, false));
fields.add(DataTypes.createStructField("textArticle", DataTypes.StringType, false));
StructType schema = DataTypes.createStructType(fields);
JavaRDD<Row> rowRDD = goodRdd.map(new Function<Article, Row>() {
public Row call(Article article) throws Exception {
return RowFactory.create(article.getKeyStore(), article.getTextArticle());
}
});
DataFrame storeDataFrame = sqlContext.createDataFrame(rowRDD, schema);
// WRITE PARQUET FILES
storeDataFrame.write().partitionBy(fields.get(0).name()).parquet("hdfs://hdfs-master:8020/user/test/parquet/");
// READ PARQUET FILES
DataFrame read = sqlContext.read().option("basePath", "hdfs://hdfs-master:8020/user/test/parquet/").parquet("hdfs://hdfs-master:8020/user/test/parquet/keyStore=1/");
System.out.println("READ : " + read.count());
WICHTIG
Sie nicht mit einem Tisch versuchen, mit nur einer Spalte! Sie erhalten Ausnahmen, wenn Sie versuchen, die partitionBy
Methode aufzurufen!
kippe Sie hive Tabelle für hdfs Pfad erstellen? Die Hive-Tabelle unterstützt sowohl die dynamische Partitionierung als auch die statische Partitionierung. Mit Datenrahmen können Sie die Daten beliebig abfragen. –
@RamPrasadG Sie müssen keine Hive-Tabellen erstellen. Spark kann das gut machen. Wie auch immer, vielleicht werde ich diese Frage stattdessen beantworten;) –
@GlennieHellesIndholt: Das bedeutet, dass Spark Pfade wie "/ customer/*/date/*/products/123" interpretieren kann? –