2017-06-28 8 views
1

Ich habe herausgefunden, wie man Dateien aus einem S3-Verzeichnis in meine pyspark-Shell (und Skript) liest, z. unter Verwendung:Lesen von S3-Dateien im verschachtelten Verzeichnis über Spark EMR

rdd = sc.wholeTextFiles('s3n://bucketname/dir/*') 

Aber, während die toll in dass ich alle Dateien in einem Verzeichnis lesen, mag ich jede einzelne Datei aus allen Verzeichnissen lesen.

Ich will sie nicht flach machen oder alles auf einmal laden, weil ich Speicherprobleme haben werde.

Stattdessen muss ich automatisch alle Dateien aus jedem Unterverzeichnis stapelweise laden. Ist das möglich?

Hier ist meine Verzeichnisstruktur:

S3_bucket_name -> Jahr (2016 oder 2017) -> Monat (max 12 Ordner) -> Tag (max 31 Ordner) -> Unter Tag Ordner (max 30; im Grunde nur teilte das Sammeln jeden Tag).

So etwas wie dies, außer es für alle 12 Monate und bis zu 31 Tage ...

BucketName 
| 
| 
|---Year(2016) 
|  | 
|  |---Month(11) 
|  |  | 
|  |  |---Day(01) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |  |---Day(02) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |---Month(12) 
| 
|---Year(2017) 
|  | 
|  |---Month(1) 
|  |  | 
|  |  |---Day(01) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |  |---Day(02) 
|  |  |  | 
|  |  |  |---Sub-folder(01) 
|  |  |  | 
|  |  |  |---Sub-folder(02) 
|  |  |  | 
|  |---Month(2) 

Jeder Pfeil oben gehen werde stellt eine Gabel. z.B. Ich sammle Daten seit 2 Jahren, also gibt es 2 Jahre im "Jahr" Gabel. Dann für jedes Jahr, bis zu maximal 12 Monaten, und dann für jeden Monat bis zu 31 mögliche Tagesordner. Und an jedem Tag wird es bis nur zu 30 Ordnern sein, weil ich es aufgeteilt, dass die Art und Weise nach oben ...

Ich hoffe, das macht Sinn ...

ich in einem anderen Beitrag gesucht (read files recursively from sub directories with spark from s3 or local filesystem), wo ich glauben sie vorgeschlagen Platzhalter verwenden, so etwas wie:

rdd = sc.wholeTextFiles('s3n://bucketname/*/data/*/*') 

Aber das Problem mit diesem ist es versucht, einen gemeinsamen Ordner unter den verschiedenen Unterverzeichnissen zu finden - in diesem Fall gibt es keine Garantien gibt, und ich würde einfach alles brauchen.

jedoch auf dieser Argumentationslinie, dachte ich, was ist, wenn ich tat ..:

rdd = sc.wholeTextFiles("s3n://bucketname/*/*/*/*/*') 

Aber das Problem ist, dass ich jetzt OutOfMemory Fehler bekommen, wahrscheinlich, weil es alles auf einmal geladen ist und ausgeflippt.

Idealerweise, was würde ich in der Lage sein wird, dies zu tun:

Gehen Sie auf die Unterverzeichnisebene des Tages und liest die in, so z.B.

Erst lesen in 2016/12/01, dann 2016/12/02, bis 2012/12/31 und dann 2017/01/01, dann 2017/01/02, ... 2017/01/31 und so weiter.

Auf diese Weise, anstatt fünf Wildcards (*), wie ich oben tat, würde ich irgendwie wissen, durch jedes Unterverzeichnis auf der Ebene von "Tag" zu sehen.

Ich dachte an ein Python-Wörterbuch zu verwenden, um den Dateipfad zu jedem der Tage zu spezifizieren, aber das scheint eine ziemlich umständliche Annäherung.Was ich damit meine ist, wie folgt:

file_dict = { 
    0:'2016/12/01/*/*', 
    1:'2016/12/02/*/*', 
    ... 
    30:'2016/12/31/*/*', 
} 

im Grunde für alle Ordner und dann durch sie iterieren und laden sie so etwas wie dies bei der Verwendung:

sc.wholeTextFiles('s3n://bucketname/' + file_dict[i]) 

Aber ich will nicht Geben Sie alle diese Pfade manuell ein. Ich hoffe, das Sinn gemacht ...

EDIT:

Ein anderer Weg, um die Frage zu stellen ist, wie ich die Dateien von einer verschachtelten Unterverzeichnisstruktur in einer dosierten Weise lesen Sie? Wie kann ich alle möglichen Ordnernamen in meinem S3-Bucket in Python aufzählen? das würde vielleicht helfen ...

EDIT2:

Die Struktur der Daten in jedem meiner Dateien wie folgt:

{json object 1}, 
{json object 2}, 
{json object 3}, 
... 
{json object n}, 

Denn es „true json“, es zu sein, entweder nur erforderlich, wie das oben zu sein, ohne ein Komma am Ende, oder so etwas wie diese (beachten sie eckige Klammern, und das Fehlen des abschließenden Kommas:

[ 
    {json object 1}, 
    {json object 2}, 
    {json object 3}, 
    ... 
    {json object n} 
] 

der Grund, warum ich es in PySpark ganz tat, wie ein Skript, das ich einreiche, ist, weil ich mich gezwungen habe, diese Formatierungsquark manuell zu handhaben. Wenn ich Hive/Athena benutze, bin ich nicht sicher, wie ich damit umgehen soll.

Antwort

1

Warum verwenden Sie nicht Hive, oder noch besser, Athena? Diese werden sowohl Tabellen als auch Dateisysteme bereitstellen, um Ihnen Zugriff auf alle Daten zu geben. Dann können Sie diese in erfassen

Alternativ Spark, ich glaube, Sie auch HiveQL in Funken können eine tempTable ontop Ihrer Dateisystempfad einzurichten, und es wird alles als Hive Tabelle registrieren, die Sie ausführen können, SQL gegen. Es ist eine Weile her, seit ich das gemacht habe, aber es ist definitiv machbar

+0

Ich werde das untersuchen. Ich war mir nicht sicher, weil ich nicht wusste, ob ich s3 als Dateisystem behandeln könnte, so wie HDFS funktioniert, aber ich vermute, mit Hive oder Athena sollte ich immer noch auf alles im Eimer zugreifen können. Ich werde es überprüfen. Danke für die Idee. – shishy

+0

S3 infact ist als HDFS direkt EMR ausgesetzt. Es ist etwas, das EMRFS genannt wird. Wenn Sie Hadoop-Befehle ausführen, können Sie 'hadoop fs -copyToLocal s3: //bucket/file.txt ./' zum Beispiel (Syntax ist möglicherweise nicht ganz korrekt, war eine Weile) – Henry

+0

Um ehrlich zu sein, ist meine Hive Erfahrung ziemlich rudimentär aber ich bin mir nicht sicher, wie das geht, da meine Daten grundsätzlich so strukturiert sind: Jede Datei hat mehrere JSON-Objekte (Tweets). Aber die Formatierung jedes JSON ist nicht perfekt, da bei Multi-JSON (mit Zeilenbegrenzung) am Ende normalerweise kein Komma angezeigt wird. Nur eine neue Zeile. In meinem Fall ist die Struktur jeder Datei jedoch wie in der neuen Bearbeitung beschrieben, die ich dem OP hinzufügen werde. – shishy

Verwandte Themen