2016-11-18 3 views
2

Ich habe eine Anforderung, XML-Dateien in einem S3-Ordner gestreamt zu verarbeiten. Derzeit habe ich es wie folgt implementiert.Spark Streaming XML-Dateien

Zuerst Lesen von Dateien mit filestream Spark

val data = ssc.fileStream[LongWritable, Text, TextInputFormat]("s3://myfolder/",(t: org.apache.hadoop.fs.Path) => true, newFilesOnly = true, hadoopConf).map(_._2.toString())

Für jede RDD, überprüfen, ob eine Datei

gelesen wurde
if (data.count() !=0) 

die Zeichenfolge Schreiben an eine neue HDFS Verzeichnis

Erstellen Sie einen Datenrahmen aus dem obigen HDFS di

Pfarrhaus
val loaddata = sqlContext.read.format("com.databricks.spark.xml").option("rowTag", "Trans").load(sdir) 

einige Verarbeitung auf Datenrahmen tun und speichern als JSON

loaddata.write.mode("append").json("s3://mybucket/somefolder") 

Irgendwie habe ich das Gefühl, dass der obige Ansatz sehr ineffizient ist und ehrlich gesagt ziemlich Schule jungenhaft. Gibt es eine bessere Lösung? Jede Hilfe würde sehr geschätzt werden.

Eine weitere Frage: Wie bearbeitet man Felder (nicht Spalten) in einem Datenrahmen? Ich habe eine komplexe verschachtelte XML und wenn ich die oben beschriebene Methode verwende, bekomme ich einen Dataframe mit 9 Spalten und 50 ungeraden inneren Struct Arrays. Bis auf die Notwendigkeit, bestimmte Feldnamen zu trimmen, ist das in Ordnung. Gibt es eine Möglichkeit, dies zu erreichen, ohne den Datenrahmen zu explodieren, da ich dieselbe Struktur wieder aufbauen muss?

Antwort

1

Wenn Sie Spark-2.0 verwenden Sie in der Lage sein, um es mit strukturiertem Streaming funktioniert:

val inputDF = spark.readStream.format("com.databricks.spark.xml") 
    .option("rowTag", "Trans") 
    .load(path) 
+0

Vielen Dank. Mein Ziel-Env ist EMR-Stack mit Spark 2.0.1. Ich werde Ihren Vorschlag auf einer EMR-Box versuchen. – Vamsi

+0

pls vote-up/akzeptieren, wenn Sie mit der oben genannten Lösung in Ordnung sind. –