2015-05-13 8 views
11

Es wird von Spark-Dokumentation über Scheduling Within an Application verstanden:Wie führe ich mehrere Jobs in einem Sparkcontext aus separaten Threads in PySpark aus?

Innerhalb einer bestimmten Spark-Anwendung (SparkContext Instanz) können mehrere parallele Jobs gleichzeitig laufen, wenn sie aus verschiedenen Threads eingereicht wurden. Mit "Auftrag" meinen wir in diesem Abschnitt eine Spark-Aktion (z. B. speichern, sammeln) und alle Tasks, die ausgeführt werden müssen, um diese Aktion auszuwerten. Spark-Scheduler ist vollständig Thread-sicher und unterstützt diesen Anwendungsfall Anwendungen zu ermöglichen, die mehrere Anforderungen (zB Anfragen für mehrere Benutzer) dienen.“

ich einige Beispiel-Code von der gleichen in Scala und Java. gefunden konnte Can jemand gibt ein Beispiel, wie dies mit PySpark implementiert werden kann?

+0

überhaupt eine Antwort hier bekommen? Ich versuche das Gleiche zu tun und denke, dass es eigentlich unmöglich ist, bis zu 'SparkContext' eine bessere Sperrung hinzugefügt wird. –

+0

@MikeSukmanowsky was meinst du? Dieses Dokument spricht nicht von einer bestimmten Spark-API, sondern scheint für alle zu funktionieren. Der eigentliche Code, der bei Verwendung einer der APIs ausgeführt wird, ist der Scala-Code und ein Schnittstellencode für Java und Python. – Dici

+0

Können Sie den Link angeben, woher diese Aussage stammt? – Jon

Antwort

2

Heute fragte ich mich das gleiche. Das Multiprocessing-Modul bietet eine ThreadPool, die einige Threads für Sie hervorbringt und damit die Jobs parallel ausgeführt wird instanziieren Sie die Funktionen, erstellen Sie dann den Pool und dann map es über den Bereich, den Sie durchlaufen möchten

In meinem Fall berechnete ich diese WSSSE-Nummern für unterschiedliche Anzahl von Zentren (Hyperparameter-Tuning), um ein "gutes" k-means-Clustering zu erhalten ... genau wie es in der MLSpark documentation umrissen ist. Ohne weitere Erklärungen sind hier einige Zellen aus meinem IPython Arbeitsblatt:

from pyspark.mllib.clustering import KMeans 
import numpy as np 

c_points sind 12dim Arrays:

>>> c_points.cache() 
>>> c_points.take(3) 
[array([ 1, -1, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), 
array([-2, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0]), 
array([ 7, -1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0])] 

Im Folgenden für jede i Ich Berechnung dieser WSSSE Wert und Zurückkehren als ein Tupel:

def error(point, clusters): 
    center = clusters.centers[clusters.predict(point)] 
    return np.linalg.norm(point - center) 

def calc_wssse(i): 
    clusters = KMeans.train(c_points, i, maxIterations=20, 
     runs=20, initializationMode="random") 
    WSSSE = c_points\ 
     .map(lambda point: error(point, clusters))\ 
     .reduce(lambda x, y: x + y) 
    return (i, WSSSE) 

Hier beginnt den interessanten Teil:

from multiprocessing.pool import ThreadPool 
tpool = ThreadPool(processes=4) 

Run it:

wssse_points = tpool.map(calc_wssse, range(1, 30)) 
wssse_points 

gibt:

[(1, 195318509740785.66), 
(2, 77539612257334.33), 
(3, 78254073754531.1), 
... 
] 
+0

...und aus Neugierde habe ich es über '% zeitit 'benchmarkiert. die serielle Ausführung (mit einem "pulsierenden" Verhalten) dauerte 53,2 Sekunden, während die parallelisierte Annäherung mit 4 Threads in 16,2 Sekunden endete. Also, es gibt wirklich einen Unterschied. Mehr aktive Phasen parallel und immer einige in der Warteschlange. –

+0

Erhöht dies nicht die Möglichkeit von Race Conditions? – Jon

+0

Wahrscheinlich hängt es von den Daten ab, mit denen Sie manipulieren. Da Sie das Threading verwalten, müssen Sie sicherstellen, dass Sie Race Conditions nicht selbst erhöhen. – Minutis

7

ich in der gleichen Ausgabe lief, so habe ich eine kleine in sich geschlossene Beispiel. Ich erstelle mehrere Threads mit dem Threading-Modul von Python und sende mehrere Funke-Jobs gleichzeitig.

Beachten Sie, dass Spark standardmäßig die Jobs in FIFO (First-In First-Out) ausführt: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application. Im Beispiel unten, ich es FAIR Planung ändern

# Prereqs: 
# set 
# spark.dynamicAllocation.enabled   true 
# spark.shuffle.service.enabled   true 
    spark.scheduler.mode     FAIR 
# in spark-defaults.conf 

import threading 
from pyspark import SparkContext, SparkConf 

def task(sc, i): 
    print sc.parallelize(range(i*10000)).count() 

def run_multiple_jobs(): 
    conf = SparkConf().setMaster('local[*]').setAppName('appname') 
    # Set scheduler to FAIR: http://spark.apache.org/docs/latest/job-scheduling.html#scheduling-within-an-application 
    conf.set('spark.scheduler.mode', 'FAIR') 
    sc = SparkContext(conf=conf) 
    for i in range(4): 
    t = threading.Thread(target=task, args=(sc, i)) 
    t.start() 
    print 'spark task', i, 'has started' 


run_multiple_jobs() 

Ausgang:

spark task 0 has started 
spark task 1 has started 
spark task 2 has started 
spark task 3 has started 
30000 
0 
10000 
20000 
+0

irgendeine Idee ist dies der beste Weg, es zu tun? Vor allem, wenn Sie in einem Cluster sind. Da der sc auf dem Master bleibt und der Master ihn auf den Workern verteilt, habe ich überlegt, ob dies der beste Weg ist. – nEO

+0

Es gibt einige nette Notizen [hier] (https://www.shanalynn.ie/using-python-threading-for-multiple-results-queue/) zur Verwendung der Threading-Bibliothek, insbesondere für Dinge wie die Rückgabe der Ergebnisse von die Gewindeberechnungen. –

Verwandte Themen