2012-07-30 14 views
87

Ich muss oft eine Funktion auf die Gruppen einer sehr großen DataFrame (von gemischten Datentypen) anwenden und möchte mehrere Kerne nutzen.Effizientes Anwenden einer Funktion auf einen gruppierten Pandas DataFrame parallel

Ich kann einen Iterator aus den Gruppen erstellen und das Multiprocessing-Modul verwenden, aber es ist nicht effizient, da jede Gruppe und die Ergebnisse der Funktion für Messaging zwischen Prozessen gebeizt werden müssen.

Gibt es eine Möglichkeit, das Beizen zu vermeiden oder das Kopieren des DataFrame vollständig zu vermeiden? Es sieht so aus, als ob die Shared-Memory-Funktionen der Multiprocessing-Module auf numpy Arrays beschränkt sind. Gibt es noch andere Möglichkeiten?

+0

Soweit ich weiß, gibt es keine Möglichkeit, beliebige Objekte zu teilen. Ich frage mich, ob das Beizen viel mehr Zeit in Anspruch nimmt als der Gewinn durch Multiprozessing. Vielleicht sollten Sie nach einer Möglichkeit suchen, größere Arbeitspakete für jeden Prozess zu erstellen, um die relative Beizzeit zu reduzieren. Eine andere Möglichkeit wäre die Verwendung von Multiprocessing beim Erstellen der Gruppen. –

+3

Ich mache so etwas, aber mit UWSGI, Flask und Preforking: Ich lade den Pandas-Datenframe in einen Prozess, forktiere ihn x-mal (mach es zu einem Shared-Memory-Objekt) und rufe dann diese Prozesse von einem anderen Python-Prozess auf. atm Ich benutze JSON als Kommunikationsprozess, aber das kommt (noch immer sehr experimentell): http://pandas.pydata.org/pandas-docs/dev/io.html#msgpack-experimental – Carst

+0

Übrigens, hast du? jemals HDF5 mit Chunking betrachten? (HDF5 ist nicht für gleichzeitiges Schreiben, aber Sie können auch in separaten Dateien speichern und am Ende verketten Zeug) – Carst

Antwort

12

Von den obigen Kommentaren scheint es, dass dies für pandas einige Zeit geplant ist (es gibt auch eine interessant aussehende rosetta project, die ich gerade bemerkt habe).

jedoch bis jeder parallele Funktionalität in pandas eingebaut ist, bemerkte ich, dass es sehr einfach zu schreiben effizienten & Nicht-Speicher-Kopieren parallel Augmentationen zu pandas direkt mit cython + OpenMP und C++.

Hier ist ein kurzes Beispiel eine parallele groupby-Summe zu schreiben, deren Verwendung so etwas wie dieses:

import pandas as pd 
import para_group_demo 

df = pd.DataFrame({'a': [1, 2, 1, 2, 1, 1, 0], 'b': range(7)}) 
print para_group_demo.sum(df.a, df.b) 

und Ausgang:

 sum 
key  
0  6 
1  11 
2  4 

Hinweis Ohne Zweifel, diese Die einfache Beispielfunktionalität wird schließlich Teil von pandas sein. Einige Dinge werden jedoch in C++ für einige Zeit natürlicher sein und es ist wichtig zu wissen, wie einfach es ist, dies in pandas zu kombinieren.


Um dies zu tun, schrieb ich eine einfache Single-Source-Datei-Erweiterung, deren Code folgt.

Es beginnt mit einiger Ein- und Typdefinitionen

from libc.stdint cimport int64_t, uint64_t 
from libcpp.vector cimport vector 
from libcpp.unordered_map cimport unordered_map 

cimport cython 
from cython.operator cimport dereference as deref, preincrement as inc 
from cython.parallel import prange 

import pandas as pd 

ctypedef unordered_map[int64_t, uint64_t] counts_t 
ctypedef unordered_map[int64_t, uint64_t].iterator counts_it_t 
ctypedef vector[counts_t] counts_vec_t 

Der C++ unordered_map Typ zum Summieren von einem einzelnen Faden ist, und die vector ist von allen Threads zum Summieren.

Nun zur Funktion sum.Es beginnt mit typed memory views für schnellen Zugriff off:

def sum(crit, vals): 
    cdef int64_t[:] crit_view = crit.values 
    cdef int64_t[:] vals_view = vals.values 

Die Funktion weiter durch Dividieren der halb gleichmäßig auf die Fäden (hier einprogrammiert bis 4), und wobei jeder Faden Summe der Einträge in seinem Bereich:

cdef uint64_t num_threads = 4 
    cdef uint64_t l = len(crit) 
    cdef uint64_t s = l/num_threads + 1 
    cdef uint64_t i, j, e 
    cdef counts_vec_t counts 
    counts = counts_vec_t(num_threads) 
    counts.resize(num_threads) 
    with cython.boundscheck(False): 
     for i in prange(num_threads, nogil=True): 
      j = i * s 
      e = j + s 
      if e > l: 
       e = l 
      while j < e: 
       counts[i][crit_view[j]] += vals_view[j] 
       inc(j) 

Wenn die Fäden abgeschlossen haben, geht die Funktion alle Ergebnisse (aus den verschiedenen Bereichen) in einem einzigen unordered_map:

cdef counts_t total 
    cdef counts_it_t it, e_it 
    for i in range(num_threads): 
     it = counts[i].begin() 
     e_it = counts[i].end() 
     while it != e_it: 
      total[deref(it).first] += deref(it).second 
      inc(it)   

Alle th auf der linken Seite ist ein DataFrame und geben die Ergebnisse zu erstellen:

key, sum_ = [], [] 
    it = total.begin() 
    e_it = total.end() 
    while it != e_it: 
     key.append(deref(it).first) 
     sum_.append(deref(it).second) 
     inc(it) 

    df = pd.DataFrame({'key': key, 'sum': sum_}) 
    df.set_index('key', inplace=True) 
    return df 
Verwandte Themen