Ich habe eine Anforderung, XML-Dateien in einem S3-Ordner gestreamt zu verarbeiten. Derzeit habe ich es wie folgt implementiert.Spark Streaming XML-Dateien
Zuerst Lesen von Dateien mit filestream Spark
val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())
Für jede RDD, überprüfen, ob eine Datei
gelesen wurdeif (data.count() !=0)
die Zeichenfolge Schreiben an eine neue HDFS Verzeichnis
Erstellen Sie einen Datenrahmen aus dem obigen HDFS di
Pfarrhausval loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir)
einige Verarbeitung auf Datenrahmen tun und speichern als JSON
loaddata.write.mode("append").json("s3://mybucket/somefolder")
Irgendwie habe ich das Gefühl, dass der obige Ansatz sehr ineffizient ist und ehrlich gesagt ziemlich Schule jungenhaft. Gibt es eine bessere Lösung? Jede Hilfe würde sehr geschätzt werden.
Eine weitere Frage: Wie bearbeitet man Felder (nicht Spalten) in einem Datenrahmen? Ich habe eine komplexe verschachtelte XML und wenn ich die oben beschriebene Methode verwende, bekomme ich einen Dataframe mit 9 Spalten und 50 ungeraden inneren Struct Arrays. Bis auf die Notwendigkeit, bestimmte Feldnamen zu trimmen, ist das in Ordnung. Gibt es eine Möglichkeit, dies zu erreichen, ohne den Datenrahmen zu explodieren, da ich dieselbe Struktur wieder aufbauen muss?
Vielen Dank. Mein Ziel-Env ist EMR-Stack mit Spark 2.0.1. Ich werde Ihren Vorschlag auf einer EMR-Box versuchen. – Vamsi
pls vote-up/akzeptieren, wenn Sie mit der oben genannten Lösung in Ordnung sind. –