2017-08-31 1 views
0

Ich habe ein Demo-Problem, wenn auf Kubernetes und AWS Auto-Masstab Dask verteilte Implementierung testen und ich mich bin nicht sicher, ob ich das Problem richtig bin Angriff zu nehmen.Massively Parallel Suchoperation mit Dask, Distributed

ist mein Szenario einen MD5-Hash eines Strings (die ein Passwort) findet das Original-String angegeben. Ich habe drei Hauptprobleme.

A) Der Parameterraum ist massiv und der Versuch, eine Dies-Tasche mit 2.8211099e + 12 Mitgliedern zu erstellen verursachte Speicherprobleme (daher die 'explodieren' Funktion, die Sie im Beispielcode unten sehen).

B) Reinigen Sie den Ausgang bei früher Suche. Ich denke, mit take(1, npartitions=-1) wird dies erreichen, aber ich war mir nicht sicher. Ursprünglich habe ich eine Ausnahme ausgelöst, die funktionierte aber fühlte sich "schmutzig"

C) Da dies lange läuft und manchmal Arbeiter oder AWS-Boxen sterben, wie wäre es am besten, den Fortschritt zu speichern?

Beispielcode:

import distributed 
import math 
import dask.bag as db 
import hashlib 
import dask 
import os 

if os.environ.get('SCHED_URL', False): 
    sched_url = os.environ['SCHED_URL'] 
    client = distributed.Client(sched_url) 
    versions = client.get_versions(True) 
    dask.set_options(get=client.get) 

difficulty = 'easy' 

settings = { 
    'hard': (hashlib.md5('welcome1'.encode('utf-8')).hexdigest(),'abcdefghijklmnopqrstuvwxyz1234567890', 8), 
    'mid-hard': (hashlib.md5('032abgh'.encode('utf-8')).hexdigest(),'abcdefghijklmnop1234567890', 7), 
    'mid': (hashlib.md5('b08acd'.encode('utf-8')).hexdigest(),'abcdef', 6), 
    'easy': (hashlib.md5('0812'.encode('utf-8')).hexdigest(),'', 4) 
} 

hashed_pw, keyspace, max_guess_length = settings[difficulty] 

def is_pw(guess): 
    return hashlib.md5(guess.encode('utf-8')).hexdigest() == hashed_pw 

def guess(n): 
    guess = '' 
    size = len(keyspace) 
    while n>0 : 
     n -= 1 
     guess += keyspace[n % size]; 
     n = math.floor(n/size); 
    return guess 

def make_exploder(num_partitions, max_val): 
    """Creates a function that maps a int to a range based on the number maximum value aimed for 
     and the number of partitions that are expected. 
     Used in this code used with map and flattent to take a short list 
     i.e 1->1e6 to a large one 1->1e20 in dask rather than on the host machine.""" 
    steps = math.ceil(max_val/num_partitions) 
    def explode(partition): 
     return range(partition * steps, partition * steps + steps) 
    return explode 


max_val = len(keyspace) ** max_guess_length # How many possiable password permutation 
partitions = math.floor(max_val/100) 
partitions = partitions if partitions < 100000 else 100000 # split in to a maximum of 10000 partitions. Too many partitions caused issues, memory I think. 
exploder = make_exploder(partitions, max_val) # Sort of the opposite of a reduce. make_exploder(10, 100)(3) => [30, 31, ..., 39]. Expands the problem back in to the full problem space. 

print("max val: %s, partitions:%s" % (max_val, partitions)) 

search = db.from_sequence(range(partitions), npartitions=partitions).map(exploder).flatten().filter(lambda i: i <= max_val).map(guess).filter(is_pw) 

search.take(1,npartitions=-1) 

Ich finde 'easy' funktioniert gut vor Ort, 'Mitte hart' funktioniert gut auf unserem 6 bis 8 * m4.2xlarge AWS-Cluster. Aber bis jetzt haben hard nicht funktioniert.

Antwort

2

A) Der Parameterraum ist massiv und der Versuch, eine Dies-Tasche mit 2.8211099e + 12 Mitgliedern zu erstellen, führte zu Speicherproblemen (daher die "explodieren" -Funktion im Beispielcode unten).

Dies hängt stark davon ab, wie Sie Ihre Elemente in einer Tasche arrangieren. Wenn jedes Element in einer eigenen Partition ist, dann wird dies sicherlich alles töten. 1e12 Partitionen ist sehr teuer. Ich empfehle, die Anzahl der Partitionen auf Tausende oder Zehntausende zu beschränken.

B) Reinigen Sie den Ausgang bei frühem Auffinden. Ich denke, die Verwendung von Take (1, npartitions = -1) wird dies erreichen, aber ich war mir nicht sicher. Ursprünglich ich eine Ausnahme raise Exception ausgelöst ("% s ist die Antwort‘% test_str), die gearbeitet, aber fühlte sich ‚schmutzig‘

Wenn Sie das wollen, dann empfehle ich nicht dask.bag verwenden, sondern stattdessen den concurrent.futures interface mit und insbesondere die as_completed Iterator.

C) Gegeben ist diese lange Lauf und manchmal Arbeiter oder AWS-Boxen sterben, wie wäre es am besten Fortschritte zu speichern?

Dask sollte diese so lange elastisch sein, wie Sie garantieren können, dass der sched Uler überlebt. Wenn Sie die concurrent-Futures-Schnittstelle statt desask bag verwenden, können Sie auch Zwischenergebnisse auf dem Client-Prozess verfolgen.

Verwandte Themen