2017-11-08 1 views
0

My-Datensatz auf diese Weise partitioniert ist:Spark-SQL-Abfragen auf partitionierten Daten Datum mit Rang

Year=yyyy 
|---Month=mm 
| |---Day=dd 
| | |---<parquet-files> 

Was ist die einfachste und effizienteste Weg, einen Datenrahmen in Funken mit Daten zwischen zwei Daten geladen zu schaffen?

+0

Wenn Sie einfache Bereichsabfragen auf Partitionen wünschen, ist die beste Lösung, eine bessere Partitionierungsstrategie zu verwenden, bei der Zeit auf einer einzelnen Achse, z. B. '/ tbl/ts = yyyymmddhhmm/*.Es gibt einen Abschnitt zu diesem Thema in https://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ – Sim

Antwort

2

Wenn Sie unbedingt auf diese Partitionierungsstrategie haben zu bleiben, hängt die Antwort darauf, ob Sie bereit sind, Partition zu tragen Entdeckungskosten oder nicht.

Wenn Sie möchten, dass Spark alle Partitionen erkennt, die nur einmal ausgeführt werden müssen (bis Sie neue Dateien hinzufügen), können Sie den Basispfad laden und dann mithilfe der Partitionsspalten filtern.

Wenn Sie nicht möchten, dass Spark alle Partitionen erkennt, z. B. weil Sie Millionen von Dateien haben, ist die einzige effiziente allgemeine Lösung, das Intervall zu unterbrechen, in dem Sie leicht abfragen können Verwenden von @ r0bb23 Ansatz und dann Vereinigung dann zusammen.

Wenn Sie das Beste aus beiden oben genannten Fällen möchten und ein stabiles Schema haben, können Sie die Partitionen im Metastore registrieren, indem Sie eine externe partitionierte Tabelle definieren. Tun Sie dies nicht, wenn Sie erwarten, dass sich Ihr Schema weiterentwickelt, da Tabellen mit Metastasen die Schemaentwicklung zu diesem Zeitpunkt ziemlich schlecht verwalten.

Zum Beispiel zur Abfrage zwischen 2017-10-06 und 2017-11-03 Sie tun würden:

// With full discovery 
spark.read.parquet("hdfs:///basepath") 
    .where('Year === 2017 && (
    ('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3') 
)) 

// With partial discovery 
val df1 = spark.read.option("basePath", "hdfs:///basepath/") 
    .parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/") 
val df2 = spark.read.option("basePath", "hdfs:///basepath/") 
    .parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/") 
val df = df1.union(df2) 

Schreiben von generischem Code für diese sicherlich möglich ist, aber ich habe es nicht auf. Der bessere Ansatz besteht darin, in der Art und Weise zu partitionieren, wie in dem Kommentar, den ich zu der Frage gemacht habe, beschrieben wurde. Wenn Ihre Tabelle partitioniert wurde so etwas wie /basepath/ts=yyyymmddhhmm/*.parquet verwendet, dann ist die Antwort einfach:

spark.read.parquet("hdfs:///basepath") 
    .where('ts >= 201710060000L && 'ts <= 201711030000L) 

Der Grund, warum es das Hinzufügen Stunde & Minuten wert ist, dass man dann generischen Code schreiben kann, die Intervalle unabhängig von Griffen, ob Sie die Daten partitioniert nach Woche, Tag, Stunde oder alle 15 Minuten. Tatsächlich können Sie sogar Daten mit unterschiedlicher Granularität in derselben Tabelle verwalten, z. B. werden ältere Daten auf höheren Ebenen aggregiert, um die Gesamtzahl der Partitionen zu reduzieren, die erkannt werden müssen.

2

Bearbeitet, um mehrere Ladepfade zum Adresskommentar hinzuzufügen.

Sie können eine Regex-Syntax verwenden.

val dataset = spark 
    .read 
    .format("parquet") 
    .option("filterPushdown", "true") 
    .option("basePath", "hdfs:///basepath/") 
    .load("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9],[1-3][0-9]}/*/", 
    "hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/") 

How to use regex to include/exclude some input files in sc.textFile?

Hinweis: Sie müssen nicht die X=* können Sie einfach tun *, wenn Sie alle Tage wollen, Monate usw.

Sie sollten wahrscheinlich auch über etwas zu lesen tun Predicate Pushdown (dh filterPushdown wurde auf "True" gesetzt).

Schließlich werden Sie die basepath Option oben bemerkt, ist der Grund für die hier gefunden werden kann: Prevent DataFrame.partitionBy() from removing partitioned columns from schema

+0

Dies ist keine allgemeine Lösung für das Problem . Tatsächlich gibt es keine einfache allgemeine Lösung, um ein Intervall von Daten mit dieser Partitionierungsstrategie abzufragen. Wie würden Sie beispielsweise diesen Ansatz für die Abfrage zwischen "2017-10-06" und "2017-11-03" verwenden? – Sim

+0

Einige gute Informationen in Ihrer Antwort unten. Sie benötigen jedoch nicht die in Ihrer Antwort angezeigte Verbindung (siehe oben). Also muss ich sagen, ich denke, es ist viel verallgemeinerbarer, als du es glaubst, obwohl es einige nicht so hübsche Hilfsfunktionen erfordert. Aber für eine Menge, wenn nicht die Mehrheit der Systeme ist es das wert. Weil, wie Sie anerkennen, Partition Discovery ist nicht billig im Maßstab. Partielle Entdeckung ist nur im Maßstab besser. Obwohl ich dem zustimme, würde eine bessere Partitionierungsstrategie helfen. Ich benutze etwas mehr wie das, was Sie unten haben, macht die Hilfsfunktionen und den obigen Code trivial. – r0bb23

Verwandte Themen