2017-10-16 2 views
11

Ich versuche, eine peinliche parallel zur Schleife (previously asked here) und ließ sich auf this implementation parallelisieren, die meine Parameter passen:Warum sehe ich in Python keine Beschleunigung durch Multiprocessing?

with Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc, 
              total_test_samples, num_classes, num_features, label_set, 
              method_names, pos_class_index, out_results_dir, exhaustive_search]) 
     partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs) 

     with Pool(processes=num_procs) as pool: 
      cv_results = pool.map(partial_func_holdout, range(num_repetitions)) 

Der Grund, warum ich ein proxy object (gemeinsam zwischen Prozessen) verwenden müssen, um das erste Element in der shared proxy list datasets das ist eine Liste von großen Objekten (jeweils etwa 200-300MB). Diese datasets Liste hat normalerweise 5-25 Elemente. Normalerweise muss dieses Programm auf einem HPC-Cluster ausgeführt werden.

Hier ist die Frage, wenn ich dieses Programm mit 32 Prozessen und 50 GB Speicher (num_repetitions = 200, mit Datasets eine Liste von 10 Objekten, jeweils 250MB) ausführen, sehe ich keine Beschleunigung sogar um den Faktor 16 (mit 32 parallelen Prozessen). Ich verstehe nicht warum - irgendwelche Hinweise? Irgendwelche offensichtlichen Fehler oder schlechte Entscheidungen? Wo kann ich diese Implementierung verbessern? Irgendwelche Alternativen?

Ich bin mir sicher, dass dies bereits besprochen wurde, und die Gründe können vielfältig und sehr spezifisch für die Umsetzung sein - daher bitte ich Sie, mir Ihre 2 Cent zu geben. Vielen Dank.

Update: Ich habe einige Profiling mit cProfile, um eine bessere Idee zu bekommen - hier ist ein paar Infos, sortiert nach kumulativer Zeit.

In [19]: p.sort_stats('cumulative').print_stats(50) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: cumulative time 
    List reduced from 4510 to 50 due to restriction <50> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     1 0.000 0.000 662.202 662.202 test_rhst.py:2(<module>) 
     1 0.001 0.001 661.341 661.341 test_rhst.py:70(test_chance_classifier_binary) 
     1 0.000 0.000 661.336 661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait) 
     4 0.000 0.000 661.233 165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get) 
     1 0.000 0.000 661.233 661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait) 
    866/8 0.004 0.000 0.868 0.108 <frozen importlib._bootstrap>:958(_find_and_load) 
    866/8 0.003 0.000 0.867 0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked) 
    720/8 0.003 0.000 0.865 0.108 <frozen importlib._bootstrap>:641(_load_unlocked) 
    596/8 0.002 0.000 0.865 0.108 <frozen importlib._bootstrap_external>:672(exec_module) 
    1017/8 0.001 0.000 0.863 0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed) 
    522/51 0.001 0.000 0.765 0.015 {built-in method builtins.__import__} 

Die Profilierungs info jetzt von time sortiert

In [20]: p.sort_stats('time').print_stats(20) 
Mon Oct 16 16:43:59 2017 profiling_log.txt 

     555404 function calls (543552 primitive calls) in 662.201 seconds 

    Ordered by: internal time 
    List reduced from 4510 to 20 due to restriction <20> 

    ncalls tottime percall cumtime percall filename:lineno(function) 
     23 661.233 28.749 661.233 28.749 {method 'acquire' of '_thread.lock' objects} 
    115/80 0.177 0.002 0.211 0.003 {built-in method _imp.create_dynamic} 
     595 0.072 0.000 0.072 0.000 {built-in method marshal.loads} 
     1 0.045 0.045 0.045 0.045 {method 'acquire' of '_multiprocessing.SemLock' objects} 
    897/1 0.044 0.000 662.202 662.202 {built-in method builtins.exec} 
     3 0.042 0.014 0.042 0.014 {method 'read' of '_io.BufferedReader' objects} 
2037/1974 0.037 0.000 0.082 0.000 {built-in method builtins.__build_class__} 
     286 0.022 0.000 0.061 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat) 
    2886 0.021 0.000 0.021 0.000 {built-in method posix.stat} 
     79 0.016 0.000 0.016 0.000 {built-in method posix.read} 
     597 0.013 0.000 0.021 0.000 <frozen importlib._bootstrap_external>:830(get_data) 
     276 0.011 0.000 0.013 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset) 
     108 0.011 0.000 0.038 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser) 
    1225 0.011 0.000 0.050 0.000 <frozen importlib._bootstrap_external>:1233(find_spec) 
    7179 0.009 0.000 0.009 0.000 {method 'splitlines' of 'str' objects} 
     33 0.008 0.000 0.008 0.000 {built-in method posix.waitpid} 
     283 0.008 0.000 0.015 0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines) 
     3 0.008 0.003 0.008 0.003 {method 'poll' of 'select.poll' objects} 
    7178 0.008 0.000 0.008 0.000 {method 'expandtabs' of 'str' objects} 
     597 0.007 0.000 0.007 0.000 {method 'read' of '_io.FileIO' objects} 

