2017-12-01 2 views
9

einen Standard gruppiert Betrieb auf einem data.frame:Ersatz für parallele plyr mit domc

library(plyr) 
library(doMC) 
library(MASS) # for example 

nc <- 12 
registerDoMC(nc) 

d <- data.frame(x = c("data", "more data"), g = c("group1", "group2")) 
y <- "some global object" 

res <- ddply(d, .(g), function(d_group) { 
    # slow, complicated operations on d_group 
}, .parallel = FALSE) 

Es trivial Vorteil eines Multi-Core-Setup zu übernehmen, indem Sie einfach .parallel = TRUE Schreiben statt. Dies ist eines meiner Lieblingsfeatures von plyr.

library(dplyr) 
library(multidplyr) 
library(parallel) 
library(MASS) # for example 

nc <- 12 

d <- tibble(x = c("data", "more data"), g = c("group1", "group2")) 
y <- "some global object" 

cl <- create_cluster(nc) 
set_default_cluster(cl) 
cluster_library(cl, packages = c("MASS")) 
cluster_copy(cl, obj = y) 

d_parts <- d %>% partition(g, cluster = cl) 
res <- d_parts %>% collect() %>% ungroup() 

rm(d_parts) 
rm(cl) 

Sie können sich vorstellen, wie lange dieses Beispiel könnte:

Aber mit plyr veraltet ist (glaube ich) und im wesentlichen durch dplyr ersetzt, purrr usw., hat die Lösung für die parallele Verarbeitung wesentlich ausführlicher geworden Wenn Sie jedes Paket und Objekt, das Sie benötigen, innerhalb der Schleife betrachten, benötigen Sie einen eigenen cluster_* Befehl, um es auf die Knoten zu kopieren. Die nicht parallelisierte Plr-zu-Dplyr-Übersetzung ist nur eine einfache dplyr::group_by Konstruktion, und es ist bedauerlich, dass es keine knappe Möglichkeit gibt, die parallele Verarbeitung darauf zu ermöglichen. Also, meine Fragen sind:

  • Ist dies eigentlich die bevorzugte Art, um meinen Code von Plyr zu Dplyr zu übersetzen?
  • Welche Art von Magie passiert hinter den Kulissen in plyr, die es so einfach macht, die Parallelverarbeitung einzuschalten? Gibt es einen Grund, warum diese Fähigkeit besonders schwer zu dplyr hinzuzufügen wäre und deshalb existiert sie noch nicht?
  • Sind meine beiden Beispiele grundlegend anders in Bezug darauf, wie der Code ausgeführt wird?
+3

Re Ihrer dritten Frage: Ich würde Sag ja. Ihr 'plyr' Beispiel verwendet' doMC', das ist ein 'Multicore' Backend für' foreach', das heißt: ** forking **. Ihr 'multidplyr' Beispiel verwendet' create_cluster', das standardmäßig auf 'parallel :: makePSOCKcluster' steht, dh: ** Parallel SOCKet Cluster **. –

+1

Noch eine zweite Frage: die gleiche Art von Magie, die passiert, wenn Sie 'partition()' aufrufen, ohne vorher einen Cluster zu erstellen: 'plyr' basiert auf einem zuvor registrierten 'foreach'-Backend (' print (plyr ::: setup_parallel)) '),' multidplyr :: partition() 'ohne einen Cluster beruht implizit auf' create_cluster() ', würde aber wahrscheinlich ein anderes Backend erkennen, wenn eines bereits registriert ist (ich habe das nicht überprüft, multidplyr ::: cluster_exists)) '). Die ersten Beispiele der Vignette "multidsplyr" veranschaulichen diese Fähigkeit, einfach 'partition()' ohne vorheriges Setup aufzurufen. –

+1

Zu Ihrer ersten Frage: Soweit ich das beurteilen kann, erlaubt 'multiplypr' aus dem doc und aus meinen eigenen Experimenten nicht, wie" plyr "es tut, sondern nur" PSOCK ". –

Antwort

3
  1. Ich glaube nicht, dass es eine wahre 'bevorzugt' Art und Weise {plyr} Code {dplyr} zu übersetzen.

  2. In den Kommentaren hat @ Aurèle einen besseren Job gemacht, als ich jemals bei der Beschreibung der Verbindung zwischen {plyr} und {doMC} konnte. Eine Sache, die passiert ist, dass sich die Anreize etwas geändert haben. {doMC} stammt von Revolution Analytics (seit Kauf von Microsoft). Aber Hadley, der dplyr entwickelt hat, arbeitet derzeit bei RStudio. Diese beiden Unternehmen konkurrieren im IDE-Bereich. Es ist also naheliegend, dass ihre Pakete nicht gut aufeinander abgestimmt sind. Die einzige Form der Parallelität, die ich aus RStudio stark unterstützt habe, ist {sparklyr}, die sie relativ einfach eingerichtet haben. Aber ich kann nicht wirklich empfehlen, mit Spark zu arbeiten, um Parallelverarbeitung für eine einzelne Maschine zu machen.

  3. @ Aurèle hat die Ausführungsunterschiede wieder gut erklärt. Ihr neuer Code verwendet einen PSOCK-Cluster und den alten Code, der für forks verwendet wird. Gabeln verwenden eine Kopie im Schreibmodus für den Zugriff auf den Arbeitsspeicher, sodass parallele Prozesse mit dem Zugriff auf die gleichen Daten unmittelbar nach dem Verzweigungsprozess beginnen können. PSOCK-Cluster erzeugen neue Kopien von R - sie müssen Bibliotheken laden und eine explizite Kopie der Daten erhalten.

Sie ein Muster verwenden können, wie ...

library(dplyr) 
library(purrr) 
library(future) 
plan(multicore) 
options(mc.cores = availableCores()) 
d <- data.frame(x = 1:8, g = c("group1", "group2", "group3", "group4")) 
y <- "some global object" 


split(d, d$g) %>% 
    map(~ future({Sys.sleep(5);mean(.x$x)})) %>% 
    map_df(~value(.x)) 

... mit etwas Finesse auf dem map_df Schritt eine parallele Verarbeitung zu tun. Beachten Sie, dass unter {purrr} die Funktion der anonymen Funktion ~ ist, wobei .x für die Werte steht, die zugeordnet wurden.

Wenn Sie gefährlich leben mögen, können Sie möglicherweise eine Version von etwas zu schaffen, ähnlich, ohne mit {} Zukunft durch eine private Methode in {} purrr mit

mcmap <- function(.x, .f, ...) { 
    .f <- as_mapper(.f, ...) 
    mclapply(.x, function(.x) { 
    force(.f) 
    .Call(purrr:::map_impl, environment(), ".x", ".f", "list") 
    }) %>% 
    map(~ .x[[1]]) 
} 
+0

Danke für die Erklärung. Ich habe den Code noch nicht ausprobiert, aber purrr + future könnte eine nette Lösung sein. – Devin