In this post, I will talk about parallelism in R. This post will likely be biased towards the solutions I use. For example, I never use `mcapply` nor `clusterApply`. I prefer to always use `foreach`. In this post, we will focus on how to parallelize R code on your computer.

I will use mainly silly examples, just to show one point at a time.

## Basics of foreach

You can install the foreach package with `install.packages("foreach")`.

``````library(foreach)

foreach(i = 1:3) %do% {
sqrt(i)
}``````
``````## []
##  1
##
## []
##  1.414214
##
## []
##  1.732051``````

So, in the example above, you iterate on `i` and apply the expression `sqrt(i)`. `foreach` returns a list by default. A common mistake is to think that `foreach` is like a for-loop. Actually, `foreach` is more like `lapply`.

``````lapply(1:3, function(i) {
sqrt(i)
})``````
``````## []
##  1
##
## []
##  1.414214
##
## []
##  1.732051``````

Parameter `.combine` is very useful.

``````foreach(i = 1:3, .combine = 'c') %do% {
sqrt(i)
}``````
``##  1.000000 1.414214 1.732051``

With `lapply`, we would do

``````res <- lapply(1:3, function(i) {
sqrt(i)
})
do.call('c', res)``````
``##  1.000000 1.414214 1.732051``

## Parallelize with foreach

You need to do at least two things:

• replace `%do%` by `%dopar%`. Basically, always use `%dopar%` because you can use `registerDoSEQ()` is you really want to run the `foreach` sequentially.
• register a parallel backend using one of the packages that begin with do (such as `doParallel`, `doMC`, `doMPI` and more). I will list only the two main parallel backends because there are too many of them.

### Using clusters

``````# Example registering clusters
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
sqrt(i)
}``````
``##  1.000000 1.414214 1.732051``
``parallel::stopCluster(cl)``

In this situation, all the data and packages used must be exported (copied) to the clusters, which can add some overhead. Yet, at least, you know what you do.

### Using forking

``````cl <- parallel::makeForkCluster(2)
doParallel::registerDoParallel(cl)
foreach(i = 1:3, .combine = 'c') %dopar% {
sqrt(i)
}``````
``##  1.000000 1.414214 1.732051``
``parallel::stopCluster(cl)``

Forking just copy the R session at its current state. This is very fast because it copies objects only it they are modified. Moreover, you don’t need to export variables nor packages because they are already in the session. However, this can’t be used on Windows. This is why I use the clusters option in my packages. Also, I don’t trust forking to always work.

## Common problems/mistakes/questions

### Exporting variables and packages

``````# Some data and function
library(dplyr)
dfs <- rep(list(iris), 3)
count(dfs[], Species)``````
``````## # A tibble: 3 x 2
##      Species     n
##
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50``````
``````# Sequential processing to apply to
# all the data frames of the list 'dfs'
registerDoSEQ()
myFun <- function() {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
count(df, Species)
}
}
str(myFun())``````
``````## List of 3
##  \$ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..\$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..\$ n      : int [1:3] 50 50 50
##  \$ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..\$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..\$ n      : int [1:3] 50 50 50
##  \$ :Classes 'tbl_df', 'tbl' and 'data.frame':    3 obs. of  2 variables:
##   ..\$ Species: Factor w/ 3 levels "setosa","versicolor",..: 1 2 3
##   ..\$ n      : int [1:3] 50 50 50``````
``````# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun(), error = function(e) print(e))``````
``## ``
``parallel::stopCluster(cl)``

Hey, why doesn’t this work anymore? `foreach` will export all the needed variables that are present in its environment (here, the environment of `myFun`) and `dfs` is not in this environment. Some will tell you to use option `.export` of `foreach` but I don’t think it’s good practice. You just have to pass `dfs` to `myFun`.

``````myFun2 <- function(dfs) {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
count(df, Species)
}
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun2(dfs), error = function(e) print(e))``````
``## ``
``parallel::stopCluster(cl)``

Hey, this still doesn’t work. You also need to load packages. You could use option `.packages` of `foreach` but you could simply add `dplyr::` before `count`. It will be clearer (like one does in packages).

``````myFun3 <- function(dfs) {
foreach(i = seq_along(dfs)) %dopar% {
df <- dfs[[i]]
dplyr::count(df, Species)
}
}
# Try in parallel
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tryCatch(myFun3(dfs), error = function(e) print(e))``````
``````## []
## # A tibble: 3 x 2
##      Species     n
##
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
##
## []
## # A tibble: 3 x 2
##      Species     n
##
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50
##
## []
## # A tibble: 3 x 2
##      Species     n
##
## 1     setosa    50
## 2 versicolor    50
## 3  virginica    50``````
``parallel::stopCluster(cl)``

### Iterate over lots of elements.

``````cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
foreach(i = seq_len(2e4), .combine = 'c') %dopar% {
sqrt(i)
}
)``````
``````##    user  system elapsed
##   6.144   0.329   6.646``````
``parallel::stopCluster(cl)``

Iterating over multiple elements in R is bad for performance. Moreover, `foreach` is only combining results 100 by 100, which also slows computations.

If there are too many elements to loop over, the best is to split the computation in ncores blocks and to perform some optimized sequential work on each block. In package bigstatsr, I use the following function to split indices in `nb` groups because I often need to iterate over hundreds of thousands of elements (columns).