Weitere Informationen Profilierung von percall info sortiert: profiling info sorted by percall

Update 2

Die Elemente in der großen Liste datasets Ich erwähne Ed früher sind in der Regel nicht so groß - sie sind in der Regel 10-25MB. Je nach verwendeter Gleitkomma-Genauigkeit, Anzahl der Samples und Features kann dies jedoch leicht auf 500MB-1GB pro Element anwachsen. Daher würde ich eine Lösung bevorzugen, die skalierbar ist.

Update 3:

Der Code innerhalb holdout_trial_compare_datasets verwendet Methode GridSearchCV von Scikit-Learn, die intern JOBLIB Bibliothek verwendet, wenn wir n_jobs> 1 (oder wann immer wir es sogar eingestellt) eingestellt. Dies kann zu einigen schlechten Interaktionen zwischen Multiprocessing und Joblib führen. Also eine andere Konfiguration versuchen, wo ich überhaupt keine n_jobs setze (die standardmäßig keine Parallelität innerhalb von scikit-learn haben sollten). Sie auf dem Laufenden halten.

+0

haben Sie jede Profilierung getan? – georgexsh

+0

noch nicht, da die Parameter, für die ich es testen möchte (16-32 Prozesse, mit 10-15 Datensätze) erfordern, dass ich es auf einem Cluster ausführen und ich weiß nicht, wie Sie Python-Programme in der Befehlszeile profilieren . Ich werde es bald untersuchen. –

+0

my 2 ¢: Wenn Ihr großes Datenobjekt nur von Parent zu Child übergeben wird, scheint 'Manager' zu viel zu sein, Sie könnten es in eine globale Variable im Parent laden, dann wird es mit child nach' fork() 'geteilt. – georgexsh

Antwort

0
{method 'acquire' of '_thread.lock' objects} 

Wenn Sie Ihren Profiler-Ausgang betrachten, würde ich sagen, dass der gemeinsame Objekt-Lock/Unlock-Overhead die Geschwindigkeitsgewinne von Multithreading überwältigt.

Refactor, so dass die Arbeit für Arbeiter, die nicht so viel miteinander reden müssen, entwickelt wird.

Spezifisch, wenn möglich, leiten Sie eine Antwort pro Datenstapel ab und agieren dann auf die akkumulierten Ergebnisse.

Dies ist, warum Warteschlangen so viel schneller scheinen können: Sie beinhalten eine Art von Arbeit, die kein Objekt erfordert, das "verwaltet" und so gesperrt/entsperrt werden muss.

Nur 'verwalten' Dinge, die unbedingt zwischen den Prozessen geteilt werden müssen. Ihre verwalteten Liste enthält einige sehr kompliziert aussehenden Objekte ...

Ein schnelles Paradigma ist:

allwork = manager.list([a, b,c]) 
theresult = manager.list() 

und dann

while mywork: 
    unitofwork = allwork.pop() 
    theresult = myfunction(unitofwork) 
+0

Danke für die Vorschläge und Antworten. Tatsächlich muss die Arbeit in verschiedenen Prozessen NICHT miteinander kommunizieren, d. H. Sobald sie auf die geteilte Liste von Daten "shared_inputs" zugreifen, ist alles innerhalb der Arbeitseinheit unabhängig von anderen. –

+0

Und ich bin mir nicht sicher, wie sich mein implementierter Vorschlag von Ihren Vorschlägen unterscheidet. Könnten Sie versuchen, mehr auszuarbeiten, oder vielleicht eine detailliertere Implementierung schreiben, die anstelle dessen, was ich an der Spitze zeige, getauscht werden kann? das wäre großartig, danke. –

0

Wenn Sie nicht über ein komplexes gemeinsames Objekt benötigen, dann nur Gebrauch eine Liste der denkbar einfachsten Objekte.

Dann sagen Sie den Arbeitern, die komplexen Daten zu erwerben, die sie in ihrer eigenen kleinen Welt verarbeiten können.

Versuchen:

