**Revolutions**, and kindly contributed to R-bloggers)

*by Edward Ma and Vishrut Gupta (Hewlett Packard Enterprise)*

A few weeks ago, we revealed **ddR** (Distributed Data-structures in R), an exciting new project started by R-Core, Hewlett Packard Enterprise, and others that provides a fresh new set of computational primitives for distributed and parallel computing in R. The package sets the seed for what may become a standardized and easy way to write parallel algorithms in R, regardless of the computational engine of choice.

In designing **ddR**, we wanted to keep things simple and familiar. We expose only a small number of new user functions that are very close in semantics and API to their R counterparts. You can read the introductory material about the package here. In this post, we show how to use **ddR **functions.

**Classes** **dlist, darray, ****and** **dframe**: These classes are the distributed equivalents of list, matrix, and data.frame, respectively. Keeping their APIs similar to those for the vanilla R classes, we implemented operators and functions that work on these functions in the same ways. The example below creates two distributed lists — one of five 3s and one out of the elements 1 through 5.

a <- dmapply(function(x) { x }, rep(3,5))

b <- dlist(1,2,3,4,5,nparts=1L)

The argument *nparts* specifies the number of partitions to split the resulting *dlist* **b **into. For *darrays *and *dframes*, which are two-dimensional, *nparts* also permits a two-element vector, which specifies the two-dimensional partitioning of the output.

**Functions dmapply and dlapply: **Following R’s functional-programming paradigm, we have created these two functions as the distributed equivalents of R’s *mapply* and *lapply*. One can supply any combination of distributed objects and regular R args into *dmapply:*

*addThenSubtract* *<-* *function**(x,y,z) { x **+** y **–** z}** *

* **c **<-** dmapply(addThenSubtract,a,b,**MoreArgs**=list**(**z**=**5**))*

**Functions parts and collect:** The *parts *construct gives users the ability to partition data in a manner that is very explicit. *parts *is often used in conjunction with *dmapply *to achieve partition-level parallelism. To fetch data, the *collect* keyword is used. So, if we wanted to check our result in **c **from our previous example, we may do:

collect(c)

## [[1]]

## [1] 2

##

## [[2]]

## [1] 3

##

## [[3]]

## [1] 4

##

## [[4]]

## [1] 5

##

## [[5]]

## [1] 6

Backends can easily provide custom implementations of *dlist, darray, *and *dframe, *as well as for *dmapply*. At a minimum, backends define only a couple of new custom classes (extending ddR’s classes), as well as the definitions for a couple of generic functions, including *dmapply*.

With these definitions in place, ddR knows how to properly dispatch work to backends where behaviors differ, whilst taking care of the rest of the work — since most of these other operations can be defined using just *dmapply*. For example, *colSums* should automatically work on any *darray* created by a backend that has defined *dmapply*!

**Putting it to Work: RandomForest written in ddR**

In addition to adding new backend drivers for **ddR** (e.g., for Spark), part of this initiative is to develop an initial suite of algorithms written in ddR, such that they are portable to all ddR backends. **RandomForest.ddR** is one such algorithm that we have completed, now available on CRAN. ddR packages for K-Means and GLM (generalized linear models) are now also available.

Random Forest is an algorithm that can be parallelized in a very simple way by asking each worker to create a subset of the trees:

simple_RF <-function(formula, data, ntree = 500, …, nparts = 2)

{

execute_randomForest_parallel <- function(ntree, formula, data, inputArgs)

{

inputArgs$formula <- formula

inputArgs$data <- data

inputArgs$ntree <- ntree

suppressMessages(requireNamespace("randomForest"))

model <- do.call(randomForest::randomForest,inputArgs)

}

dmodel <- dmapply(execute_randomForest_parallel,

ntree = rep(ceiling(500/nparts),nparts),

MoreArgs = list(formula=formula,

data=data,inputArgs=list(…)),

output.type = "dlist", nparts = nparts)

model <- do.call(randomForest::combine, collect(dmodel))

}

model <- simple_RF(Species ~ ., iris)

The main **dmapply** in the above code snippet simply broadcasts all the objects passed to the function to the workers and calls *randomForest* with the same parameters. An important point here is that even if ‘data’ is a distributed object, it will still be broadcast because it is listed in ** MoreArgs, **which accepts a key-value list of either distributed objects or normal R objects.

Here is a sample performance plot of running randomforest:

We tested the **randomForest.ddR** package on a medium sized dataset to measure speedup when increasing the number of cores. From the graph, it is clear that up until 4 cores, there is great improvement and only then does it start to reach the point of diminishing returns. Since most computers these days have several cores, the **randomForest.ddR** package should be helpful for most people. On a single node you can use ** parallel** which stops at 24 cores which corresponds to all the cores of the test machine. You can use

**to continue to scale beyond 24 cores, as it can utilize multiple machines.**

*DistributedR*To read up a bit more on ddR and its semantics, visit our GitHub page here or read the user guide on CRAN.

**leave a comment**for the author, please follow the link and comment on their blog:

**Revolutions**.

R-bloggers.com offers

**daily e-mail updates**about R news and tutorials on topics such as: Data science, Big Data, R jobs, visualization (ggplot2, Boxplots, maps, animation), programming (RStudio, Sweave, LaTeX, SQL, Eclipse, git, hadoop, Web Scraping) statistics (regression, PCA, time series, trading) and more...