2016-04-12 7 views
5

Angenommen, ich habe einen Pandas-Datenrahmen und eine Funktion, die ich auf jede Zeile anwenden möchte. Ich kann df.apply(apply_fn, axis=1) anrufen, die Zeit in der Größe von df linear dauern sollte. Oder ich kann df teilen und pool.map verwenden, um meine Funktion auf jedem Stück aufzurufen, und dann die Ergebnisse verketten.Warum führt die Verwendung von Multiprozessing mit Pandas zu einer so dramatischen Beschleunigung?

Ich habe erwartet, dass der Beschleunigungsfaktor pool.map ungefähr gleich der Anzahl der Prozesse im Pool ist (new_execution_time = original_execution_time/N, wenn N-Prozessoren verwendet werden - und das setzt Null Overhead voraus).

Stattdessen in diesem Spielzeug Beispiel Zeit fällt auf etwa 2% (0,005272/0,230757), wenn 4-Prozessoren verwenden. Ich habe höchstens 25% erwartet. Was ist los und was verstehe ich nicht?

import numpy as np 
from multiprocessing import Pool 
import pandas as pd 
import pdb 
import time 

n = 1000 
variables = {"hello":np.arange(n), "there":np.random.randn(n)} 
df = pd.DataFrame(variables) 

def apply_fn(series): 
    return pd.Series({"col_5":5, "col_88":88, 
         "sum_hello_there":series["hello"] + series["there"]}) 

def call_apply_fn(df): 
    return df.apply(apply_fn, axis=1) 

n_processes = 4 # My machine has 4 CPUs 
pool = Pool(processes=n_processes) 

t0 = time.process_time() 
new_df = df.apply(apply_fn, axis=1) 
t1 = time.process_time() 
df_split = np.array_split(df, n_processes) 
pool_results = pool.map(call_apply_fn, df_split) 
new_df2 = pd.concat(pool_results) 
t2 = time.process_time() 
new_df3 = df.apply(apply_fn, axis=1) # Try df.apply a second time 
t3 = time.process_time() 

print("identical results: %s" % np.all(np.isclose(new_df, new_df2))) # True 
print("t1 - t0 = %f" % (t1 - t0)) # I got 0.230757 
print("t2 - t1 = %f" % (t2 - t1)) # I got 0.005272 
print("t3 - t2 = %f" % (t3 - t2)) # I got 0.229413 

gespeichert ich den Code oben und lief es python3 my_filename.py verwenden.

PS Ich weiß, dass in diesem Spielzeug Beispiel new_df kann in einem viel einfacher Weise erstellt werden, ohne Anwendung zu verwenden. Ich bin daran interessiert, ähnlichen Code mit einem komplexeren apply_fn anzuwenden, der nicht nur Spalten hinzufügt.

Antwort

1

bearbeiten (meine Antwort war eigentlich falsch.)

time.process_time() (doc) misst die Zeit nur im aktuellen Prozess (und enthält keine Schlafzeit). So wird die Zeit in den Kindprozessen nicht berücksichtigt.

Ich führe Ihren Code mit time.time(), die reale Zeit misst (zeigt keine Beschleunigung überhaupt) und mit einer zuverlässigeren timeit.timeit (etwa 50% Beschleunigung). Ich habe 4 Kerne.

+0

Danke, Sie müssen recht haben. Ich verstehe jedoch nicht ganz, was vor sich geht. Warum passiert das, obwohl die 'time.process_time()' Aufrufe außerhalb der Multiprocessing Aufrufe sind? Liegt es daran, dass 'time.process_time()' nur die CPU-Zeit des übergeordneten Prozesses zurückgibt? – Adrian

+0

@Adrian Sorry, ich habe mich geirrt - die Child-Prozesse haben sich nicht mit 'process_time' beschäftigt. Entschuldigung für die Verwirrung. Ich aktualisiere die Antwort. – ptrj

Verwandte Themen