Also was ich versuche mit dem folgenden Code zu tun ist, eine Liste von Listen zu lesen und sie durch die Funktion checker
und dann log_result
beschäftigen mit dem Ergebnis der Funktion checker
. Ich versuche dies mit Multithreading zu tun, weil der Variablenname rows_to_parse
in Wirklichkeit Millionen von Zeilen hat, also sollte die Verwendung mehrerer Kerne diesen Prozess um einen beträchtlichen Betrag beschleunigen.Multiprocessing Schreiben in Pandas Datenframe
Der Code im Moment funktioniert nicht und stürzt Python ab.
Anliegen und Fragen habe ich:
- die vorhandene df wollen, die in der Variablen
df
hielt den Index im gesamten Prozess zu halten, da sonstlog_result
verwirrt bekommen darüber, welche Zeile aktualisiert werden muss. - Ich bin ziemlich sicher, dass
apply_async
ist nicht die geeignete Multiprocessing-Funktion, um diese Pflicht zu erfüllen, weil ich glaube, die Reihenfolge, in der der Computer liest und schreibt die df kann es möglicherweise verderben ??? - Ich denke, dass eine Warteschlange muss möglicherweise zum Schreiben und Lesen
df
eingerichtet werden, aber ich bin mir nicht sicher, wie ich das tun würde.
Vielen Dank für Ihre Hilfe.
import pandas as pd
import multiprocessing
from functools import partial
def checker(a,b,c,d,e):
match = df[(df['a'] == a) & (df['b'] == b) & (df['c'] == c) & (df['d'] == d) & (df['e'] == e)]
index_of_match = match.index.tolist()
if len(index_of_match) == 1: #one match in df
return index_of_match
elif len(index_of_match) > 1: #not likely because duplicates will be removed prior to: if "__name__" == __main__:
return [index_of_match[0]]
else: #no match, returns a result which then gets processed by the else statement in log_result. this means that [a,b,c,d,e] get written to the df
return [a,b,c,d,e]
def log_result(result, dataf):
if len(result) == 1: #
dataf.loc[result[0]]['e'] += 1
else: #append new row to exisiting df
new_row = pd.DataFrame([result],columns=cols)
dataf = dataf.append(new_row,ignore_index=True)
def apply_async_with_callback(parsing_material, dfr):
pool = multiprocessing.Pool()
for var_a, var_b, var_c, var_d, var_e in parsing_material:
pool.apply_async(checker, args = (var_a, var_b, var_c, var_d, var_e), callback = partial(log_result,dataf=dfr))
pool.close()
pool.join()
if __name__ == '__main__':
#setting up main dataframe
cols = ['a','b','c','d','e']
existing_data = [["YES","A","16052011","13031999",3],
["NO","Q","11022003","15081999",3],
["YES","A","22082010","03012001",9]]
#main dataframe
df = pd.DataFrame(existing_data,columns=cols)
#new data
rows_to_parse = [['NO', 'A', '09061997', '06122003', 5],
['YES', 'W', '17061992', '26032012', 6],
['YES', 'G', '01122006', '07082014', 2],
['YES', 'N', '06081992', '21052008', 9],
['YES', 'Y', '18051995', '24011996', 6],
['NO', 'Q', '11022003', '15081999', 3],
['NO', 'O', '20112004', '28062008', 0],
['YES', 'R', '10071994', '03091996', 8],
['NO', 'C', '09091998', '22051992', 1],
['YES', 'Q', '01051995', '02012000', 3],
['YES', 'Q', '26022015', '26092007', 5],
['NO', 'F', '15072002', '17062001', 8],
['YES', 'I', '24092006', '03112003', 2],
['YES', 'A', '22082010', '03012001', 9],
['YES', 'I', '15072016', '30092005', 7],
['YES', 'Y', '08111999', '02022006', 3],
['NO', 'V', '04012016', '10061996', 1],
['NO', 'I', '21012003', '11022001', 6],
['NO', 'P', '06041992', '30111993', 6],
['NO', 'W', '30081992', '02012016', 6]]
apply_async_with_callback(rows_to_parse, df)
Was ist sonst: #no match, gib es Argumente zu schreiben an df soll tun? Ich denke, wenn Sie 'zurückgeben [a, b, c, d, e]' Ihr Code wird tatsächlich abgeschlossen, aber Sie werden andere Probleme haben, verwenden Sie auch nie dataf überall –
Vielen Dank für das Hinzeigen, ich habe den Code geändert. So wird '[a, b, c, d, e]' in die Funktion 'log_result' geschrieben. – user3374113
'partially (log_result, dataf = dfr)' stimmt nicht mit der Signatur von 'log_results' überein – mdurant