Wenn ich Spark SQL-Anweisung des Formulars SELECT [...] UNION ALL SELECT [...]
habe, werden die zwei SELECT
Anweisungen parallel ausgeführt werden? In meinem spezifischen Anwendungsfall fragen die zwei SELECT
s zwei verschiedene Datenbanktabellen ab. Im Gegensatz zu dem, was ich erwartet hätte, scheint das Spark-UI darauf hinzuweisen, dass die beiden SELECT
-Anweisungen nacheinander ausgeführt werden.Wird UNION ALL von zwei SELECTs über verschiedene Tabellen parallel ausgeführt?
== Update 1 ==
Unten ist der physische Plan, wie in der Spark-Benutzeroberfläche angezeigt:
== Physical Plan ==
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655])
+- Exchange hashpartitioning(neighborhood#163, 4)
+- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L])
+- *Project [neighborhood#163, (tip_amount#513/total_amount#514) AS tip_pct#654]
+- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514]
+- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165]
+- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310]
+- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164))
:- *Sort [curve#578 ASC NULLS FIRST], false, 0
: +- Exchange hashpartitioning(curve#578, 4)
: +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579]
: +- Union
: :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524]
: : +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0))
: : +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},[email protected]) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
: +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524]
: +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0))
: +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},[email protected]) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point...
+- *Sort [curve#580 ASC NULLS FIRST], false, 0
+- Exchange hashpartitioning(curve#580, 4)
+- Generate inline(index#165), true, false, [curve#580, relation#581]
+- InMemoryTableScan [neighborhood#163, polygon#164, index#165]
+- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas)
+- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15]
+- InMemoryTableScan [metadata#13, polygon#12, index#15]
+- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods`
+- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p...
Notiere die Vereinigung der beiden SELECT
s in Form von Scans auf BigQueryTableRelation
. Diese scheinen nacheinander ausgeführt zu werden.
Jede der BigQuery-Selektionen wird in einem separaten Job (jeweils mit einer einzelnen Phase) ausgeführt - sequenziell. Ich betreibe einen 5-Knoten-YARN-Cluster mit jeweils 4 CPUs und 26 GB RAM. Ich frage mich, ob die Tatsache, dass ich eine benutzerdefinierte BigQuery-Datenquelle habe, hier eine Rolle spielt. Ich würde erwarten, dass es nicht sollte. In jedem Fall als Referenz, kann die Datenquelle hier: github.com/miraisolutions/spark-bigquery
== Update 2 ==
im Funken lügt ich folgenden Protokolleintrag sehen:
17/12/19 14:36:24 INFO SparkSqlParser: Parsing command: SELECT `pickup_latitude` AS `pickup_latitude`, `pickup_longitude` AS `pickup_longitude`, `tip_amount` AS `tip_amount`, `total_amount` AS `total_amount` FROM ((SELECT * FROM `trips2014`) UNION ALL (SELECT * FROM `trips2015`)) `ggcyamhubf` WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))
Funken optimiert diese Abfrage und schiebt die Prädikate bis auf die Datenquelle (BigQuery in diesem Fall). Die entsprechenden BigQuery-Jobs scheinen jedoch vollständig sequenziell ausgeführt zu werden, d. H. Der zweite Job wird nur ausgelöst, sobald der erste abgeschlossen ist.
Könnte das sein, dass BigQuery funktioniert, wenn zwei parallele Abfragen vom selben Client nacheinander ausgeführt werden? Können Sie den Abfrageplan über die Webbenutzeroberfläche anhängen? –
Ich habe den Abfrageplan gerade nicht verfügbar, aber von den Spark-Protokollen sieht es definitiv so aus, als würde '' BuildScan' '' DataSource'' (über 'RelationProvider') sequenziell aufgerufen, was bedeuten würde, dass die sequentielle Verarbeitung ein Ergebnis von Sparks ist Ausführungsplan. Ich sehe derzeit nichts, was bedeuten würde, dass es sich um eine BigQuery-Einschränkung handelt. –
Zu viel raten ... zu wenig Daten, die uns helfen würden. Der Grund für die mögliche sequenzielle Ausführung ist nur die Anzahl der CPUs, die zur Verarbeitung aller Tasks verfügbar sind, die Teil der Abfrage sind. Aus diesem Grund benötigen wir dringend die Ausführungsstatistiken von der Web-Benutzeroberfläche. Warum archivieren Sie die Protokolle nicht -> https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact? –