Can you nest parallel operations in R?

August 15, 2016
By

(This article was first published on R – Win-Vector Blog, and kindly contributed to R-bloggers)

When we teach parallel programming in R we start with the basic use of parallel (please see here for example). This is, in our opinion, a necessary step before getting into clever notation and wrapping such as doParallel and foreach. Only then do the students have a sufficiently explicit interface to frame important questions about the semantics of parallel computing. Beginners really need a solid mental model of what services are really being provided by their tools and to test edge cases early.

One question that comes up over and over again is “can you nest parLapply?”

The answer is “no.” This is in fact an advanced topic, but it is one of the things that pops up when you start worrying about parallel programming. Please read on for what that is the right answer and how to work around that (simulate a “yes”).

I don’t think the above question is usually given sufficient consideration (nesting parallel operations can in fact make a lot of sense). You can’t directly nest parLapply, but that is a different issue than can one invent a work-around. For example: a “yes” answer (really meaning there are work-arounds) can be found here. Again this is a different question than “is there a way to nest foreach loops” (which is possible through the nesting operator %.% which presumably handles working around nesting issues in parLapply).

Let’s set up a concrete example, so we can ask and answer a precise question. Suppose we have a list of jobs (coming from an external source) that we will simulate with the code fragment below.

jobs <- list(1:3,5:10,100:200)

Notice the jobs have wildly diverging sizes, this is an important consideration.

Suppose the task we want to perform is some the square roots of the entries. The standard (non-parallel) calculation would look like the following.

worker1 <- function(x) {
  sum(sqrt(x))
}

lapply(jobs,worker1)
## [[1]]
## [1] 4.146264
## 
## [[2]]
## [1] 16.32201
## 
## [[3]]
## [1] 1231.021

For didactic purposes please pretend that the sum function is very expensive and the sqrt function is somewhat expensive.

If it was obvious we always had a great number of small sub-lists we would want to use parallelization to make sure we are performing many sums at the same time. We would then parallelize over the first level as below.

clus <- parallel::makeCluster(4)
parallel::parLapplyLB(clus,jobs,worker1)
## [[1]]
## [1] 4.146264
## 
## [[2]]
## [1] 16.32201
## 
## [[3]]
## [1] 1231.021

Notice that parallel::parLapplyLB uses almost the same calling convention as lapply and returns the exact same answer.

If it was obvious we had a single large sub-list we would want to make sure we were always parallelizing the sqrt operations so we would prefer to parallelize as follows:

mkWorker2 <- function(clus) {
  force(clus)
  function(x) {
    xs <- parallel::parLapplyLB(clus,x,sqrt)
    sum(as.numeric(xs))
  }
}

worker2 <- mkWorker2(clus)

lapply(jobs,worker2)
## [[1]]
## [1] 4.146264
## 
## [[2]]
## [1] 16.32201
## 
## [[3]]
## [1] 1231.021

(For the details of building functions and passing values to remote workers please see here.)

If we were not sure if in the future what structure we would encounter we would prefer to schedule all operations for possible parallel execution. This would minimize the number of idle resources and minimize the time to finish the jobs. Ideally that would look like the following (a nested use of parallel):

parallel::parLapplyLB(clus,jobs,worker2)
## Error in checkForRemoteErrors(val): 3 nodes produced errors; first error: invalid connection

Notice the above fails with an error. Wishing for flexible code is what beginners intuitively mean when they as if you can nest parallel calls. They may not be able to explain it, but they are worried they don’t have a good characterization of the work they are trying to parallelize over. They are not asking if things get magically faster by “parallelizing parallel.”

It isn’t too hard to find out what the nature of the error is: the communication connection socket file descriptors (con) are passed as integers to each machine, but they are not valid descriptors where they arrive (they are just integers). We can see this by looking at the structure of the cluster:

