2017-02-10 2 views
0

Ist Partition Bereinigung aktiviert für zwischengespeicherte TempTables in Apache Funke? Wenn ja, wie konfiguriere ich es?Spark SQL-Partition Bereinigung für eine zwischengespeicherte Tabelle

Meine Daten sind eine Reihe von Sensormesswerten in verschiedenen Installationen, eine Zeile enthält Installationsname, Tag, Zeitstempel und Wert.

rdd.toDF("installationName", "tag", "timestamp", "value") 
    .repartition($"installationName", $"tag") 
    .write.partitionBy("installationName","tag").mode("overwrite").parquet(config.output) 

Ich las diese Daten mit dem folgenden Befehl in eine SQL-Tabelle mit Spark-HiveContext:

val parquet = hc.read.parquet("/path_to_table/tablename") 
parquet.registerTempTable("tablename") 

Nun, wenn ich

Ich habe die Daten in Parkett-Format mit den folgenden Befehlen geschrieben Führen Sie eine SQL-Abfrage für diese Tabelle, es Partition Bereinigung wie erwartet durchführt:

Und die Abfrage dauert ungefähr 8 Sekunden. Aber wenn ich die Tabelle im Speicher zwischengespeichert und dann die gleiche Abfrage ausführen, dauert es immer um 50 Sekunden:

hc.sql("CACHE TABLE tablename") 
hc.sql("select * from tablename where installationName = 'XXX' and tag = 'YYY'") 

ich derzeit Spark-1.6.1 mit bin.

+0

Hallo, vielen Dank für Ihren Kommentar. In der Tat mache ich eine Neupartitionierung, bevor ich Daten auf Parkett schreibe. Ich habe auch die obige Abfrage mit der Neupartitionierung getestet und es ist mit einer Abfragezeit von 20 s effizienter, aber es ist immer noch langsamer als das Lesen von den Parkettdateien ohne Caching. Mein Zweck ist es, zu vermeiden, auf Parkett-Dateien insgesamt zu schreiben. Könnten Sie vielleicht eine Quelle bereitstellen? Woher wissen Sie, dass das Löschen von Partitionen nach dem Caching nicht unterstützt wird? Wenn Sie hier eine Antwort schreiben würden, könnte ich das akzeptieren. –

+0

Korrektur, Caching im Speicher reduziert die Abfragezeit auf weniger als 1 Sekunde, was natürlich schon akzeptabel ist. Ich frage mich, ob es skaliert: das ist nur ein Teil meiner Dastas, ich habe tatsächlich über 200 mal mehr und wächst ständig, also je mehr Daten ich habe, desto mehr Zeit braucht man durch alle Partitionen zu scannen . –

Antwort

0

Der Grund, warum es passiert, liegt an der Funktionsweise von Cache in Spark.

Wenn Sie die Ausführung eine Art von Prozess zu einem Datenrahmen, RDD oder DataSet rufen unter einem Plan zu sehen ist:

val df = sc.parallelize(1 to 10000).toDF("line") 
df.withColumn("new_line", col("line") * 10).queryExecution 

Der Befehl queryExecution Rückkehr zu Ihnen dem Plan. Siehe den logischen Plan unten des Codes:

== Parsed Logical Plan == 
Project [*,('line * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#7] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at 

== Optimized Logical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at 

== Physical Plan == 
Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#7] 
+- Scan ExistingRDD[_1#4] 

In diesem Fall können Sie alle Prozesse sehen, die Ihr Code tun wird. Wenn Sie eine cache Funktion wie folgt aufrufen:

df.withColumn("new_line", col("line") * 10).cache().queryExecution 

Das Ergebnis wird so aussehen:

== Parsed Logical Plan == 
'Project [*,('line * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Analyzed Logical Plan == 
line: int, new_line: int 
Project [line#5,(line#5 * 10) AS new_line#8] 
+- Project [_1#4 AS line#5] 
    +- LogicalRDD [_1#4], MapPartitionsRDD[9] at intRddToDataFrameHolder at <console>:34 

== Optimized Logical Plan == 
InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Project [_1#4 AS line#5,(_1#4 * 10) AS new_line#8], None 

== Physical Plan == 
InMemoryColumnarTableScan [line#5,new_line#8], InMemoryRelation [line#5,new_line#8], true, 10000, StorageLevel(true, true, false, true, 1), Pro... 

Diese Ausführung Ihnen die Ausführung eines InMemoryRelation in optmized logischen Plan zurück, wird dies eine sparen Struktur der Daten in Ihrem Speicher, oder wenn Ihre Daten wirklich groß sind, wird es auf die Festplatte ausgelaufen.

Die Zeit, um dies in Ihrem Cluster zu speichern, braucht Zeit, es wird ein wenig langsam in der ersten Ausführung, aber wenn Sie erneut auf die gleichen Daten an anderer Stelle zugreifen müssen, wird der DF oder der RDD gespeichert Spark wird die Ausführung nicht erneut anfordern.

+0

Vielen Dank für Ihre Antwort! Beim Speichertabellen-Caching handelt es sich um eine eifrige Operation. Dies bedeutet, dass die Daten bereits zwischengespeichert werden, wenn ich meine Abfrage zum ersten Mal ausführe. Zwischenspeichern der Daten dauert hier tatsächlich 500 Sekunden, und tatsächlich wird nach dem Cache die Leistung der Abfrage verbessert, es dauert jetzt nur noch 50 Sekunden, um alle Partitionen zu durchsuchen. Egal, wie oft ich die Abfrage ausführe, die Leistung ist immer ungefähr gleich. Ihre Antwort bezieht sich nicht auf meine Frage, die nach dem Cahcing auf die Partition-Beschneidung abzielt. –

Verwandte Themen