allwork = manager.list([datasetid1, datasetid2 ,...]) 
theresult = manager.list() 

while mywork: 
    unitofworkid = allwork.pop() 
    theresult = myfunction(unitofworkid) 

def myfunction(unitofworkid): 
    thework = acquiredataset(unitofworkid) 
    result = holdout_trial_compare_datasets(thework, ...) 

Ich hoffe, dass dies Sinn macht. Es sollte nicht zu lange dauern, um in diese Richtung umzudenken. Und Sie sollten sehen, dass {Methode 'acquire' von '_thread.lock'-Objekten} Zahl fallen wie ein Stein, wenn Sie profilieren.

+0

Danke, ich muss darüber nachdenken. Dies würde zu enormen I/O in meiner Situation führen - aber ich muss darüber nachdenken, ob es nur einen sehr kleinen Bruchteil der Zeit im Vergleich zur tatsächlichen Parallelberechnung ist. –

2

Basierend auf Diskussion in den Kommentaren, habe ich ein Mini-Experiment verglichen drei Versionen der Implementierung:

  • v1: im Grunde die gleiche wie Ihr Ansatz, in der Tat, wie partial(f1, *shared_inputs) wird proxy_manager.list sofort auspacken, Manager.List nicht Hier sind Daten beteiligt, die an den Worker mit der internen Warteschlange Pool übergeben wurden.
  • v2: v2 machte Gebrauch Manager.List, Arbeitsfunktion erhält ein ListProxy Objekt, es holt geteilte Daten über eine interne Verbindung zu einem Serverprozess.
  • v3: Kindprozess Daten von der übergeordneten teilen, nutzen Sie fork(2) Systemaufruf.

def f1(*args): 
    for e in args[0]: pow(e, 2) 

def f2(*args): 
    for e in args[0][0]: pow(e, 2) 

def f3(n): 
    for i in datasets: pow(i, 2) 

def v1(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f1, *shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v2(np): 
    with mp.Manager() as proxy_manager: 
     shared_inputs = proxy_manager.list([datasets,]) 
     pf = partial(f2, shared_inputs) 
     with mp.Pool(processes=np) as pool: 
      r = pool.map(pf, range(16)) 

def v3(np): 
    with mp.Pool(processes=np) as pool: 
     r = pool.map(f3, range(16)) 

datasets = [2.0 for _ in range(10 * 1000 * 1000)] 
for f in (v1, v2, v3): 
    print(f.__code__.co_name) 
    for np in (2, 4, 8, 16): 
     s = time() 
     f(np) 
     print("%s %.2fs" % (np, time()-s)) 

auf einem 16 Kern genommen Ergebnisse E5-2682 VPC, ist es offensichtlich, dass v3 skaliert besser: result

+0

Dank George - hilft mir wirklich, das Problem besser zu verstehen. Ich bin wirklich neugierig auf '* shared_inputs' sofort entpacken - ich habe nicht bemerkt, dass passiert .. Ähnliches würde passieren, wenn wir' functools.partial' auch anwenden, wie es versucht, die Eingaben und Unterschrift einzufrieren, nicht wahr? Also mit oder ohne Entpacken müssen die Daten durch Beizen an die Kindprozesse übergeben worden sein, oder? Das heißt, das einzige, was wir vermeiden können, ist, sie als globale Variablen zu verwenden und auf sie von untergeordneten Prozessen zuzugreifen. Irgendwelche Gefahren mit dieser Methode? In untergeordneten Prozessen werden keine Änderungen an der Eingabe vorgenommen. –

+0

als '* shared_inputs' entpackte Liste vor' partiell' wirksam werden, "friert" Objekte ein, die (Proxy-) Liste enthält, nicht der Proxy selbst, v2 es ein Beispiel, 'Manager.List' mit partiellen zu verwenden; Ja, die Daten wurden pickle/unpickle zum herumreichen; Im Kontext Ihres Problems und Ihrer Arbeitslast gibt es keine Bedenken. Wenn Sie etwas übergeben, das mit externen Ressourcen wie Socket-Objekt, Datei-Deskriptor verbunden ist, müssen Sie vorsichtiger sein. – georgexsh

+0

Danke George für deine Hilfe - Ich denke, wir müssen die Engpässe besser erkennen ... vor allem mit großen Objekten (10 Elemente von 20MB) und komplexen Berechnungen (etwas anspruchsvoller als das Quadrieren eines regelmäßigen Zahlenrasters, da dies interne Memoisierungen auslösen könnte)) .. Ihre Kommentare waren sehr hilfreich und ich hoffe, dies zu nageln. –

Verwandte Themen