Wenn Sie unbedingt auf diese Partitionierungsstrategie haben zu bleiben, hängt die Antwort darauf, ob Sie bereit sind, Partition zu tragen Entdeckungskosten oder nicht.
Wenn Sie möchten, dass Spark alle Partitionen erkennt, die nur einmal ausgeführt werden müssen (bis Sie neue Dateien hinzufügen), können Sie den Basispfad laden und dann mithilfe der Partitionsspalten filtern.
Wenn Sie nicht möchten, dass Spark alle Partitionen erkennt, z. B. weil Sie Millionen von Dateien haben, ist die einzige effiziente allgemeine Lösung, das Intervall zu unterbrechen, in dem Sie leicht abfragen können Verwenden von @ r0bb23 Ansatz und dann Vereinigung dann zusammen.
Wenn Sie das Beste aus beiden oben genannten Fällen möchten und ein stabiles Schema haben, können Sie die Partitionen im Metastore registrieren, indem Sie eine externe partitionierte Tabelle definieren. Tun Sie dies nicht, wenn Sie erwarten, dass sich Ihr Schema weiterentwickelt, da Tabellen mit Metastasen die Schemaentwicklung zu diesem Zeitpunkt ziemlich schlecht verwalten.
Zum Beispiel zur Abfrage zwischen 2017-10-06
und 2017-11-03
Sie tun würden:
// With full discovery
spark.read.parquet("hdfs:///basepath")
.where('Year === 2017 && (
('Month === 10 && 'Day >= 6') || ('Month === 11 && 'Day <= 3')
))
// With partial discovery
val df1 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=10/Day={0[6-9], [1-3][0-9]}/*/")
val df2 = spark.read.option("basePath", "hdfs:///basepath/")
.parquet("hdfs:///basepath/Year=2017/Month=11/Day={0[1-3]}/*/")
val df = df1.union(df2)
Schreiben von generischem Code für diese sicherlich möglich ist, aber ich habe es nicht auf. Der bessere Ansatz besteht darin, in der Art und Weise zu partitionieren, wie in dem Kommentar, den ich zu der Frage gemacht habe, beschrieben wurde. Wenn Ihre Tabelle partitioniert wurde so etwas wie /basepath/ts=yyyymmddhhmm/*.parquet
verwendet, dann ist die Antwort einfach:
spark.read.parquet("hdfs:///basepath")
.where('ts >= 201710060000L && 'ts <= 201711030000L)
Der Grund, warum es das Hinzufügen Stunde & Minuten wert ist, dass man dann generischen Code schreiben kann, die Intervalle unabhängig von Griffen, ob Sie die Daten partitioniert nach Woche, Tag, Stunde oder alle 15 Minuten. Tatsächlich können Sie sogar Daten mit unterschiedlicher Granularität in derselben Tabelle verwalten, z. B. werden ältere Daten auf höheren Ebenen aggregiert, um die Gesamtzahl der Partitionen zu reduzieren, die erkannt werden müssen.
Wenn Sie einfache Bereichsabfragen auf Partitionen wünschen, ist die beste Lösung, eine bessere Partitionierungsstrategie zu verwenden, bei der Zeit auf einer einzelnen Achse, z. B. '/ tbl/ts = yyyymmddhhmm/*.Es gibt einen Abschnitt zu diesem Thema in https://spark-summit.org/east-2017/events/bulletproof-jobs-patterns-for-large-scale-spark-processing/ – Sim