2017-05-29 7 views
0

Ich entschuldige mich, wenn diese Frage bereits beantwortet wurde. Ich habe mir das Archiv angesehen, aber ich habe keine Antwort auf meine Frage gefunden.Warum verwendet local [*] nicht alle verfügbaren Kerne in meinem Rechner?

Ich bin neu in Spark. Ich versuche, das einfache Beispiel, das parallel angebracht wird, örtlich zu führen, spark-2.1.1 in meiner MacOS Sierra-Maschine verwendend. Da ich 4 Kerne habe und es 4 Aufgaben gibt, die jeweils 10 Sekunden dauern, habe ich gehofft, insgesamt etwas mehr als 10 Sekunden zu verbringen.

Ich sehe, dass jede Aufgabe die erwartete Menge an Zeit benötigt. Aber da scheint mir nur 2 Thread der Ausführung. Ich erwartete 4. Wie Sie im Code sehen können, ist der Wert jedes Tupels die Ausführungszeit der entsprechenden Aufgabe.

insight086: pyspark lquesada $ mehr Leistung/Teil-00000

(u'1', 10.000892877578735) 
(u'3', 10.000878095626831) 

insight086: pyspark lquesada $ mehr Leistung/Teil-00001

(u'2', 10.000869989395142) 
(u'4', 10.000877857208252) 

Auch die Gesamtzeit, dies nimmt wesentlich mehr als 20 Sekunden:

total_time 33.2253439426 

Vielen Dank im Voraus für Ihre Hilfe!

Cheers, Luis

INPUT FILE:

1 
2 
3 
4 

SCRIPT:

from pyspark import SparkContext 
import time 

def mymap(word): 
    start = time.time() 
    time.sleep(10) 
    et=time.time()-start 
    return (word, et) 

def main(): 
    start = time.time() 
    sc = SparkContext(appName='SparkWordCount') 

    input_file = sc.textFile('/Users/lquesada/Dropbox/hadoop/pyspark/input.txt') 
    counts = input_file.flatMap(lambda line: line.split()) \ 
        .map(mymap) \ 
        .reduceByKey(lambda a, b: a + b) 
    counts.saveAsTextFile('/Users/lquesada/Dropbox/hadoop/pyspark/output') 

    sc.stop() 
    print 'total_time',time.time()-start 

if __name__ == '__main__': 
    main() 
+0

Dieser Datensatz ist so klein, dass es unmöglich ist, etwas damit zu beweisen ... – eliasah

+0

Meine eigentliche Frage war auf die Anzahl der verwendeten Kerne. Ich schätze jedoch die Tatsache, dass Sie mich auf "Inkonsistente Leistungszahl bei der Skalierung der Anzahl der Kerne" hingewiesen haben, da dies für meine Bedenken hinsichtlich des Overheads sicherlich relevant ist. –

Antwort

0

, deshalb, Divide and conquer algorithms ihre Schwelle, wo es Sinn macht sie überhaupt zu benutzen. Fügen Sie die Verteilung der Mischung (mit Parallelität) in Spark hinzu, und Sie haben ziemlich viel Maschinerie, um solch eine kleine Berechnung durchzuführen. Mit diesem 4-Element-Datensatz können Sie die Stärken von Spark einfach nicht nutzen.

Es wird angenommen, dass mit den größeren und größeren Datensätzen die Zeit um Ihre Erwartungen konvergieren wird.

Auch die Anzahl der Partitionen beim Lesen lokaler Datensätze ist höchstens 2, also ohne repartitioning verwenden Sie nur 2 Kerne.

repartition (numPartitions: Int) (implizite ord: Bestellung [T] = null): RDD [T] ein neues RDD zurück, die genau numPartitions Partitionen hat.

Kann den Grad der Parallelität in dieser RDD erhöhen oder verringern. Intern wird ein Shuffle verwendet, um Daten neu zu verteilen.

Wenn Sie die Anzahl der Partitionen in dieser RDD verringern, sollten Sie die Verwendung von Coalesce in Erwägung ziehen, wodurch eine Zufallswiedergabe vermieden werden kann.


local[*] bedeutet so viele Kerne zu verwenden, wie Ihr Computer (siehe den Fall für LOCAL_N_REGEX in SparkContext):

def localCpuCount: Int = Runtime.getRuntime.availableProcessors() 
val threadCount = if (threads == "*") localCpuCount else threads.toInt 

Es ist nur ein Hinweis, wie viele Partitionen standardmäßig zu verwenden, aber verhindert nicht, dass Spark nach oben oder unten geht. Es hängt hauptsächlich von den Optimierungen ab, die Spark anwendet, um mit dem besten Ausführungsplan für Ihre verteilte Berechnung zu enden. Spark macht eine Menge für Sie und je höher die Abstraktionsstufe, desto mehr Optimierungen (siehe batches in Spark SQL Optimizer).

Verwandte Themen