2015-11-05 15 views
5

Also was ich versuche mit dem folgenden Code zu tun ist, eine Liste von Listen zu lesen und sie durch die Funktion checker und dann log_result beschäftigen mit dem Ergebnis der Funktion checker. Ich versuche dies mit Multithreading zu tun, weil der Variablenname rows_to_parse in Wirklichkeit Millionen von Zeilen hat, also sollte die Verwendung mehrerer Kerne diesen Prozess um einen beträchtlichen Betrag beschleunigen.Multiprocessing Schreiben in Pandas Datenframe

Der Code im Moment funktioniert nicht und stürzt Python ab.

Anliegen und Fragen habe ich:

  1. die vorhandene df wollen, die in der Variablen df hielt den Index im gesamten Prozess zu halten, da sonst log_result verwirrt bekommen darüber, welche Zeile aktualisiert werden muss.
  2. Ich bin ziemlich sicher, dass apply_async ist nicht die geeignete Multiprocessing-Funktion, um diese Pflicht zu erfüllen, weil ich glaube, die Reihenfolge, in der der Computer liest und schreibt die df kann es möglicherweise verderben ???
  3. Ich denke, dass eine Warteschlange muss möglicherweise zum Schreiben und Lesen df eingerichtet werden, aber ich bin mir nicht sicher, wie ich das tun würde.

Vielen Dank für Ihre Hilfe.

import pandas as pd 
import multiprocessing 
from functools import partial 

def checker(a,b,c,d,e): 
    match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)] 
    index_of_match = match.index.tolist() 
    if len(index_of_match) == 1: #one match in df 
     return index_of_match 
    elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__: 
     return [index_of_match[0]] 
    else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df 
     return [a,b,c,d,e] 



def log_result(result, dataf): 
    if len(result) == 1: # 
     dataf.loc[result[0]]['e'] += 1 
    else: #append new row to exisiting df 
     new_row = pd.DataFrame([result],columns=cols) 
     dataf = dataf.append(new_row,ignore_index=True) 


def apply_async_with_callback(parsing_material, dfr): 
    pool = multiprocessing.Pool() 
    for var_a, var_b, var_c, var_d, var_e in parsing_material: 
     pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr)) 
    pool.close() 
    pool.join() 



if __name__ == '__main__': 
    #setting up main dataframe 
    cols = ['a','b','c','d','e'] 
    existing_data = [["YES","A","16052011","13031999",3], 
        ["NO","Q","11022003","15081999",3], 
        ["YES","A","22082010","03012001",9]] 

    #main dataframe 
    df = pd.DataFrame(existing_data,columns=cols) 

    #new data 
    rows_to_parse = [['NO', 'A', '09061997', '06122003', 5], 
        ['YES', 'W', '17061992', '26032012', 6], 
        ['YES', 'G', '01122006', '07082014', 2], 
        ['YES', 'N', '06081992', '21052008', 9], 
        ['YES', 'Y', '18051995', '24011996', 6], 
        ['NO', 'Q', '11022003', '15081999', 3], 
        ['NO', 'O', '20112004', '28062008', 0], 
        ['YES', 'R', '10071994', '03091996', 8], 
        ['NO', 'C', '09091998', '22051992', 1], 
        ['YES', 'Q', '01051995', '02012000', 3], 
        ['YES', 'Q', '26022015', '26092007', 5], 
        ['NO', 'F', '15072002', '17062001', 8], 
        ['YES', 'I', '24092006', '03112003', 2], 
        ['YES', 'A', '22082010', '03012001', 9], 
        ['YES', 'I', '15072016', '30092005', 7], 
        ['YES', 'Y', '08111999', '02022006', 3], 
        ['NO', 'V', '04012016', '10061996', 1], 
        ['NO', 'I', '21012003', '11022001', 6], 
        ['NO', 'P', '06041992', '30111993', 6], 
        ['NO', 'W', '30081992', '02012016', 6]] 


    apply_async_with_callback(rows_to_parse, df) 
+0

Was ist sonst: #no match, gib es Argumente zu schreiben an df soll tun? Ich denke, wenn Sie 'zurückgeben [a, b, c, d, e]' Ihr Code wird tatsächlich abgeschlossen, aber Sie werden andere Probleme haben, verwenden Sie auch nie dataf überall –

+0

Vielen Dank für das Hinzeigen, ich habe den Code geändert. So wird '[a, b, c, d, e]' in die Funktion 'log_result' geschrieben. – user3374113

+0

'partially (log_result, dataf = dfr)' stimmt nicht mit der Signatur von 'log_results' überein – mdurant

Antwort

8

Aktualisieren Datenrahmen wie dies in Multiprocessing wird nicht funktionieren:

dataf = dataf.append(new_row,ignore_index=True) 

Zum einen ist dies sehr ineffizient ist (O (n) für jeden anhängen so O (n^2) in Der bevorzugte Weg besteht darin, einige Objekte in einem Durchgang zusammenzufassen.

Zum anderen und noch wichtiger ist, dass dataf nicht für jedes Update gesperrt wird. Es gibt also keine Garantie, dass zwei Operationen nicht kollidieren (ich vermute Dies ist ein Absturz python).

Schließlich funktioniert append nicht an Ort und Stelle, so dass die Variable dataf verworfen wird, sobald der Rückruf beendet ist !! und es werden keine Änderungen an dem Elternelement dataf vorgenommen.


Wir MultiProcessing list oder eine dict nutzen könnten. Liste, wenn Sie sich nicht um Reihenfolge oder dict kümmern, wenn Sie tun (z. B. aufzählen), wie Sie beachten müssen, dass die Werte nicht in einer wohldefinierten Reihenfolge von Async zurückgegeben werden.
(oder wir könnten ein Objekt erstellen, die selbst implementiert sperren, siehe Eli Bendersky.)
So wurden folgende Änderungen vorgenommen:

df = pd.DataFrame(existing_data,columns=cols) 
# becomes 
df = pd.DataFrame(existing_data,columns=cols) 
d = MultiProcessing.list([df]) 

dataf = dataf.append(new_row,ignore_index=True) 
# becomes 
d.append(new_row) 

Jetzt, nachdem die Asynchron Sie haben ein MultiProcessing.list von beendet hat Datenrahmen. Sie können diese (und ignore_index) concat das gewünschte Ergebnis zu erhalten:

pd.concat(d, ignore_index=True) 

sollte es tun.


Hinweis: bei jeder Stufe die NewRow Datenrahmen schaffen, ist auch weniger effizient, dass die Vermietung Pandas die Liste der Listen direkt an einen Datenrahmen in einem Rutsch analysieren. Hoffentlich ist das ein Spielzeugbeispiel, wirklich wollen Sie, dass Ihre Stücke ziemlich groß sind, um Gewinne mit MultiProcessing zu bekommen (ich habe 50kb als eine Faustregel gehört ...), eine Reihe zu einer Zeit wird nie eine sein hier gewinnen.


Abgesehen: Sie sollen (wie df) in Ihrem Code mit Globals vermeiden, ist es viel sauberer sie in Ihren Funktionen über um (in diesem Fall als Argument für checker).

Verwandte Themen