2016-11-10 16 views
0

Ich versuche multiprocess eine Funktion, die mehrere Aktionen für eine große Datei tut, aber ich bekomme die bekannte pickling Fehlerereignis, obwohl ich partial.Python Multiprocessing mit mehreren Argumenten

Die Funktion sieht wie folgt aus etwas:

def process(r,intermediate_file,record_dict,record_id): 

    res=0 

    record_str = str(record_dict[record_id]).upper() 
    start = record_str[0:100] 
    end= record_str[len(record_seq)-100:len(record_seq)] 

    print sample, record_id 
    if r=="1": 

     if something: 
      res = something... 
      intermediate_file.write("...") 

     if something: 
      res = something 
      intermediate_file.write("...") 



    if r == "2": 
     if something: 
      res = something... 
      intermediate_file.write("...") 

     if something: 
      res = something 
      intermediate_file.write("...") 

    return res 

Die Art und Weise im nannte es die folgenden in einer anderen Funktion:

def call_func(): 
    intermediate_file = open("inter.txt","w") 
    record_dict = get_record_dict()     ### get infos about each record as a dict based on the record_id 
    results_dict = {} 
    pool = Pool(10) 
    for a in ["a","b","c",...]: 

     if not results_dict.has_key(a): 
      results_dict[a] = {} 

     for b in ["1","2","3",...]: 

      if not results_dict[a].has_key(b): 
       results_dict[a][b] = {} 


      results_dict[a][b]['res'] = [] 

      infile = open(a+b+".txt","r") 
      ...parse the file and return values in a list called "record_ids"... 

      ### now call the function based on for each record_id in record_ids 
      if b=="1": 
       func = partial(process,"1",intermediate_file,record_dict) 
       res=pool.map(func, record_ids) 
       ## append the results for each pair (a,b) for EACH RECORD in the results_dict 
       results_dict[a][b]['res'].append(res) 

      if b=="2": 
       func = partial(process,"2",intermediate_file,record_dict) 
       res = pool.map(func, record_ids) 
       ## append the results for each pair (a,b) for EACH RECORD in the results_dict 
       results_dict[a][b]['res'].append(res) 

    ... do something with results_dict... 

Die Idee ist, dass innerhalb der record_ids für jeden Datensatz, ich will um die Ergebnisse für jedes Paar zu speichern (a, b).

Ich bin mir nicht sicher, was mir diesen Fehler geben:

File "/code/Python/Python-2.7.9/Lib/multiprocessing/pool.py", line 251, in map 
    return self.map_async(func, iterable, chunksize).get() 
    File "/code/Python/Python-2.7.9/Lib/multiprocessing/pool.py", line 558, in get 
    raise self._value 
cPickle.PicklingError: Can't pickle <type 'function'>: attribute lookup __builtin__.function faile 

d

Antwort

0

func nicht auf der obersten Ebene des Codes definiert ist, so dass es nicht gebeizt werden kann. Sie können pathos.multiprocesssing verwenden, die kein Standardmodul ist, aber es funktioniert.

Oder verwenden Sie etwas anderes zu Pool.map vielleicht eine Warteschlange von Arbeitnehmern? https://docs.python.org/2/library/queue.html

Am Ende gibt es ein Beispiel, das Sie verwenden können, ist es für threading ist aber sehr ähnlich die multiprocessing, wo es auch ist Queue ...

https://docs.python.org/2/library/multiprocessing.html#pipes-and-queues

Verwandte Themen