2

Zum Beispiel Im Ausführen einiger Abfragen auf Funken, und in der Spark-UI kann ich sehen, dass einige Abfragen mehr shuffle haben, und diese Shuffle scheint, ist die Menge der Daten lokal gelesen und zwischen gelesen Vollstrecker.Wie wirkt sich die Menge der Daten in Spark gemischt

Aber so verstehe ich nicht eine Sache, zum Beispiel diese Abfrage unter geladenen 7GB von HDFS, aber das Suffel lesen + shufflled schreiben ist mehr als 10GB. Aber ich habe andere Abfragen gesehen, die auch 7GB von HDFS laden und der Shuffle ist wie 500kb. Also verstehe ich das nicht, kannst du bitte helfen? Die Menge der Daten, die gemischt werden, hängt nicht mit den Daten zusammen, die von den hdfs gelesen werden?

select 
    nation, o_year, sum(amount) as sum_profit 
from 
    (
select 
    n_name as nation, year(o_orderdate) as o_year, 
    l_extendedprice * (1 - l_discount) - ps_supplycost * l_quantity as amount 
    from 
     orders o join 
     (select l_extendedprice, l_discount, l_quantity, l_orderkey, n_name, ps_supplycost 
     from part p join 
     (select l_extendedprice, l_discount, l_quantity, l_partkey, l_orderkey, 
       n_name, ps_supplycost 
      from partsupp ps join 
      (select l_suppkey, l_extendedprice, l_discount, l_quantity, l_partkey, 
        l_orderkey, n_name 
      from 
       (select s_suppkey, n_name 
       from nation n join supplier s on n.n_nationkey = s.s_nationkey 
       ) s1 join lineitem l on s1.s_suppkey = l.l_suppkey 
      ) l1 on ps.ps_suppkey = l1.l_suppkey and ps.ps_partkey = l1.l_partkey 
     ) l2 on p.p_name like '%green%' and p.p_partkey = l2.l_partkey 
    ) l3 on o.o_orderkey = l3.l_orderkey 
)profit 
group by nation, o_year 
order by nation, o_year desc; 
+0

a) Operation b) Konfiguration (Anzahl der Partitionen zum Beispiel) c) initiale Datenverteilung – zero323

+0

Die Frage von OP ist, warum shuffle lesen oder schreiben manchmal weniger ist, während die Eingabe in GBs ist. Welche Faktoren können Shuffle Read Write in diesem Umfang bestimmen oder kontrollieren? –

Antwort

2

Der Shuffle ist der Mechanismus von Spark, um Daten so zu verteilen, dass sie auf verschiedene Partitionen verteilt sind. Dies beinhaltet typischerweise das Kopieren von Daten über Executoren und Maschinen hinweg. Es ist also ziemlich klar, dass gemischte Daten nicht wirklich von der Menge der Eingabedaten abhängig sind. Es hängt jedoch davon ab, welche Operationen Sie an den Eingabedaten ausführen, was dazu führt, dass Daten über Executors (und damit Maschinen) verschoben werden. Bitte gehen Sie durch http://spark.apache.org/docs/latest/programming-guide.html#shuffle-operations, um zu verstehen, warum das Mischen ein kostspieliger Prozess ist.

Mit Blick auf die Abfrage, die Sie eingefügt haben, scheint es, dass Sie eine Menge Join-Operationen (haben nicht tief geschaut, um die ultimative Operation zu verstehen, die Sie tun). Und das erfordert definitiv das Verschieben der Daten über Partitionen.Das Problem kann behandelt werden, indem die Abfrage erneut besucht und optimiert wird oder indem Ihre Eingabedaten auf eine Weise manipuliert oder vorverarbeitet werden, die zu weniger Bewegung von Daten führt (z. B. das Zusammenlagern der Daten, die verbunden werden, so dass sie in die gleiche Partition fallen). Auch dies ist nur ein Beispiel und Sie müssen aus Ihrem Anwendungsfall herausfinden, was für Sie am besten funktioniert.

2

Ich empfehle das Lesen, was ich als the paper on explaining the Mapreduce programming model sein.

Grundsätzlich ist es nicht die Menge der Daten auf HDFS (oder was auch immer die Quelle), die bestimmt, wie viele Daten gemischt werden. Ich werde versuchen, drei Beispiele zu erklären, mit:

Beispiel 1. Anzahl schlurfte Daten weniger als Eingangsdaten:

val wordCounts = words.map((_, 1)).reduceByKey(_ + _) 

Hier zählen wir die Anzahl der Wörter (für jede Taste) in jedem Partition, dann nur das Ergebnis mischen. Dann, wenn wir die Unterzählungen gemischt haben, addieren wir sie. Die Menge der Daten, die wir mischen, hängt also von der Anzahl der Zählungen ab. In diesem Fall hängt es also mit der Anzahl der eindeutigen Wörter zusammen.

Wenn wir nur ein einziges Wort hätten, würden wir viel weniger Daten als die Eingabe mischen. In der Tat zählt so viel wie es Threads gibt (also eine winzige Menge).

Hypothetisch, wenn jedes Wort einzigartig wäre, würden wir mehr Daten mischen (lesen Sie das Papier für Details). Die Menge der Daten, die in diesem Beispiel gemischt werden, hängt damit zusammen, wie viele eindeutige Schlüssel wir haben (eindeutige Wörter).

Beispiel 2. Datenmenge gemischt ist die gleiche wie Eingangsdaten:

val wordCounts = words.map((_, 1)).groupByKey().mapValues(_.size) 

Hier gruppieren wir alle Worte zusammen, dann zählen wir, wie viele es sind. Also müssen wir alle Daten mischen.

Beispiel 3. Datenmenge gemischt ist mehr als Eingabedaten:

val silly = 
    words.map(word => 
    (word, generateReallyLongString())) 
    .groupByKey() 

Hier unserer Karte Bühne abbildet jedes Wort eine wirklich lange zufällige Zeichenfolge, dann gruppieren wir sie alle zusammen für Wort. Hier erzeugen wir mehr Daten als die Eingabe und mischen mehr Daten als die Eingabe.

Verwandte Themen