str(clus)
## List of 4
##  $ :List of 3
##   ..$ con :Classes 'sockconn', 'connection'  atomic [1:1] 5
##   .. .. ..- attr(*, "conn_id")= 
##   ..$ host: chr "localhost"
##   ..$ rank: int 1
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con :Classes 'sockconn', 'connection'  atomic [1:1] 6
##   .. .. ..- attr(*, "conn_id")= 
##   ..$ host: chr "localhost"
##   ..$ rank: int 2
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con :Classes 'sockconn', 'connection'  atomic [1:1] 7
##   .. .. ..- attr(*, "conn_id")= 
##   ..$ host: chr "localhost"
##   ..$ rank: int 3
##   ..- attr(*, "class")= chr "SOCKnode"
##  $ :List of 3
##   ..$ con :Classes 'sockconn', 'connection'  atomic [1:1] 8
##   .. .. ..- attr(*, "conn_id")= 
##   ..$ host: chr "localhost"
##   ..$ rank: int 4
##   ..- attr(*, "class")= chr "SOCKnode"
##  - attr(*, "class")= chr [1:2] "SOCKcluster" "cluster"
mkWorker3 <- function(clus) {
  force(clus)
  function(x) {
    as.character(clus)
  }
}

worker3 <- mkWorker3(clus)
parallel::parLapplyLB(clus,jobs,worker3)
## [[1]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"
## 
## [[2]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"
## 
## [[3]]
## [1] "list(con = 5, host = \"localhost\", rank = 1)"
## [2] "list(con = 6, host = \"localhost\", rank = 2)"
## [3] "list(con = 7, host = \"localhost\", rank = 3)"
## [4] "list(con = 8, host = \"localhost\", rank = 4)"

What we are getting wrong is: we can’t share control of the cluster to each worker just by passing the cluster object around. This would require some central registry and call-back scheme (which is one of the things packages like foreach and doParallel accomplish when they “register a parallel back-end to use”). Base parallel depends more on explicit reference to the cluster data structure, so it isn’t “idiomatic parLapply” to assume we can find “the parallel cluster” (there could in fact be more than one at the same time).

So what is the work around?

One work around is to move to sophisticated wrappers (like doParallel or even future, also see here).

Another fix is to (at the cost of time effort and space) to re-organize the calculation into two sequenced phases, each of which is parallel- but not nested. It is a bit involved, but we show how to do that below (using R’s Reduce and split functions to reorganize the data, though one could also use so-called “tidyverse” methods).

# Preparation 1: collect all items into one flat list
sqrtjobs <- as.list(Reduce(c,jobs))
# Phase 1: sqrt every item in parallel
sqrts <- parallel::parLapplyLB(clus,sqrtjobs,sqrt)
# Preparation 2: re-assemble new job list that needs only sums
lengths <- vapply(jobs,length,numeric(1))
pattern <- lapply(seq_len(length(lengths)),
                  function(i) {rep(i,lengths[[i]])})
pattern <- Reduce(c,pattern)
sumjobs <- split(sqrts,Reduce(c,pattern))
sumjobs <- lapply(sumjobs,as.numeric)
names(sumjobs) <- names(jobs)
# Phase 2: sum all items in parallel
parallel::parLapplyLB(clus,sumjobs,sum)
## [[1]]
## [1] 4.146264
## 
## [[2]]
## [1] 16.32201
## 
## [[3]]
## [1] 1231.021

In conclusion: you can’t directly nest parLapply, but you can usefully sequence through it.

parallel::stopCluster(clus)

To leave a comment for the author, please follow the link and comment on their blog: R – Win-Vector Blog.

R-bloggers.com offers daily e-mail updates about R news and tutorials on topics such as: Data science, Big Data, R jobs, visualization (ggplot2, Boxplots, maps, animation), programming (RStudio, Sweave, LaTeX, SQL, Eclipse, git, hadoop, Web Scraping) statistics (regression, PCA, time series, trading) and more...



If you got this far, why not subscribe for updates from the site? Choose your flavor: e-mail, twitter, RSS, or facebook...

Comments are closed.

Search R-bloggers


Sponsors

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)