Better examples of parallel processing in Python

I hope I don't get downvoted this time. I have been struggling with parallel processing in Python for a while (two days, to be exact). I have looked at these resources (an incomplete list is shown here):

(a) http://eli.thegreenplace.net/2013/01/16/python-paralellizing-cpu-bound-tasks-with-concurrent-futures

(b) https://pythonadventures.wordpress.com/tag/processpoolexecutor/

I came unstuck. What I want to do is this (a rough sketch of the whole flow follows the two lists):

Master:

Break up the file into chunks (strings or numbers)
Broadcast a pattern to be searched to all the workers 
Receive the offsets in the file where the pattern was found 

Worker:

Receive pattern and chunk of text from the master 
Compute() 
Send back the offsets to the master. 
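To make the intended data flow concrete, here is a minimal sketch of that master/worker split using multiprocessing.Pool. The chunk size, worker count, and the boundary overlap are placeholder assumptions for illustration, not verified production code:

import multiprocessing
from functools import partial

def find_offsets(pat, chunk_start, chunk):
    # Worker: receive the pattern plus one chunk of text; return the
    # absolute offsets in the file where the pattern occurs.
    return [chunk_start + i
            for i in range(len(chunk) - len(pat) + 1)
            if chunk[i:i + len(pat)] == pat]

def master(filename, pat, chunksize=4096, nprocs=4):
    # Master: break the file into chunks (overlapping by len(pat) - 1
    # so a match straddling a chunk boundary is not lost), broadcast
    # the pattern by binding it with partial, and gather the offsets.
    with open(filename) as f:
        text = f.read()
    chunks = [(start, text[start:start + chunksize + len(pat) - 1])
              for start in range(0, len(text), chunksize)]
    with multiprocessing.Pool(processes=nprocs) as pool:
        per_chunk = pool.starmap(partial(find_offsets, pat), chunks)
    return sorted(off for offsets in per_chunk for off in offsets)

if __name__ == "__main__":
    print(master("file1.txt", "afow"))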

I tried to implement this with MPI / concurrent.futures / multiprocessing and got nowhere.

My naive implementation using the multiprocessing module:

import multiprocessing

filename = "file1.txt"
pat = "afow"
N = 1000

def search(pat, txt):
    """This is the naive string search algorithm."""
    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide the pattern along the text one position at a time.
    # range() generates numbers up to but not including its argument.
    for i in range((txtLen - patLen) + 1):
        # A C-style for loop with an && condition has to be
        # written as a while loop in Python.
        counter = 0
        while counter < patLen and pat[counter] == txt[counter + i]:
            counter += 1
            if counter >= patLen:
                offsets.append(i)
    return str(offsets).strip('[]')

"""
This is what I want:

if __name__ == "__main__":
    tasks = []
    pool_outputs = []
    pool = multiprocessing.Pool(processes=5)
    with open(filename, 'r') as infile:
        lines = []
        for line in infile:
            lines.append(line.rstrip())
            if len(lines) > N:
                pool_output = pool.map(search, tasks)
                pool_outputs.append(pool_output)
                lines = []
        if len(lines) > 0:
            pool_output = pool.map(search, tasks)
            pool_outputs.append(pool_output)
    pool.close()
    pool.join()
    print('Pool:', pool_outputs)
"""

with open(filename, 'r') as infile:
    for line in infile:
        print(search(pat, line))

I would be grateful for any guidance, especially with concurrent.futures. Thank you for your time. Valeriy helped me with his addition, and I thank him for it.

But if someone could indulge me for just a moment, this is the code I was working on for concurrent.futures (adapted from an example I saw somewhere):

from concurrent.futures import ProcessPoolExecutor, as_completed
import math

def search(pat, txt):
    """This is the naive string search algorithm."""
    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide the pattern along the text one position at a time.
    # range() generates numbers up to but not including its argument.
    for i in range((txtLen - patLen) + 1):
        # A C-style for loop with an && condition has to be
        # written as a while loop in Python.
        counter = 0
        while counter < patLen and pat[counter] == txt[counter + i]:
            counter += 1
            if counter >= patLen:
                offsets.append(i)
    return str(offsets).strip('[]')

# Check a list of strings.
def chunked_worker(lines):
    return {0: search("fmo", line) for line in lines}

def pool_bruteforce(filename, nprocs):
    with open(filename) as f:
        lines = [line.rstrip('\n') for line in f]
    chunksize = int(math.ceil(len(lines) / float(nprocs)))
    futures = []

    with ProcessPoolExecutor() as executor:
        for i in range(nprocs):
            chunk = lines[(chunksize * i):(chunksize * (i + 1))]
            futures.append(executor.submit(chunked_worker, chunk))

    resultdict = {}
    for f in as_completed(futures):
        resultdict.update(f.result())
    return resultdict

filename = "file1.txt"

if __name__ == "__main__":
    pool_bruteforce(filename, 5)
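One caveat about this attempt: the dict comprehension in chunked_worker uses the constant key 0, so every line's result overwrites the previous one and resultdict.update() keeps a single entry per chunk. A small sketch of one way to repair it, keying each result by its global line number (the keying scheme is an assumption about the intended output, not something the original code does):

def chunked_worker(chunk_start, lines):
    # Key each result by its global line number so nothing is
    # overwritten when the master merges the per-chunk dicts.
    return {chunk_start + n: search("fmo", line)
            for n, line in enumerate(lines)}

# ...and pass the chunk's first line number along when submitting:
# futures.append(executor.submit(chunked_worker, chunksize * i, chunk))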

Thanks again, Valeriy, and everyone who tried to help me solve my puzzle.

Answer

You are calling search with multiple arguments, so bind the first one with functools.partial:

import multiprocessing
from functools import partial

filename = "file1.txt"
pat = "afow"
N = 1000

def search(pat, txt):
    """This is the naive string search algorithm."""
    patLen = len(pat)
    txtLen = len(txt)
    offsets = []

    # A loop to slide the pattern along the text one position at a time.
    # range() generates numbers up to but not including its argument.
    for i in range((txtLen - patLen) + 1):
        # A C-style for loop with an && condition has to be
        # written as a while loop in Python.
        counter = 0
        while counter < patLen and pat[counter] == txt[counter + i]:
            counter += 1
            if counter >= patLen:
                offsets.append(i)
    return str(offsets).strip('[]')

if __name__ == "__main__":
    pool_outputs = []
    pool = multiprocessing.Pool(processes=5)
    lines = []
    with open(filename, 'r') as infile:
        for line in infile:
            lines.append(line.rstrip())
    # partial binds pat as the first argument, so pool.map can call
    # the resulting one-argument function with each line.
    func = partial(search, pat)
    if len(lines) > N:
        pool_output = pool.map(func, lines)
        pool_outputs.append(pool_output)
    elif len(lines) > 0:
        pool_output = pool.map(func, lines)
        pool_outputs.append(pool_output)
    pool.close()
    pool.join()
    print('Pool:', pool_outputs)

Valeriy: Thanks. What does partial actually do? Do you know of any resources that cover parallel processing in Python thoroughly? Thanks again. – corax


https://docs.python.org/2/library/functools.html#functools.partial –


Valeriy: I read that and couldn't really make sense of it. Sorry, but I meant a proper example inside a function. Thank you. – corax
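For illustration, here is a minimal, self-contained sketch of what functools.partial does in the answer above (the sample strings are invented):

from functools import partial

def search(pat, txt):
    # Stand-in for the naive search above: return the match offsets.
    return [i for i in range(len(txt) - len(pat) + 1)
            if txt[i:i + len(pat)] == pat]

# partial "freezes" pat as the first argument and returns a new
# one-argument callable, which is exactly the shape pool.map(func,
# lines) needs: it calls func with one line at a time.
func = partial(search, "afow")

print(func("xxafowyy"))             # [2]
print(search("afow", "xxafowyy"))   # [2], the equivalent direct call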