2015-01-28 4 views
7

Ich bin relativ neu in Apache Spark, und ich möchte eine einzige RDD in Python aus Listen von Wörterbüchern erstellen, die in mehreren JSON-Dateien gespeichert sind (jeweils gezippt und enthält eine Liste von Wörterbüchern). Die resultierende RDD würde dann grob gesagt alle Listen von Wörterbüchern enthalten, die in einer einzigen Liste von Wörterbüchern kombiniert sind. Ich konnte dies in der Dokumentation (https://spark.apache.org/docs/1.2.0/api/python/pyspark.html) nicht finden, aber wenn ich es verpasst habe, lass es mich wissen.Wie man das Verzeichnis von JSON-Dateien in Apache Spark in Python lädt

Bisher habe ich versucht, die JSON-Dateien zu lesen und die kombinierte Liste in Python zu erstellen, dann benutze sc.parallelize(), aber der gesamte Datensatz ist zu groß, um in den Speicher zu passen, also ist dies keine praktische Lösung. Es scheint, als hätte Spark eine clevere Lösung für diesen Anwendungsfall, aber ich bin mir dessen nicht bewusst.

Wie kann ich eine einzelne RDD in Python erstellen, die die Listen in allen JSON-Dateien enthält?

Ich sollte auch erwähnen, dass ich Spark SQL nicht verwenden möchte. Ich würde gerne Funktionen wie Karte, Filter usw. verwenden, wenn das möglich ist.

Antwort

5

Nach was tgpfeiffer in ihrer Antwort und Kommentar erwähnt, ist hier, was ich hat getan.

Zuerst mussten die JSON-Dateien formatiert werden, so dass sie ein Wörterbuch pro Zeile und nicht eine einzelne Wörterbücherliste hatten. Dann war es so einfach wie:

my_RDD_strings = sc.textFile(path_to_dir_with_JSON_files) 
my_RDD_dictionaries = my_RDD_strings.map(json.loads) 

Wenn es ein besser oder effizienter Weg, dies zu tun, lass es mich wissen, aber das scheint zu funktionieren.

2

Sie können sqlContext.jsonFile() verwenden, um ein SchemaRDD (RDD [Zeile] plus ein Schema) zu erhalten, das dann mit Spark SQL verwendet werden kann. Oder sehen Sie Loading JSON dataset into Spark, then use filter, map, etc für eine Nicht-SQL-Verarbeitungspipeline. Ich denke, Sie müssen die Dateien entpacken, und Spark kann nur mit Dateien arbeiten, bei denen jede Zeile ein einzelnes JSON-Dokument ist (d. H. Keine mehrzeiligen Objekte möglich).

+0

Dank für die Beantwortung. Ich hätte erwähnen sollen, dass ich Spark SQL nicht verwenden möchte, ich möchte eine Nicht-SQL-Verarbeitungspipeline verwenden, wie in der Frage, auf die Sie verwiesen haben. Ich werde meine ursprüngliche Frage aktualisieren. Die Antwort auf die Frage, auf die Sie verwiesen haben, scheint in Scala zu liegen, nicht in Python. Nochmals vielen Dank für Ihre Hilfe! – Brandt

+1

Richtig, es ist in Scala, aber die Idee kann auf Ihr Problem angewendet werden: Laden Sie den Eingabedatensatz mit 'sparkContext.textFile()' (was eigentlich scheint, gziped Dateien zu unterstützen] (http://stackoverflow.com/questions/16302385/gzip-support-in-spark)), analysieren Sie dann die String-Zeilen mit einem Parser Ihrer Wahl (z. B. [das json-Modul] (https://docs.python.org/2/library/json.html))), dann bearbeiten Sie, wie Sie wünschen. – tgpfeiffer

+0

Danke, das hat funktioniert! Der Schlüsselschritt war die Verwendung der Kartenfunktion auf json.loads. Ich werde genau das posten, was ich als Antwort getan habe. Vielen dank für Deine Hilfe. – Brandt

1

Sie können ein Verzeichnis von Dateien mit textFile in eine einzelne RDD laden und es unterstützt auch Platzhalter. Das würde dir keine Dateinamen geben, aber du scheinst sie nicht zu brauchen.

Sie können Spark-SQL verwenden, während grundlegende Transformationen wie Karte unterhalten, Filter etc. SchemaRDD ist auch ein RDD (in Python, sowie Scala)

1

Zur Liste der Json aus einer Datei als RDD laden:

def flat_map_json(x): return [each for each in json.loads(x[1])] 
rdd = sc.wholeTextFiles('example.json').flatMap(flat_map_json) 
Verwandte Themen