
In the R parallel package, there are two implementations of parallelism, namely fork and socket, each with its own pros and cons.

With the fork, each parallel worker is a complete copy of the master process and shares its environment, including objects or variables defined before the workers are started. Nothing has to be sent to the workers explicitly, so it runs fast. However, the major limitation is that the fork doesn’t work on Windows.
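To make the shared environment concrete, here is a minimal sketch, assuming a Unix-like system and a made-up variable named `threshold` (it is not part of the benchmark in this post). The forked workers can read the variable even though it is never exported to them.

```r
library(parallel)

# Hypothetical object defined in the master BEFORE the workers are started.
threshold <- 10

fork_res <- NULL
if (.Platform$OS.type != "windows") {
  # Forked workers are copies of the master, so `threshold` is already
  # visible on each of them without any explicit export.
  cl <- makeCluster(2, type = "FORK")
  fork_res <- parSapply(cl, c(5, 15), function(x) x > threshold)
  stopCluster(cl)
}
print(fork_res)
```

On Windows the guard simply skips the cluster and `fork_res` stays `NULL`, which mirrors the limitation described above.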

On the other hand, the socket works on all operating systems. Each worker runs as a separate process that shares no objects or variables with the master; anything it needs must be passed from the master process explicitly. As a result, it runs slower due to the communication overhead.
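The explicit passing can be sketched as follows. This is only an illustration with a made-up variable named `threshold`, not the code used in the benchmark: the socket workers start without the variable and only see it after `clusterExport()` copies it over.

```r
library(parallel)

# Hypothetical object that exists only in the master process.
threshold <- 10

cl <- makeCluster(2, type = "PSOCK")

# Socket workers are fresh R sessions, so the object is not there yet.
before <- unlist(clusterEvalQ(cl, exists("threshold")))

# Copy the object from the master to every worker explicitly.
clusterExport(cl, "threshold", envir = environment())
after <- unlist(clusterEvalQ(cl, exists("threshold")))

sock_res <- parSapply(cl, c(5, 15), function(x) x > threshold)
stopCluster(cl)

print(before)
print(after)
print(sock_res)
```

Every exported object has to be serialized and sent to each worker, which is where the communication overhead comes from.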

Below is an example showing the performance difference between the fork and the socket. A self-defined filter function runs in parallel and extracts the three rows, out of 336,776, that meet the criteria. As shown, the socket takes about 41% longer than the fork.

df <- read.csv("data/nycflights")

ex <- expression(carrier == "UA" & origin == "EWR" & day == 1 & is.na(arr_time))
# SELECT 3 ROWS OUT OF 336,776
#        year month day dep_time dep_delay arr_time arr_delay carrier tailnum flight origin dest air_time ...
# 56866  2013    11   1       NA        NA       NA        NA      UA            252    EWR  IAH       NA ...
# 84148  2013    12   1       NA        NA       NA        NA      UA            643    EWR  ORD       NA ...
# 251405 2013     7   1       NA        NA       NA        NA      UA            394    EWR  ORD       NA ...

parFilter <- function(df, ex, type) {
  ### USE ALL CORES BUT ONE
  cn <- parallel::detectCores() - 1
  cl <- parallel::makeCluster(cn, type = type)
  ### DIVIDE THE DATAFRAME BASED ON # OF CORES
  sp <- parallel::parLapply(cl, parallel::clusterSplit(cl, seq(nrow(df))),
                            function(c_) df[c_,])
  ### PASS THE OBJECT FROM MASTER PROCESS TO EACH NODE
  ### (envir = environment() EXPORTS THE ex ARGUMENT, NOT A GLOBAL ex)
  parallel::clusterExport(cl, "ex", envir = environment())
  ### FLAG MATCHING ROWS ON EACH NODE
  id <- Reduce(c, parallel::parLapply(cl, sp,
                                      function(s_) with(s_, eval(ex))))
  parallel::stopCluster(cl)
  return(df[which(id),])
}

rbenchmark::benchmark(replications = 10, order = "elapsed", relative = "elapsed",
  columns = c("test", "replications", "elapsed", "relative"),
  "  FORK" = parFilter(df, ex, "FORK"),
  "SOCKET" = parFilter(df, ex, "PSOCK")
)
#     test replications elapsed relative
# 1   FORK           10  59.396    1.000
# 2 SOCKET           10  83.856    1.412
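As a quick sanity check on how to read the table, the arithmetic behind the relative column can be reproduced from the two elapsed times reported above. The variable names here are only for illustration.

```r
# Elapsed seconds for 10 replications, copied from the benchmark output.
fork_elapsed   <- 59.396
socket_elapsed <- 83.856

# How much longer the socket takes relative to the fork.
ratio <- socket_elapsed / fork_elapsed

# Share of the socket's running time that the fork saves.
saving <- 1 - fork_elapsed / socket_elapsed

print(round(ratio, 3))
print(round(100 * saving, 1))
```

The ratio matches the `relative` column: the socket needs roughly 41% more time, which is the same as saying the fork finishes in a bit under 30% less time.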