Big Data Analytics in R – the tORCH has been lit!

[This article was first published on Oracle R Enterprise, and kindly contributed to R-bloggers].

This guest post from Anand Srinivasan compares performance of the Oracle R Connector for Hadoop with the R {parallel} package for covariance matrix computation, sampling, and parallel linear model fitting. 

Oracle R Connector for Hadoop (ORCH) is a collection of R
packages that enables Big Data analytics from the R environment. It lets a
scientist or analyst work on data straddling multiple data platforms (HDFS,
Hive, Oracle Database, local files) from the comfort of the R environment and
benefit from the R ecosystem.

ORCH provides:

1) Out-of-the-box predictive analytic techniques for linear regression, neural
networks for prediction, matrix completion using low rank matrix factorization,
non-negative matrix factorization, kmeans clustering, principal components
analysis and multivariate analysis. While all these techniques have R
interfaces, they are implemented either in Java or in R as distributed parallel
implementations leveraging all nodes of your Hadoop cluster.

2) A general framework, where a user can use the R language to write custom
logic executable in a distributed parallel manner using available compute and
storage resources.

The main idea
behind the ORCH architecture and its approach to Big Data analytics is to
leverage the Hadoop infrastructure and thereby inherit all its advantages.

The crux of ORCH is read parallelization and robust methods
over parallelized data. Efficient parallelization of reads is the single most
important step necessary for Big Data Analytics because it is either expensive
or impractical to load all available data in a single thread.

ORCH is often compared/contrasted with the other options
available in R, in particular the popular open source R package called parallel. The
parallel package provides a low-level
infrastructure for “coarse-grained” distributed and parallel computation. While
it is fairly general, it tends to encourage an approach that is based on using
the aggregate RAM in the cluster as opposed to using the file system.
Specifically, it lacks a data management component, a task management component
and an administrative interface for monitoring. Programming, however, follows
the broad Map Reduce paradigm.
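
To make the programming style concrete, here is a minimal sketch of a map
step and a reduce step written with the parallel package. The toy data, the
chunking and the variable names are hypothetical and only serve to illustrate
the pattern; nothing here reads from disk.

```r
library(parallel)

# Hypothetical toy data: four in-memory chunks standing in for file fragments.
chunks <- split(1:100, rep(1:4, each = 25))

# mclapply forks worker processes; on Windows only one core is supported.
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Map step: each worker computes a partial sum of squares for its chunk.
partial <- mclapply(chunks, function(v) sum(v^2), mc.cores = cores)

# Reduce step: combine the partial results in the master process.
total <- Reduce(`+`, partial)
print(total)
```

Note that the chunks have to exist before the map step can start. How they
get created and where they live is left entirely to the user.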

In the rest of this article, we assume that the reader has
basic familiarity with the parallel package and proceed to compare ORCH and
its approach with the parallel package.
The goal of this comparison is to explain what it takes for a user to build a
solution for their requirement using each of these technologies and also to
understand the performance characteristics of these solutions.

We do this comparison using three concrete use cases –
covariance matrix computation, sampling and partitioned linear model fitting.
The exercise is designed to be repeatable, so you, the reader, can try this “at
home”. We will demonstrate that ORCH is functionally and performance-wise
superior to the available alternative of using R’s parallel package.

A six node Oracle Big Data
Appliance v2.1.1 cluster is used in the experiments. Each node in this test
environment has 48GB RAM and 24 CPU cores.

Covariance Matrix Computation

Computing covariance matrices is one of the most fundamental
of statistical techniques.

In this use case, we have a
single input file, “allnumeric_200col_10GB” (see appendix on how to generate
this data set), that is about 10GB in size and has a data matrix with about 3
million rows and 200 columns. The requirement is to compute the covariance
matrix of this input matrix.

Since a single node in the test environment has 48GB RAM and
the input file is only 10GB, we start with the approach of loading the entire
file into memory and then computing the covariance matrix using R’s
cov function.

> system.time(m <- matrix(scan(file="/tmp/allnumeric_200col_10GB",
+     sep=","), ncol=200, byrow=TRUE))
Read 611200000 items
   user  system elapsed
683.159  17.023 712.527

> system.time(res <- cov(m))
   user  system elapsed
561.627   0.009 563.044

We observe that the loading of data takes 712 seconds (vs.
563 seconds for the actual covariance computation) and dominates the cost. It
would be even more pronounced (relative to the total elapsed time) if the
cov(m) computation were parallelized using mclapply from the parallel package.
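
The arithmetic behind this observation can be sketched as follows. The two
timings are the elapsed values reported above; the perfect 24-way speedup of
the covariance step is an idealized assumption made only for illustration.

```r
load_s <- 712.527   # elapsed seconds for scan(), as reported above
cov_s  <- 563.044   # elapsed seconds for cov(m), as reported above

# Share of the total elapsed time spent loading in the single-threaded run.
serial_share <- load_s / (load_s + cov_s)

# Assumption: cov() scales perfectly across 24 cores (an idealized best case).
cores <- 24
parallel_share <- load_s / (load_s + cov_s / cores)

# Loading share in percent, before and after parallelizing only the compute.
print(round(100 * c(serial = serial_share, parallel = parallel_share)))
```

Under that assumption the share of time spent loading grows from a bit more
than half to nearly all of the run, which is why the read has to be
parallelized first.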

Based on this, we see that for an efficient parallel
solution, the main requirement is to parallelize the data loading. This
requires that the single input file be split into multiple smaller-sized files.
The parallel package does not offer any data management facilities; hence this
step has to be performed manually using a Linux command like split. Since
there are 24 CPU cores, we split the input file into 24 smaller files.

time(split -l 127334 /tmp/allnumeric_200col_10GB)

real 0m54.343s

user 0m3.598s

sys 0m24.233s
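
The line count passed to split follows from the size of the data. As a small
worked check, using the item count reported by scan() earlier and assuming
one matrix row per line of the input file:

```r
items  <- 611200000   # values reported by scan() for the full file
cols   <- 200         # columns per row
chunks <- 24          # one chunk per CPU core

rows <- items / cols                         # rows in the input matrix
lines_per_chunk <- ceiling(rows / chunks)    # value passed to split -l
files <- ceiling(rows / lines_per_chunk)     # number of files split produces

print(c(rows = rows, lines_per_chunk = lines_per_chunk, files = files))
```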

Now, we can run
the R script:


library(parallel)

# Read the data
readInput <- function(id) {
  infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
  m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)
  m
}

# Main MAPPER function
compCov <- function(id) {
  m <- readInput(id)   # read the input
  cs <- colSums(m)     # compute col sums
  nr <- nrow(m)        # num rows
  mtm <- crossprod(m)  # compute main cov portion
  list(mat=mtm, colsum=cs, nrow=nr)
}

numfiles <- 24
numCores <- 24

# Map step
system.time(mapres <- mclapply(seq_len(numfiles), compCov, mc.cores=numCores))

# Reduce step
system.time(xy <- Reduce("+", lapply(mapres, function(x) x$mat)))
system.time(csf <- Reduce("+", lapply(mapres, function(x) x$colsum)))
system.time(nrf <- Reduce("+", lapply(mapres, function(x) x$nrow)))

# cov = X'X/(n-1) - s s'/(n(n-1)), where s is the vector of column sums
sts <- csf %*% t(csf)
m1 <- xy / (nrf - 1)
m2 <- sts / (nrf * (nrf - 1))
m3 <- 2 * sts / (nrf * (nrf - 1))
covmat <- m1 + m2 - m3

    user  system elapsed
1661.196  21.209  77.781

We observe that the elapsed time (excluding time to split the
files) has now come down to 77 seconds. However, it took 54 seconds for
splitting the input file into smaller files, making it a significant portion of
the total elapsed time of 77+54 = 131 seconds.
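
The reduce step in the script relies on the identity
cov(X) = X'X/(n-1) - s s'/(n(n-1)), where s is the vector of column sums and n
the number of rows. The following sketch checks that identity on a small,
randomly generated matrix. The data and the four-way chunking are
hypothetical, and lapply stands in for mclapply to keep the example
self-contained.

```r
set.seed(1)
m <- matrix(rnorm(1000 * 5), ncol = 5)   # hypothetical small data matrix

# Split the row indices into four chunks, mimicking the split input files.
idx <- split(seq_len(nrow(m)), rep(1:4, each = 250))

# Map step: per-chunk cross product, column sums and row count.
mapres <- lapply(idx, function(i) {
  part <- m[i, , drop = FALSE]
  list(mat = crossprod(part), colsum = colSums(part), nrow = nrow(part))
})

# Reduce step: add up the partial results across chunks.
xy  <- Reduce(`+`, lapply(mapres, function(x) x$mat))
csf <- Reduce(`+`, lapply(mapres, function(x) x$colsum))
nrf <- Reduce(`+`, lapply(mapres, function(x) x$nrow))

# Combine into the covariance matrix and compare with R's cov().
covmat <- xy / (nrf - 1) - tcrossprod(csf) / (nrf * (nrf - 1))
print(all.equal(covmat, cov(m)))
```

Because the cross products, column sums and row counts are all additive over
row chunks, the order and placement of the chunks do not affect the result.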

Besides impacting performance, there are a number of more
serious problems with having to deal with data management manually. We list a
few of them here:

1) In other scenarios, with larger files or a larger number of chunks,
placement of chunks also becomes a factor that influences I/O parallelism.
Optimal placement of chunks of data over the available set of disks is a
non-trivial problem.

2) Requirement of root access. Optimal placement of file chunks on different
disks often requires root access. For example, only root has permissions to
create files on disks corresponding to the File Systems mounted on /u03, /u04
etc on an Oracle Big Data Appliance node.

3) When multiple nodes are involved in the computation, moving fragments of
the original data into different nodes manually can drain productivity.

4) This form of split can only work in a static environment. In a real-world
dynamic environment, information about other workloads and their resource
utilization cannot be factored in a practical manner by a human.

5) Requires the admin to provide user access to all nodes of the cluster in
order to allow the user to move data to different nodes.

ORCH-based solution

On the other hand, using ORCH, we can directly use the out-of-the-box
support for multivariate analysis. Further, no manual steps related to
data management (like splitting files and addressing chunk placement issues)
are required since Hadoop (specifically HDFS) handles all those requirements.

> x <- hdfs.attach("allnumeric_200col_10GB")

> system.time(res <- orch.cov(x))
   user  system elapsed
 18.179   3.991  85.640

Forty-two concurrent
map tasks were involved in the computation above
as determined by

To conclude, we can see the following advantages of the ORCH-based approach
in this scenario:

1) No manual steps. Data management is handled transparently by HDFS.

2) Out-of-the-box support for cov. The distributed parallel algorithm is
available out of the box and the user does not have to work it out from
scratch.

3) Using ORCH, we get performance comparable to that obtained through manual
coding, without any of the manual overheads.


Sampling

We use the same single input file,
“allnumeric_200col_10GB” in this case as well. The requirement is to obtain a
uniform random sample from the input data set. The size of the sample required
is specified as a percentage of the input data set size.

Once again for the solution using
the parallel package, the input
file has to be split into smaller sized files for better read parallelism.


library(parallel)

# Read the data
readInput <- function(id) {
  infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
  m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)
  m
}

samplemap <- function(id, percent) {
  m <- readInput(id)    # read the input
  v <- runif(nrow(m))   # Generate runif
  # Pick only those rows where random < percent*0.01
  keep <- which(v < percent*0.01)
  m1 <- m[keep,,drop=FALSE]
  m1
}

numfiles <- 24
numCores <- 24

# Map step
percent <- 0.001
system.time(mapres <- mclapply(seq_len(numfiles), samplemap, percent,
                               mc.cores=numCores))

    user  system elapsed
1112.998  23.196  49.561
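
The row filter used in the map step can be tried on a small in-memory matrix.
This is only a sketch: the data are randomly generated, and a much larger
percentage than in the run above is used so that the effect is visible on a
small input.

```r
set.seed(42)
m <- matrix(rnorm(100000 * 3), ncol = 3)   # hypothetical small data matrix
percent <- 10                              # sample size as a percentage

# Draw one uniform number per row and keep rows that fall below the cutoff.
v <- runif(nrow(m))
keep <- which(v < percent * 0.01)
s <- m[keep, , drop = FALSE]

# Fraction of rows kept; close to percent/100 but not exactly equal to it.
print(nrow(s) / nrow(m))
```

Each row is kept independently with the same probability, so applying the
filter chunk by chunk yields a uniform sample of the whole data set whose size
only approximates the requested percentage.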

ORCH-based solution

> x <- hdfs.attach("allnumeric_200col_10GB_single")

> system.time(res <- orch.sample(x, percent=0.001))
   user  system elapsed
  8.173   0.704  33.590

The ORCH based solution
out-performs the solution based on the parallel
package. This is because orch.sample
is implemented in Java and the read rates obtained by a Java implementation are
superior to what can be achieved in R.

Partitioned Linear Model Fitting

Partitioned Linear Model Fitting is a very popular use case. The
requirement here is to fit separate linear models, one for each partition of
the data. The data itself is partitioned based on a user-specified partitioning
key.

For example, using the ONTIME
data set, the user could specify destination city as the partitioning key
indicating the requirement for separate linear models (with, for example,
ArrDelay as target), one per destination city.

ORCH-based solution

# input and mapOut are assumed to have been defined earlier
dfs_res <- hadoop.run(
  data = input,
  mapper = function(k, v) {
    orch.keyvals(v$Dest, v)
  },
  reducer = function(k, v) {
    lm_x <- lm(ArrDelay ~ DepDelay + Distance, v)
    orch.keyval(k, orch.pack(model=lm_x, count = nrow(v)))
  },
  config = new("mapred.config",
    job.name = "ORCH Partitioned lm by Destination City",
    map.output = mapOut,
    mapred.pristine = TRUE,
    reduce.output = data.frame(key="", model="packed")
  )
)
Notice that the Map Reduce
framework is performing the partitioning. The mapper just picks out the
partitioning key and the Map Reduce framework handles the rest. The linear
model for each partition is then fitted in the reducer.

parallel-based solution

As in the previous use cases, for
good read parallelism, the single input file needs to be split into smaller
files. However, unlike the previous use cases, there is a twist here.

We noted that with the ORCH based
solution it is the Map Reduce framework that does the actual partitioning.
There is no such out of the box feature available with a parallel package-based solution. There
are two options:

1) Break up the file arbitrarily into smaller pieces for
better read parallelism. Implement your own partitioning logic mimicking what
the Map Reduce framework provides. Then fit linear models on each of these
partitions in parallel.

2) Break the file into smaller pieces such that each piece
is a separate partition. Fit linear models on each of these partitions in
parallel.

Neither of these options is easy, and both require a lot of user
effort. The custom coding required for achieving parallel reads is significant.
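
To give a feel for the first option, here is a deliberately simplified sketch
in which the data already sit in memory, so the hard part (parallel reads and
regrouping rows across file chunks) is skipped entirely. The data frame is
synthetic; only the column names Dest, ArrDelay, DepDelay and Distance are
taken from the example above.

```r
library(parallel)

set.seed(7)
n <- 300
# Synthetic stand-in for the ONTIME data; the values are made up.
ontime <- data.frame(
  Dest     = sample(c("SFO", "JFK", "ORD"), n, replace = TRUE),
  DepDelay = rnorm(n, mean = 10, sd = 5),
  Distance = runif(n, min = 200, max = 2500)
)
ontime$ArrDelay <- 0.9 * ontime$DepDelay + 0.001 * ontime$Distance + rnorm(n)

# Partition step: this is the work the Map Reduce framework does for ORCH.
parts <- split(ontime, ontime$Dest)

# mclapply forks worker processes; on Windows only one core is supported.
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Fit one linear model per partition in parallel.
models <- mclapply(parts, function(d) {
  list(model = lm(ArrDelay ~ DepDelay + Distance, data = d), count = nrow(d))
}, mc.cores = cores)

# Number of rows that went into each model, by destination.
print(sapply(models, function(x) x$count))
```

With data spread over many file chunks, rows for one destination can appear
in any chunk, so the split step above would have to be replaced by custom
code that regroups rows across chunks before any model can be fitted.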


ORCH provides a holistic approach
to Big Data Analytics in the R environment. By leveraging the Hadoop
infrastructure, ORCH inherits several key components that are all required to
address real world analytics requirements.

The rich set of out-of-the-box predictive analytic techniques along with the
possibility of authoring custom parallel distributed analytics using the
framework (as demonstrated in the partitioned linear model fitting case) helps
simplify the user’s task while meeting the performance and scalability requirements. 

Appendix – Data Generation

We show the steps required to
generate the single input file “allnumeric_200col_10GB”.

Run the following in R:

x <- orch.datagen(datasize=10*1024*1024*1024, numeric.col.count=200,, "allnumeric_200col_10GB")

Then, from the Linux shell:

hdfs dfs -rm -r -skipTrash

hdfs dfs
