Quick guide to parallel R with snow

[This article was first published on My Data Atelier » R, and kindly contributed to R-bloggers].

Probably the most common complaints about R concern its speed, especially when handling a high volume of data. This is, in principle, true, and is partly due to the fact that R does not run in parallel... unless you tell it to do so!

R offers a wide variety of packages dedicated to parallelisation. After trying several of them, I found snow the best one (for me) for classical parallelisable operations. Among the functions this package provides, I found the parApply family the most useful and the easiest to work with when taking advantage of parallelisation.

To use these functions, you first need a solid knowledge of the apply-like functions in traditional R, i.e., lapply, sapply, vapply and apply. What are they exactly? In essence, they apply a function to each element of a collection of objects.

The example below returns a numeric vector consisting of 3, 4, 5, ..., 12 in the variable “result”:


my.vec <- 1:10

result <- vapply(my.vec,function(x) x+2,FUN.VALUE=0)

This code returns exactly the same output as the one below


my.vec <- 1:10

result <- numeric(length(my.vec))

for (i in seq_along(my.vec)) {
  result[i] <- my.vec[i] + 2
}

Of course, in this example no significant speed difference can be found between the two pieces of code. However, even on a single node, the first alternative is considerably faster, which becomes noticeable when working with larger amounts of data. For a vector of length 1,000,000, the difference was 0.94 vs 3.04 secs.
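If you want to reproduce a comparison like this on your own machine, a minimal sketch could look as follows. The helper names loop.version and vapply.version are only illustrative, and the timings depend on your hardware and R version, so no numbers are shown here.

```r
# Compare a preallocated for loop with vapply on a large vector.
my.vec <- 1:1000000

# Illustrative helper: classic loop with a preallocated result vector
loop.version <- function(v) {
  out <- numeric(length(v))
  for (i in seq_along(v)) {
    out[i] <- v[i] + 2
  }
  out
}

# Illustrative helper: the same operation with vapply
vapply.version <- function(v) {
  vapply(v, function(x) x + 2, FUN.VALUE = 0)
}

# Elapsed times vary between machines and R versions
time.loop   <- system.time(res.loop   <- loop.version(my.vec))
time.vapply <- system.time(res.vapply <- vapply.version(my.vec))

print(time.loop["elapsed"])
print(time.vapply["elapsed"])

# Both approaches produce the same values
print(isTRUE(all.equal(res.loop, res.vapply)))
```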

On the internet, plenty of posts and tutorials about lapply, sapply, vapply and apply can be found. Still, it is worth explaining again what each function does:

a) lapply

Applies a function over a vector and returns a list with one element per input element. If lapply were used in the previous example, “list-like” indexing would be needed to obtain a single element:


my.vec <- 1:10

result <- lapply(my.vec,function(x) x+2)

# Get the third element of result

result[[3]]
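The difference between single and double brackets on a list is easy to miss, so here is a small sketch of it. The variable names single.bracket, double.bracket and flat are just for illustration.

```r
my.vec <- 1:10
result <- lapply(my.vec, function(x) x + 2)

# Single brackets keep the list wrapper: a list of length 1
single.bracket <- result[3]

# Double brackets extract the element itself: the number 5
double.bracket <- result[[3]]

print(is.list(single.bracket))  # TRUE
print(double.bracket)           # 5

# unlist() flattens the whole list into a plain vector
flat <- unlist(result)
print(flat)
```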

b) sapply/vapply

These are very similar to lapply but, instead of a list, they return a vector of some type (numeric, character, logical, etc.). sapply will “deduce” the type of the output elements, whereas in vapply the output type has to be specified. Although sapply is simpler to use, as there is no need to specify the output type, vapply is faster (0.94 secs vs 4.04) and lets the user control the output type.



my.vec <- 1:1000000

system.time(result <- vapply(my.vec,function(x) x+2,FUN.VALUE=0))

system.time(result <- sapply(my.vec,function(x) x+2))

The FUN.VALUE argument is where the output type of vapply is specified. This is done by passing a “general form” (a template value) that each output must fit. If the value returned by the function does not match the specified type and length, R will throw an error.


#These 2 alternatives are equivalent

vapply(letters[1:20], function(x) paste0(x,"_concat"),FUN.VALUE="")
vapply(letters[1:20], function(x) paste0(x,"_concat"),FUN.VALUE="aaa")

#This throws an error

vapply(letters[1:20], function(x) paste0(x,"_concat"),FUN.VALUE=0)
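One practical consequence of sapply "deducing" the type is that the result can change shape with the input, while vapply stays predictable. The sketch below shows this for an empty input and also how the vapply type error can be caught; the variable names are only illustrative.

```r
empty.input <- integer(0)

# sapply has nothing to deduce a type from and falls back to an empty list
s.out <- sapply(empty.input, function(x) x + 2)

# vapply still returns the type given in FUN.VALUE
v.out <- vapply(empty.input, function(x) x + 2, FUN.VALUE = 0)

print(class(s.out))  # "list"
print(class(v.out))  # "numeric"

# The type mismatch error from vapply can be caught like any other R error
msg <- tryCatch(
  vapply(letters[1:3], function(x) paste0(x, "_concat"), FUN.VALUE = 0),
  error = function(e) conditionMessage(e)
)
print(msg)
```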

c) apply

apply() is used to apply a function over a matrix row-wise or column-wise, which is specified in its MARGIN argument: 1 for rows and 2 for columns.


#Some random dataframe
ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100))

#Apply rowwise: The first element of each row plus the second, multiplied by the third

aa <- apply(ex.df,1, function(x) (x[1]+x[2])*x[3])

#Apply columnwise. Extract the mean of each column

bb <- apply(ex.df,2, mean)

Tip: You may have noticed that you can call apply-like functions with a function(…) wrapper or without one. Usually you write “function(…) + an expression” when the elements of the object you are passing have to be treated differently (as in the rowwise apply example). If that is not the case, you can just pass the name of the function, as in the columnwise apply example. Other functions you could use in the same way include median, max, min and quantile.
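When the function you pass by name needs extra arguments, apply-like functions forward them for you. As a small sketch (the seed and the names q90.short and q90.long are only illustrative), these two calls do the same thing:

```r
set.seed(1)  # illustrative seed, only so the random column is reproducible
ex.df <- data.frame(a = seq(1, 100, 1), b = seq(10, 1000, 10), c = runif(100))

# Short form: extra arguments after the function name are passed on to it
q90.short <- apply(ex.df, 2, quantile, probs = 0.9)

# Long form: the same call written with an anonymous function
q90.long <- apply(ex.df, 2, function(x) quantile(x, probs = 0.9))

print(q90.short)
print(isTRUE(all.equal(q90.short, q90.long)))  # TRUE
```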

Parallelisation

If the use of the apply functions is clear, then with snow parallelisation is just one small step further. The equivalent functions are:


Base R               snow
lapply               parLapply
sapply               parSapply
vapply               -
apply (rowwise)      parRapply, parApply(,1)
apply (columnwise)   parCapply, parApply(,2)
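To make the correspondence above concrete, here is a minimal sketch that runs a few base R calls next to their snow counterparts and checks that the results agree. It assumes snow is installed and that two local worker processes can be started; add.two and ex.mat are illustrative names.

```r
library(snow)

# Two local worker processes (assumption: a socket cluster works on this machine)
clus <- makeCluster(2)

my.vec <- 1:10
add.two <- function(x) x + 2

# lapply vs parLapply
serial.list   <- lapply(my.vec, add.two)
parallel.list <- parLapply(clus, my.vec, add.two)

# sapply vs parSapply
serial.vec   <- sapply(my.vec, add.two)
parallel.vec <- parSapply(clus, my.vec, add.two)

# apply(, 2) vs parCapply on a small matrix
ex.mat <- matrix(1:20, nrow = 5)
serial.cols   <- apply(ex.mat, 2, sum)
parallel.cols <- parCapply(clus, ex.mat, sum)

# Always release the worker processes when done
stopCluster(clus)

print(isTRUE(all.equal(serial.list, parallel.list)))
print(isTRUE(all.equal(serial.vec, parallel.vec)))
print(isTRUE(all.equal(unname(serial.cols), unname(parallel.cols))))
```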

The only thing you really have to keep in mind is that, when parallelising, you have to explicitly export or declare on each node everything it needs to perform the parallel operation. This is done mainly with the clusterExport() and clusterEvalQ() functions. This matters because code that runs fine in traditional one-node R can fail when executed in parallel simply because these objects are missing on the nodes.
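Packages are a typical case of something that must be declared on every node. As a sketch, assuming snow is installed and using the tools package (which ships with R) purely as an example, clusterEvalQ() can load a package on each worker before it is used there:

```r
library(snow)

clus <- makeCluster(2)

# Evaluate library(tools) on every node; loading it only in the main
# session would not make its functions available to the workers
invisible(clusterEvalQ(clus, library(tools)))

words <- c("parallel", "apply", "snow")

# toTitleCase() comes from tools and is resolved on the workers
titled <- parSapply(clus, words, function(w) toTitleCase(w))

stopCluster(clus)

print(titled)
```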

From apply() rowwise to parRapply()

Below you will find a small example, very similar to the rowwise apply example above, illustrating the small changes and additions needed to run your code in parallel.


#Return if the result of (x[1]+x[2])*x[3] is greater than 20 or not

# The same df as before

ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100))

# Define the threshold

ths <- 20

# These 2 statements in Base R are equivalent

aa <- apply(ex.df,1, function(x) (x[1]+x[2])*x[3] > 20)
aa <- apply(ex.df,1, function(x) (x[1]+x[2])*x[3] > ths)


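### Equivalent parallel execution ###

# Load snow, which provides makeCluster(), clusterExport() and the par*apply functions

library(snow)

# Declare the cluster object. Here we use the default settings (SOCK) 
# and the number of nodes is specified by the number given

clus <- makeCluster(3)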

# The equivalent for the first alternative would be very easy

aa <- parRapply(clus,ex.df, function(x) (x[1]+x[2])*x[3] > 20)

#However, if the variable "ths" needs to be used, a line has to be added

clusterExport(clus,"ths")
aa <- parRapply(clus,ex.df, function(x) (x[1]+x[2])*x[3] > ths)

The clusterExport() function exports an object to each node, so that the nodes can use it while working in parallel. As you can see, it is extremely simple to use: you pass the variable name or names as a character vector (or a single string, as in this case).
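For more than one variable, the names simply go into one character vector. The sketch below assumes snow is installed; the variables lower and upper and the constant third column are illustrative choices made so that the result is reproducible. The envir argument is optional and defaults to the global environment.

```r
library(snow)

clus <- makeCluster(2)

# Two illustrative thresholds defined in the main session
lower <- 20
upper <- 500

# Export both objects in one call by naming them in a character vector
clusterExport(clus, c("lower", "upper"), envir = environment())

# Deterministic data: the third column is fixed at 0.5 instead of runif()
ex.mat <- cbind(a = 1:100, b = seq(10, 1000, 10), c = rep(0.5, 100))

# Each node evaluates its share of rows and uses the exported thresholds
in.range <- parRapply(clus, ex.mat, function(x) {
  value <- (x[1] + x[2]) * x[3]
  value > lower && value < upper
})

stopCluster(clus)

print(sum(in.range))  # 87 rows fall strictly between the thresholds
```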

To conclude, imagine you need to apply a custom function to each row of your data frame. Following the same example, in base R it would be:


#Declare the function

custom.function <- function(a,b,c){
result <- (a+b)*c
return(result)}


ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100))

#Apply the declared function

aa <- apply(ex.df,1, function(x) custom.function(x[1],x[2],x[3]))


To perform the same action in parallel with snow, you can either declare the function inside a clusterEvalQ() statement or declare it in your main workspace and then export it. If you use clusterEvalQ(), you will not see the function in your workspace.


#Create cluster

clus <- makeCluster(3)

#Option 1. Declare the function for each node

clusterEvalQ(clus, custom.function <- function(a,b,c){
result <- (a+b)*c
return(result)})

#Option 2. Export it from the main workspace

custom.function <- function(a,b,c){
result <- (a+b)*c
return(result)}

clusterExport(clus,"custom.function")

ex.df <- data.frame(a=seq(1,100,1),b=seq(10,1000,10),c=runif(100))

#Apply the declared function

aa <- parRapply(clus,ex.df, function(x) custom.function(x[1],x[2],x[3]))

#Shut down the nodes once the work is done

stopCluster(clus)


Performance differences

Of course, the main goal of parallelisation is to reduce execution times. However, it does not make much sense to parallelise work over a small set (a small list, data frame, etc.), because parallelisation needs time to distribute the task among the nodes and to collect the results from them. This overhead is not significant when working with a large volume of data, but with small datasets the overall execution time of the parallel version might be greater than doing the work on just one core.
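You can observe this overhead yourself with a very small input. The following is only a sketch, assuming snow is installed; the timings vary between machines, so no numbers are shown.

```r
library(snow)

clus <- makeCluster(2)

# A deliberately tiny task: communication cost is likely to dominate here
small.vec <- 1:100

time.serial <- system.time(
  res.serial <- sapply(small.vec, function(x) x + 2)
)
time.parallel <- system.time(
  res.parallel <- parSapply(clus, small.vec, function(x) x + 2)
)

stopCluster(clus)

# Compare the elapsed times on your own machine
print(time.serial["elapsed"])
print(time.parallel["elapsed"])

# The results are the same either way
print(isTRUE(all.equal(res.serial, res.parallel)))
```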

Although the execution time drops considerably, it should not be assumed that it decreases in proportion to the number of nodes, i.e., if the execution time on one node is 30 seconds, it will not decrease to 10 seconds with 3 nodes. It will probably be higher.

These are the execution times for 1 and 3 nodes for the example above with a data frame of 1M and 3M rows.


# of Rows   1 Node       3 Nodes
1M          18.81 secs   10.90 secs
3M          65.67 secs   40.26 secs
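To put these numbers in perspective, the speedup (one-node time divided by three-node time) and the efficiency (speedup divided by the number of nodes) can be computed directly. These two measures are the usual definitions rather than something from the measurements themselves, and the column names are only illustrative.

```r
# Timings from the table above, in seconds
timings <- data.frame(
  rows        = c("1M", "3M"),
  one.node    = c(18.81, 65.67),
  three.nodes = c(10.90, 40.26)
)

# Speedup: how many times faster the 3-node run is
timings$speedup <- timings$one.node / timings$three.nodes

# Efficiency: fraction of the ideal 3x speedup that was reached
timings$efficiency <- timings$speedup / 3

print(round(timings$speedup, 2))     # 1.73 1.63
print(round(timings$efficiency, 2))  # 0.58 0.54
```

So with 3 nodes the run is roughly 1.6 to 1.7 times faster, not 3 times faster.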

Feel free to write with any questions, suggestions, comments, etc.! I hope you liked it and find it useful.

