2017-06-22 1 views
1

Ich habe zwei Gruppen, eine mit den Zeilen, die als Gruppen verarbeitet werden sollen, eine andere mit zu betrachtenden Gruppen.Multiprocessing-Gruppe python anwenden

test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

gr1 = test.groupby('city') 
gr2 = test2.groupby('city') 

Derzeit bewerbe ich meine Funktion auf jede Zeile der Gruppe,

gr1.apply(lambda x: custom_func(x.Address1, gr2.get_group(x.name))) 

Ich weiß nicht, aber, wie auf diesem Multiprozessing zu tun. Bitte beraten.

EDIT: - Ich habe versucht, dask zu verwenden, aber ich kann den gesamten Datenrahmen nicht zu meiner Funktion in dask übergeben - da es eine Beschränkung mit seiner apply Funktion gibt. Und ich habe versucht, dask apply auf meine gr1 (group) anzuwenden, aber da ich den index in meiner benutzerdefinierten Funktion setze, gibt dies einen Fehler aus, der sagt "Zu viele Indexer".

hier mit Dask, gibt dies mir ein Fehler - 'Pandas' object has no attribute 'city'

ddf1 = dd.from_pandas(test, 2) 
ddf2 = dd.from_pandas(test2, 2) 

dgr1 = ddf1.groupby('city') 
dgr2 = ddf2.groupby('city') 

meta = pd.DataFrame(columns=['Address1', 'score', 'idx','source_index']) 
ddf1.map_partitions(custom_func, x.Address1, dgr2.get_group(x.city).Address1,meta=meta).compute() 
+0

werfen Sie einen Blick auf 'dask', es ist gut mit Pandas integriert. – suvy

+0

Yeah sah das, aber dask unterstützt keine Übergabe von Datenrahmen mit der Apply-Funktion. Zweitens, als ich versuchte, dask auf Gruppe anzuwenden, scheitert es mit "zu vielen Indexern", während ich versuche, den Index innerhalb meiner custom_func zu setzen. –

+0

dask apply sollte reihenweise arbeiten, für spaltenweise map_partition. Vielleicht ist es cool, editieren Sie Ihre Frage mit was Sie versucht haben und Fehler gemeldet. – suvy

Antwort

2

biete ich eine alternative Lösung dask hier

import pandas as pd 
from multiprocessing import Pool 
test = pd.DataFrame({'Address1':['123 Cheese Way','234 Cookie Place','345 Pizza Drive','456 Pretzel Junction'],'city':['X','U','X','U']}) 
test2 = pd.DataFrame({'Address1':['123 chese wy','234 kookie Pl','345 Pizzza DR','456 Pretzel Junktion'],'city':['X','U','Z','Y'] , 'ID' : ['1','3','4','8']}) 

test=test.assign(dataset = 'test') 
test2=test2.assign(dataset = 'test2') 

newdf=pd.concat([test2,test],keys = ['test2','test']) 
gpd=newdf.groupby('city') 
def my_func(mygrp): 
    test_data=mygrp.loc['test'] 
    test2_data=mygrp.loc['test2'] 
    #do something specific 
    #if needed print something 
    return {'Address':test2_data.Address1.values[0],'ID':test2_data.ID.values[0]} #return some other stuff 

mypool=Pool(processes=2) 
ret_list=mypool.imap(my_func,(group for name, group in gpd)) 

pd.DataFrame(ret_list) 

kehrt so etwas wie

ID address 
0 3 234 kookie Pl 
1 1 123 chese wy 
2 8 456 Pretzel Junktion 
3 4 345 Pizzza DR 

zu verwenden PS: In der OP-Frage werden zwei ähnliche Datensätze in einer spezialisierten Funktion verglichen, die Lösung hier u ses pandas.concat. Man könnte sich auch eine pd.merge je nach Problem vorstellen.

+0

Hey. Danke für Ihre Hilfe. Erhalten Sie diesen Speicherfehler. :('Traceback (letzter Aufruf zuletzt): Datei" script.py ", Zeile 98, in main() Datei" script.py ", Zeile 87, in Haupt ret_list = mypool.map (my_func, (Gruppe für Name, Gruppe in gpd)) Datei "/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py", Zeile 251, in Karte return self.map_async (func, iterable, chunksize) .get() Datei "/home/ubuntu/anaconda2/lib/python2.7/multiprocessing/pool.py", Zeile 567, in get raise self._value MemoryError' –

+0

Obwohl ich 32GB RAM und 256GB Festplatte habe Es hat mir nicht viel geholfen, ich denke, pool.map kopiert die gesamten Daten in jeden Unterprozess, was zu Speicherproblemen führt Ich brauche keine Daten vom Elternprozess t von dem, was ich passiere. Wie erreiche ich das? –

+0

Yaa Map wird eine Liste von Iteratoren erstellen, die intensiv sind, also benutze stattdessen mypool.imap. – suvy

Verwandte Themen