2017-07-30 30 views
6

Ich habe den folgenden Datenrahmen und meine Absicht ist es, alle IDs zu finden, die unterschiedliche Verwendung aber den gleichen TYP haben.multidplyr und group_by() und filter()

ID <- rep(1:4, each=3) 
USAGE <- c("private","private","private","private", 
"taxi","private","taxi","taxi","taxi","taxi","private","taxi") 
TYPE <- c("VW","VW","VW","VW","MER","VW","VW","VW","VW","VW","VW","VW") 
df <- data.frame(ID,USAGE,TYPE) 

Wenn ich

df %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 

laufen bekomme ich das gewünschte Ergebnis. Aber mein ursprünglicher Datenrahmen hat> 2 Millionen Zeilen. Daher möchte ich alle meine Kerne verwenden, um diese Operation auszuführen.

habe ich versucht, diesen Code mit multidplyr:

f1 <- partition(df, ID) 
f2 <- f1 %>% group_by(ID, TYPE) %>% filter(n_distinct(USAGE)>1) 
f3 <- collect(f2) 

Aber dann erscheint die folgende Meldung:

Warning message: group_indices_.grouped_df ignores extra arguments 

nach

f1 <- partition(df, ID) 

und

Error in checkForRemoteErrors(lapply(cl, recvResult)) : 
    4 nodes produced errors; first error: Evaluation error: object 'f1' not found. 

nach

f2 <- f1%>% group_by(ID, TYPE) %>% filter(f1, n_distinct(USAGE)>1) 

Was ist der richtige Weg wäre, die ganze Operation in multidplyr zu implementieren? Danke vielmals.

Antwort

2

Sie sollten alle Gruppierungsvariablen in Ihren Aufruf an partition() einschließen. Auf diese Weise hat jeder Kern alle Daten, die benötigt werden, um eine Berechnung für eine gegebene Gruppe durchzuführen.

library(tidyverse) 
library(multidplyr) 

fast <- df %>% 
    partition(ID, TYPE) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 

Verification

Sie werden immer noch die Warnung über group_indices bekommen, aber die Ergebnisse sind die gleichen wie die ursprüngliche dplyr Methode.

slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 

fast == slow 
     ID USAGE TYPE 
#[1,] TRUE TRUE TRUE 
#[2,] TRUE TRUE TRUE 
#[3,] TRUE TRUE TRUE 

Benchmarking

Nun ist die große Frage: Ist es schneller? Durch die Definition von cluster können wir sicherstellen, dass alle Cores verwendet werden.

library(microbenchmark) 
library(parallel) 

cluster <- create_cluster(cores = detectCores()) 

fast_func <- function(df) { 
    df %>% 
    partition(ID, TYPE, cluster = cluster) %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) %>% 
    collect() 
} 

slow_func <- function(df) { 
    slow <- df %>% 
    group_by(ID, TYPE) %>% 
    filter(n_distinct(USAGE) > 1) 
} 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: milliseconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 41.360358 47.529695 55.806609 50.529851 61.459433 133.53045 100 b 
# slow_func(df) 4.717761 6.974897 9.333049 7.796686 8.468594 49.51916 100 a 

Parallelverarbeitung ist eigentlich langsamer in diesem Fall. Der Medianlauf für fast_func dauert 56 Millisekunden anstelle von 9. Das liegt an dem Aufwand, der mit dem Verwalten des Datenflusses über Cluster verbunden ist. Aber Sie sagten, Ihre Daten hätten Millionen von Zeilen. Versuchen wir das also.

# Embiggen the data 
df <- df[rep(seq_len(nrow(df)), each=2000000),] %>% tbl_df() 

microbenchmark(fast_func(df), slow_func(df)) 
# Unit: seconds 
# expr  min  lq  mean median  uq  max neval cld 
# fast_func(df) 43.067089 43.781144 50.754600 49.440864 55.308355 65.499095 10 b 
# slow_func(df) 1.741674 2.550008 3.529607 3.246665 3.983452 7.214484 10 a 

Mit dem riesigen Datensatz ist fast_func noch langsamer! Es gibt Zeiten, in denen das parallele Ausführen viel Zeit spart, aber ein einfacher gruppierter Filter ist nicht unbedingt einer von ihnen.