I really love the plyr package. Apart from providing a progress bar and handling a lot of the overhead, a very interesting feature is that plyr can run in parallel mode. Essentially, setting .parallel = TRUE runs any plyr function in parallel, under the assumption that a parallel backend has been registered. In my case, I use the doSNOW package to register a backend that uses the Simple Network of Workstations (SNOW) package for parallel computing.
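For completeness, registering such a backend only takes a few lines. This is a minimal sketch, assuming two local worker processes:

```r
library(doSNOW)                       # also pulls in foreach and snow

cl <- makeCluster(2, type = "SOCK")   # spawn two local SOCK workers
registerDoSNOW(cl)                    # plyr's .parallel = TRUE now uses this cluster
# ... run parallel plyr calls here ...
stopCluster(cl)                       # always shut the workers down when done
```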

However, the focus of this post is not on how exactly SNOW works, but on a particular problem I had. My problem will be most effectively explained using the following example (note that I assume that a parallel backend has been registered):

bla = function(x) {
  return(x * y)
}

y = 10
dat = data.frame(x = 1:10, category = LETTERS[1:10])
ddply(dat, .(category), bla, .parallel = TRUE)
# Error in do.ply(i) : task 1 failed - "object 'y' not found"

The problem in this case is that the object y is not present in the environment of the worker nodes. Luckily, snow provides a few functions that allow us to load objects, functions and packages into the worker nodes: clusterExport can be used to load objects and functions, and clusterEvalQ can be used to load packages. To streamline setting up and configuring a cluster that can be used by plyr, I wrote a small function, createCluster. It assumes that you have snow and doSNOW installed; installing doSNOW gets you all the required packages. Note that the function also requires an adapted version of the clusterExport function to allow exports from environments other than .GlobalEnv.

clusterExport <- local({
  gets <- function(n, v) { assign(n, v, envir = .GlobalEnv); NULL }
  function(cl, list, envir = .GlobalEnv) {
    ## do this with only one clusterCall--loop on slaves?
    for (name in list) {
      clusterCall(cl, gets, name, get(name, envir = envir))
    }
  }
})
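With this adapted clusterExport in place, the error from the first example can also be fixed by hand, without createCluster. A sketch, assuming a two-core local cluster:

```r
cl <- makeCluster(2, type = "SOCK")
registerDoSNOW(cl)
clusterExport(cl, list("y"))   # copy 'y' into each worker's global environment
ddply(dat, .(category), bla, .parallel = TRUE)   # the workers now find 'y'
stopCluster(cl)
```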

# Functions
createCluster = function(noCores, logfile = "/dev/null", export = NULL, lib = NULL) {
  require(doSNOW)
  cl <- makeCluster(noCores, type = "SOCK", outfile = logfile)
  if(!is.null(export)) clusterExport(cl, export)
  if(!is.null(lib)) {
    l_ply(lib, function(dum) {
      clusterExport(cl, "dum", envir = environment())
      clusterEvalQ(cl, library(dum, character.only = TRUE))
    })
  }
  registerDoSNOW(cl)
  return(cl)
}

where noCores is a numeric specifying the number of cores to register, logfile is the location where the output of the workers goes (set to “” to output to screen), export is a list of objects/functions that need to be loaded into the workers, and lib is a list of packages that need to be loaded into the workers. The following example shows the function in action:

library(ggplot2)
library(doSNOW)

bla = function(arg) {
  dum = ggplot(aes(x = x, y = x), data = arg)
  summary(dum)
  xi = bla2(arg$x)
  return(arg$x * xi)
}

bla2 = function(arg) {
  return(arg + 1)
}

# Constants
y = 10
dat = data.frame(x = 1:10, category = LETTERS[1:10])

# Fails: ggplot2 is not loaded on the workers
# Error in do.ply(i) : task 1 failed - "could not find function "ggplot""
cl = createCluster(2)
res = ddply(dat, .(category), bla, .parallel = TRUE)
stopCluster(cl)

# Fails: the package is loaded, but the function 'bla2' is not
# Error in do.ply(i) : task 1 failed - "could not find function "bla2""
cl = createCluster(2, lib = list("ggplot2"))
res = ddply(dat, .(category), bla, .parallel = TRUE)
stopCluster(cl)

# Works! Also export the function 'bla2' and object 'y'
cl = createCluster(2, export = list("bla2","y"), lib = list("ggplot2"))
res = ddply(dat, .(category), bla, .parallel = TRUE)
stopCluster(cl)

# Sanity check
all.equal(res, ddply(dat, .(category), bla, .parallel = FALSE))
# TRUE!