2017-12-15 1 views
2

Wenn ich Spark SQL-Anweisung des Formulars SELECT [...] UNION ALL SELECT [...] habe, werden die zwei SELECT Anweisungen parallel ausgeführt werden? In meinem spezifischen Anwendungsfall fragen die zwei SELECT s zwei verschiedene Datenbanktabellen ab. Im Gegensatz zu dem, was ich erwartet hätte, scheint das Spark-UI darauf hinzuweisen, dass die beiden SELECT-Anweisungen nacheinander ausgeführt werden.Wird UNION ALL von zwei SELECTs über verschiedene Tabellen parallel ausgeführt?

== Update 1 ==

Unten ist der physische Plan, wie in der Spark-Benutzeroberfläche angezeigt:

== Physical Plan == 
*Sort [avg_tip_pct#655 DESC NULLS LAST], true, 0 
+- Exchange rangepartitioning(avg_tip_pct#655 DESC NULLS LAST, 4) 
    +- *HashAggregate(keys=[neighborhood#163], functions=[avg(tip_pct#654)], output=[neighborhood#163, avg_tip_pct#655]) 
     +- Exchange hashpartitioning(neighborhood#163, 4) 
     +- *HashAggregate(keys=[neighborhood#163], functions=[partial_avg(tip_pct#654)], output=[neighborhood#163, sum#693, count#694L]) 
      +- *Project [neighborhood#163, (tip_amount#513/total_amount#514) AS tip_pct#654] 
       +- InMemoryTableScan [neighborhood#163, tip_amount#513, total_amount#514] 
        +- InMemoryRelation [pickup_latitude#511, pickup_longitude#512, tip_amount#513, total_amount#514, neighborhood#163, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
          +- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, neighborhood#163, index#165] 
           +- *Project [pickup_latitude#301, index#165, pickup_longitude#300, neighborhood#163, total_amount#313, point#524, polygon#164, tip_amount#310] 
           +- *SortMergeJoin [curve#578], [curve#580], Inner, ((relation#581 = Within) || Within(point#524, polygon#164)) 
            :- *Sort [curve#578 ASC NULLS FIRST], false, 0 
            : +- Exchange hashpartitioning(curve#578, 4) 
            :  +- Generate inline(indexer(point#524, 30)), true, false, [curve#578, relation#579] 
            :  +- Union 
            :   :- *Project [pickup_latitude#301, pickup_longitude#300, tip_amount#310, total_amount#313, pointconverter(pickup_longitude#300, pickup_latitude#301) AS point#524] 
            :   : +- *Filter ((isnotnull(total_amount#313) && payment_type#306 IN (CREDIT,CRD,1)) && (total_amount#313 > 200.0)) 
            :   :  +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2014},[email protected]) [pickup_latitude#301,payment_type#306,pickup_longitude#300,total_amount#313,tip_amount#310] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point... 
            :   +- *Project [pickup_latitude#436, pickup_longitude#435, tip_amount#445, total_amount#448, pointconverter(pickup_longitude#435, pickup_latitude#436) AS point#524] 
            :    +- *Filter ((isnotnull(total_amount#448) && payment_type#441 IN (CREDIT,CRD,1)) && (total_amount#448 > 200.0)) 
            :     +- *Scan BigQueryTableRelation({datasetId=new_york, projectId=bigquery-public-data, tableId=tlc_yellow_trips_2015},[email protected]) [payment_type#441,pickup_longitude#435,pickup_latitude#436,total_amount#448,tip_amount#445] PushedFilters: [IsNotNull(total_amount), In(payment_type, [CREDIT,CRD,1]), GreaterThan(total_amount,200.0)], ReadSchema: struct<pickup_latitude:double,pickup_longitude:double,tip_amount:double,total_amount:double,point... 
            +- *Sort [curve#580 ASC NULLS FIRST], false, 0 
             +- Exchange hashpartitioning(curve#580, 4) 
              +- Generate inline(index#165), true, false, [curve#580, relation#581] 
              +- InMemoryTableScan [neighborhood#163, polygon#164, index#165] 
                +- InMemoryRelation [neighborhood#163, polygon#164, index#165], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas) 
                 +- *Project [UDF:metadata_string(metadata#13, neighborhood) AS neighborhood#163, polygon#12, index#15] 
                  +- InMemoryTableScan [metadata#13, polygon#12, index#15] 
                    +- InMemoryRelation [point#10, polyline#11, polygon#12, metadata#13, valid#14, index#15], true, 10000, StorageLevel(disk, memory, deserialized, 1 replicas), `neighborhoods` 
                     +- *Scan GeoJSONRelation(gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson,Map(type -> geojson, magellan.index -> true, magellan.index.precision -> 30, path -> gs://miraisolutions/public/sparkgeo/nyc_neighborhoods.geojson)) [point#10,polyline#11,polygon#12,metadata#13,valid#14,index#15] ReadSchema: struct<point:struct<type:int,xmin:double,ymin:double,xmax:double,ymax:double,x:double,y:double>,p... 

Notiere die Vereinigung der beiden SELECT s in Form von Scans auf BigQueryTableRelation. Diese scheinen nacheinander ausgeführt zu werden.

Jede der BigQuery-Selektionen wird in einem separaten Job (jeweils mit einer einzelnen Phase) ausgeführt - sequenziell. Ich betreibe einen 5-Knoten-YARN-Cluster mit jeweils 4 CPUs und 26 GB RAM. Ich frage mich, ob die Tatsache, dass ich eine benutzerdefinierte BigQuery-Datenquelle habe, hier eine Rolle spielt. Ich würde erwarten, dass es nicht sollte. In jedem Fall als Referenz, kann die Datenquelle hier: github.com/miraisolutions/spark-bigquery

== Update 2 ==

im Funken lügt ich folgenden Protokolleintrag sehen:

17/12/19 14:36:24 INFO SparkSqlParser: Parsing command: SELECT `pickup_latitude` AS `pickup_latitude`, `pickup_longitude` AS `pickup_longitude`, `tip_amount` AS `tip_amount`, `total_amount` AS `total_amount` FROM ((SELECT * FROM `trips2014`) UNION ALL (SELECT * FROM `trips2015`)) `ggcyamhubf` WHERE (`payment_type` IN ("CREDIT", "CRD", "1"))

Funken optimiert diese Abfrage und schiebt die Prädikate bis auf die Datenquelle (BigQuery in diesem Fall). Die entsprechenden BigQuery-Jobs scheinen jedoch vollständig sequenziell ausgeführt zu werden, d. H. Der zweite Job wird nur ausgelöst, sobald der erste abgeschlossen ist.

+0

Könnte das sein, dass BigQuery funktioniert, wenn zwei parallele Abfragen vom selben Client nacheinander ausgeführt werden? Können Sie den Abfrageplan über die Webbenutzeroberfläche anhängen? –

+0

Ich habe den Abfrageplan gerade nicht verfügbar, aber von den Spark-Protokollen sieht es definitiv so aus, als würde '' BuildScan' '' DataSource'' (über 'RelationProvider') sequenziell aufgerufen, was bedeuten würde, dass die sequentielle Verarbeitung ein Ergebnis von Sparks ist Ausführungsplan. Ich sehe derzeit nichts, was bedeuten würde, dass es sich um eine BigQuery-Einschränkung handelt. –

+0

Zu viel raten ... zu wenig Daten, die uns helfen würden. Der Grund für die mögliche sequenzielle Ausführung ist nur die Anzahl der CPUs, die zur Verarbeitung aller Tasks verfügbar sind, die Teil der Abfrage sind. Aus diesem Grund benötigen wir dringend die Ausführungsstatistiken von der Web-Benutzeroberfläche. Warum archivieren Sie die Protokolle nicht -> https://spark.apache.org/docs/latest/monitoring.html#viewing-after-the-fact? –

Antwort

2

TL; DR Ja (je nach CPU-Verfügbarkeit)

Als Randbemerkung: Wenn Sie im Zweifel sind, können Sie auch ausführen zwei SELECTs auf ihre eigenen Fäden gefolgt von union (das wiederum würde auch hängt von der Anzahl der CPUs ab, aber Sie würden mit Sicherheit eine wirklich parallele Ausführung haben.

Lassen Sie uns die (sehr einfach) folgende Abfrage verwenden:

val q = spark.range(1).union(spark.range(2)) 

explain werden Sie nicht über die endgültige Ausführung erzählen von der CPU Perspektive, aber zumindest gibt Ihnen, ob ein Code-Generierung ganze Stufen im Einsatz ist und wie weit der Abfragebaum hoch ist.

scala> q.explain 
== Physical Plan == 
Union 
:- *Range (0, 1, step=1, splits=8) 
+- *Range (0, 2, step=1, splits=8) 

In diesem Beispiel sind die beiden Range physikalische Operatoren (die für die zwei separate Datensätze verantwortlich sind) erhalten „codegend“ und so ist ihre Ausführung pipelined. Ihre Ausführungszeit ist die Zeit, um alle Zeilen in Partitionen zu beenden (so schnell wie es jemals sein könnte, ohne sich mit der "Mechanik" des Java-Codes selbst zu befassen, der System.sleep oder ähnliches verwenden könnte).

Die RDD-Herkunft der Abfrage könnte Ihnen weitere Informationen zur Abfrageausführung geben.

scala> q.rdd.toDebugString 
res4: String = 
(16) MapPartitionsRDD[17] at rdd at <console>:26 [] 
| MapPartitionsRDD[16] at rdd at <console>:26 [] 
| UnionRDD[15] at rdd at <console>:26 [] 
| MapPartitionsRDD[11] at rdd at <console>:26 [] 
| MapPartitionsRDD[10] at rdd at <console>:26 [] 
| ParallelCollectionRDD[9] at rdd at <console>:26 [] 
| MapPartitionsRDD[14] at rdd at <console>:26 [] 
| MapPartitionsRDD[13] at rdd at <console>:26 [] 
| ParallelCollectionRDD[12] at rdd at <console>:26 [] 

Wenn ich mich nicht irre, da es keine Stufen sind in-zwischen gibt es nicht viel können Sie parallelisieren - es ist nur eine einzige Stufe mit 16 Partitionen und es endet so schnell wie die letzte Aufgabe (von den 16 zu planende Aufgaben).

Das bedeutet, dass die Reihenfolge in diesem Fall wichtig ist.


Ich fand auch this JIRA issue über UNION ALL, die, wenn nicht genau wie Ihr Fall ähnlich aussieht.

+1

Ja, Ihre erste Aussage ist korrekt. Die DFs in der Union werden in der gleichen Stufe gelesen, also in Ihrem Beispiel, wo jeder DF 8 Partitionen hat, dann haben Sie eine Stufe mit 16 Aufgaben, um die 2 DFs zu lesen. Es besteht keine Notwendigkeit, den DF in einem separaten Thread zu definieren. Sie können dies auch in der Benutzeroberfläche der Bühne überprüfen und die Ereignis-Timeline betrachten. Sie sollten sehen, dass die Aufgaben in den DFs parallel ausgeführt werden, wie erwartet. – Silvio

+0

Ich kann 'InMemoryTableScan' sehen, was bedeutet, dass Sie einige Teile der Abfrage zwischengespeichert haben, nicht wahr? (Nicht dass es wichtig ist, aber es lohnt sich zu fragen). Woher weißt du, dass 'UNION'' SELECT 'sequentiell ausführt? Ich sehe nicht die Anzahl der Partitionen pro 'BigQueryTableRelation'. 5 Knoten x 4 CPUs = 20 CPUs <- das ist nicht sehr beeindruckend, oder? –

+0

Ja, Sie haben Recht: Einige Teile der Abfrage werden zwischengespeichert. Wie Sie erwähnt haben, ist der Cluster nicht sehr groß, sollte aber in diesem Fall für den Zweck geeignet sein. Bitte lesen Sie mein zweites Update in der Frage, um Ihnen hoffentlich relevantere Informationen zu geben ... –