``bigstatsr:::CutBySize``
``````## function (m, block.size, nb = ceiling(m/block.size))
## {
##     if (nb > m)
##         nb <- m
##     int <- m/nb
##     upper <- round(1:nb * int)
##     lower <- c(1, upper[-nb] + 1)
##     size <- c(upper, diff(upper))
##     cbind(lower, upper, size)
## }
##
## ``````
``bigstatsr:::CutBySize(20, nb = 3)``
``````##      lower upper size
## [1,]     1     7    7
## [2,]     8    13    6
## [3,]    14    20    7``````

### Filling something in parallel

Using foreach loop in r returning NA

``````mat <- matrix(0, 5, 8)
registerDoSEQ()
# Nested foreach loop
tmp <- foreach(j = 1:8) %:%
foreach(i = 1:5) %dopar% {
mat[i, j] <- i + j
}
mat``````
``````##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13``````
``````# Try in parallel
mat2 <- matrix(0, 5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp2 <- foreach(j = 1:8) %:%
foreach(i = 1:5) %dopar% {
mat2[i, j] <- i + j
}
parallel::stopCluster(cl)
mat2``````
``````##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    0    0    0    0    0    0    0    0
## [2,]    0    0    0    0    0    0    0    0
## [3,]    0    0    0    0    0    0    0    0
## [4,]    0    0    0    0    0    0    0    0
## [5,]    0    0    0    0    0    0    0    0``````

There are two problems here:

1. `mat` is filled in the sequential version but won’t be in the parallel version. Why? This is because when using parallelism, `mat` is copied so that each core modifies a copy of the matrix, not the original one.

2. `foreach` returns something (here a two-level list).

To overcome this problem, you could use shared-memory. For example, with my package bigstatsr.

``````library(bigstatsr)
mat3 <- FBM(5, 8)
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
tmp3 <- foreach(j = 1:8, .combine = 'c') %:%
foreach(i = 1:5, .combine = 'c') %dopar% {
mat3[i, j] <- i + j
NULL
}
parallel::stopCluster(cl)
mat3[]``````
``````##      [,1] [,2] [,3] [,4] [,5] [,6] [,7] [,8]
## [1,]    2    3    4    5    6    7    8    9
## [2,]    3    4    5    6    7    8    9   10
## [3,]    4    5    6    7    8    9   10   11
## [4,]    5    6    7    8    9   10   11   12
## [5,]    6    7    8    9   10   11   12   13``````
``tmp3``
``## NULL``

The original matrix is now modified. Note that I return `NULL` to save memory.

### Parallelize over a large matrix

``````mat <- matrix(0, 1e4, 1e4); mat[] <- rnorm(length(mat))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
Sys.sleep(1)
mat[1, 1]
}
)``````
``````##    user  system elapsed
##   1.834   0.262   3.806``````
``parallel::stopCluster(cl)``

Copying `mat` to both clusters takes time (and memory!).

``````mat2 <- FBM(1e4, 1e4); mat2[] <- rnorm(length(mat2))
cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
system.time(
tmp <- foreach(k = 1:2, .combine = 'c') %dopar% {
Sys.sleep(1)
mat2[1, 1]
}
)``````
``````##    user  system elapsed
##   0.025   0.005   2.410``````
``parallel::stopCluster(cl)``

This is faster because it’s using a matrix that is stored on disk (so shared between processes) so that it doesn’t need to be copied.

For example, you may need to write to the same data (maybe increment it). In this case, it is important to use some locks so that only one session writes to the data at the same time. For that, you could use package flock, which is really easy to use.

``````mat <- FBM(1, 1, init = 0)
mat[]``````
``##  0``
``````cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
mat[1, 1] <- mat[1, 1] + k
NULL
}``````
``## NULL``
``````parallel::stopCluster(cl)
mat[]``````
``##  33``
``sum(1:10)``
``##  55``
``````lock <- tempfile()
mat2 <- FBM(1, 1, init = 0)
mat2[]``````
``##  0``
``````cl <- parallel::makeCluster(2)
doParallel::registerDoParallel(cl)
foreach(k = 1:10, .combine = 'c') %dopar% {
locked <- flock::lock(lock)
mat2[1, 1] <- mat2[1, 1] + k
flock::unlock(locked)
NULL
}``````
``## NULL``
``````parallel::stopCluster(cl)
mat2[]``````
``##  55``

So each process uses some lock to perform its incrementation so that the data can’t be changed by some other process in the meantime.

You may also need to use some message passing or some barriers. For that, you could learn to use MPI. For some basic use, I “reimplemented” this using only shared-memory matrices (FBMs). You can see this function is you’re interested.

## Miscellenaous

• Recall that you won’t gain much from parallelism. You’re likely to gain much more performance by simply optimizing your sequential code. Don’t reproduce the silly examples here as real code, they are quite bad.

• How to print during parallel execution? Use option `outfile` in `makeCluster` (for example, using `outfile = ""` will redirect to the console).

• Don’t try to parallelize huge matrix operations with loops. There are already (parallel) optimized linear algebra libraries that exist and which will be much faster. For example, you could use Microsoft R Open.

• Some will tell you to use `parallel::detectCores() - 1` cores.

## Conclusion

Hope this can help some.

Don’t hesitate to comment if you want to add/modify something to this post.