Ich habe eine Reihe von großen komprimierten JSON-Dateien mit verschachtelten Schlüssel-Wert-Paaren. Es gibt ungefähr 70-80 Schlüssel (und Unterschlüssel) im json-Objekt, aber ich bin nur an wenigen Schlüsseln interessiert. Ich wollte die JSON-Dateien mit Spark SQL abfragen, nur die Schlüssel-Wert-Paare heraussuchen, die mich interessieren, und sie in eine Reihe von CSV-Dateien ausgeben. Es dauert ungefähr 5 Minuten, um eine komprimierte JSON-Datei mit einer Größe von 170 MB zu verarbeiten. Ich frage mich nur, ob es irgendeinen Weg geben könnte, diesen Prozess zu optimieren. Oder gibt es für diesen Job bessere Tools als Spark? Vielen Dank!schnelle Möglichkeit zur Verarbeitung von JSON-Datei in Spark
Hier ist eine Momentaufnahme des scala Code, den ich verwendet wurde:
val data = sc.textFile("abcdefg.txt.gz")
// repartition the data
val distdata = data.repartition(10)
val dataDF = sqlContext.read.json(distdata)
// register a temp table
dataDF.registerTempTable("pixels")
// query the json file, grab columns of interest
val query =
"""
|SELECT col1, col2, col3, col4, col5
|FROM pixels
|WHERE col1 IN (col1_v1, col1_v2, ...)
""".stripMargin
val result = sqlContext.sql(query)
// reformat the timestamps
val result2 = result.map(
row => {
val timestamp = row.getAs[String](0).stripSuffix("Z").replace("T"," ")
Row(timestamp, row(1), row(2), row(3), row(4), row(5), row(6), row(7),
row(8), row(9), row(10), row(11))
}
)
// output the result to a csv and remove the square bracket in each row
val output_file = "/root/target"
result2.map(row => row.mkString(",")).saveAsTextFile(output_file)
I; m die meiste Zeit zu raten auf dem Lese geht/Dekomprimierung und Schreiben, das nicht sein kann parallelisiert. Fügen Sie den Overhead für das Verteilen der Jobs und das Sammeln des Ergebnisses hinzu, und meine Vermutung ist, dass Spark Sie hier verlangsamt. Und warum die Repartition der nicht geparsten Zeilen? –
Wenn Sie nur Ihre Daten transformieren möchten. Sie benötigen nicht die gesamte SparkSQL-Funktionalität. Bleiben Sie einfach bei RDDs. Verwenden Sie eine schnelle json lib wie PlayJson, um json zu parsen. Ändern Sie es und dumpen Sie es ab. –
Und bitte nicht auf RDDs neu partitionieren, wenn nicht explizit erforderlich. –