2016-06-19 11 views
0

Ich bin ein Neuling zu Python, ich habe Funktion, die Feature für meine Daten berechnen und dann eine Liste, die in der Datei verarbeitet und geschrieben werden soll., .. Ich benutze Pool, um das zu tun Berechnung und dann und verwenden Sie die Callback-Funktion, um in Datei schreiben, aber die Callback-Funktion wird nicht aufgerufen werden, habe ich einige print-Anweisung in es, aber es ist definitiv nicht aufgerufen. mein Code wie folgt aussieht:apply_async Callback-Funktion nicht aufgerufen

def write_arrow_format(results): 
print("writer called") 
results[1].to_csv("../data/model_data/feature-"+results[2],sep='\t',encoding='utf-8') 
with open('../data/model_data/arow-'+results[2],'w') as f: 
    for dic in results[0]: 
     feature_list=[] 
     print(dic) 
     beginLine=True 
     for key,value in dic.items(): 
       if(beginLine): 
       feature_list.append(str(value)) 
       beginLine=False 
       else: 
       feature_list.append(str(key)+":"+str(value)) 
     feature_line=" ".join(feature_list) 
     f.write(feature_line+"\n") 


def generate_features(users,impressions,interactions,items,filename): 
    #some processing 
    return [result1,result2,filename] 





if __name__=="__main__": 
    pool=mp.Pool(mp.cpu_count()-1) 

    for i in range(interval): 
     if i==interval: 
      pool.apply_async(generate_features,(users[begin:],impressions,interactions,items,str(i)),callback=write_arrow_format) 
     else: 
      pool.apply_async(generate_features,(users[begin:begin+interval],impressions,interactions,items,str(i)),callback=write_arrow_format) 
      begin=begin+interval 
    pool.close() 
    pool.join() 
+0

Da die Datei zu lang ist .. Ich eingefügt die Codes, die problematics .. interval Variable – Eliethesaiyan

+0

Ich sehe keinen Fehler in Ihrem Code, die verhindern würde, dass die Callback-Funktion aufgerufen werden. Eine gute Debugging-Methode besteht darin, Ihren Code progressiv zu reduzieren, bis Sie ein sehr einfaches Beispiel haben, das das Problem veranschaulicht. Eines von zwei sehr guten Dingen wird passieren: Entweder wirst du ein * runnable * minimales Beispiel haben, das du hier posten kannst (was deine Chance, eine gute Antwort zu bekommen, erheblich erhöht) oder bei der Vereinfachung des Codes, wo der Fehler liegt . – unutbu

+0

@unutbu Ich weiß auch nicht, warum Rückruf wird nicht aufgerufen ... alle Methoden laufen korrekt, aber definetly nicht der Rückruf .. ich versuchte, es zu debuggen, aber vergeblich..ich kommentierte alle Codes außer dem Druck .. ., aber immer noch nicht – Eliethesaiyan

Antwort

3

Es ist nicht klar, aus Ihrem Post, was in der Liste von generate_features zurück enthalten ist. Wenn jedoch result1, result2 oder filename nicht serialisierbar sind, wird die Multiprocessing-Bibliothek aus irgendeinem Grund die Callback-Funktion nicht aufrufen und dies auch nicht automatisch tun. I Think Dies liegt daran, die Multiprocessing-Lib versucht, Objekte vor der Weitergabe zwischen untergeordneten Prozessen und dem übergeordneten Prozess zu pickle. Wenn etwas, das Sie zurückgeben, nicht "pickleable" ist (d. H. Nicht serialisierbar), wird der Callback nicht aufgerufen.

Ich habe diesen Fehler selbst festgestellt, und es stellte sich heraus, dass es sich um eine Instanz eines Logger-Objekts handelte, das mir Probleme bereitete. Hier ist ein Beispielcode, um mein Problem zu reproduzieren:

import multiprocessing as mp 
import logging 

def bad_test_func(ii): 
    print('Calling bad function with arg %i'%ii) 
    name = "file_%i.log"%ii 
    logging.basicConfig(filename=name,level=logging.DEBUG) 
    if ii < 4: 
     log = logging.getLogger() 
    else: 
     log = "Test log %i"%ii 
    return log 

def good_test_func(ii): 
    print('Calling good function with arg %i'%ii) 
    instance = ('hello', 'world', ii) 
    return instance 

def pool_test(func): 
    def callback(item): 
     print('This is the callback') 
     print('I have been given the following item: ') 
     print(item) 
    num_processes = 3 
    pool = mp.Pool(processes = num_processes) 
    results = [] 
    for i in range(5): 
     res = pool.apply_async(func, (i,), callback=callback) 
     results.append(res) 
    pool.close() 
    pool.join() 

def main(): 

    print('#'*30) 
    print('Calling pool test with bad function') 
    print('#'*30) 

    pool_test(bad_test_func) 

    print('#'*30) 
    print('Calling pool test with good function') 
    print('#'*30) 
    pool_test(good_test_func) 

if __name__ == '__main__': 
    main() 

Hoffentlich ist dies hilfreich und weist Sie in die richtige Richtung.

+0

OMG !!!! Vielen Dank = P Auch scheint alles, was in einer Callback-Funktion fehlschlägt, automatisch zu stolpern, ohne die Ausnahme wieder in die Logs zu übertragen. – doubleOK

Verwandte Themen