**Oracle R Enterprise**, and kindly contributed to R-bloggers].

*This guest post from Anand Srinivasan compares performance of the Oracle R Connector for Hadoop with the R {parallel} package for covariance matrix computation, sampling, and parallel linear model fitting.*

Oracle R Connector for Hadoop (ORCH) is a collection of R packages that enables Big Data analytics from the R environment. It enables a data scientist or analyst to work on data straddling multiple data platforms (HDFS, Hive, Oracle Database, local files) from the comfort of the R environment and benefit from the R ecosystem.

ORCH provides:

1) Out-of-the-box predictive analytic techniques for linear regression, neural networks for prediction, matrix completion using low rank matrix factorization, non-negative matrix factorization, k-means clustering, principal components analysis and multivariate analysis. While all these techniques have R interfaces, they are implemented either in Java or in R as distributed parallel implementations leveraging all nodes of your Hadoop cluster.

2) A general framework, where a user can use the R language to write custom logic executable in a distributed parallel manner using available compute and storage resources.

The main idea behind the ORCH architecture and its approach to Big Data analytics is to leverage the Hadoop infrastructure and thereby inherit all its advantages.

The crux of ORCH is read parallelization and robust methods over parallelized data. Efficient parallelization of reads is the single most important step necessary for Big Data analytics because it is either expensive or impractical to load all available data in a single thread.

ORCH is often compared and contrasted with the other options available in R, in particular the popular open source R package called “parallel”. The parallel package provides a low-level infrastructure for “coarse-grained” distributed and parallel computation. While it is fairly general, it tends to encourage an approach that is based on using the aggregate RAM in the cluster as opposed to using the file system. Specifically, it lacks a data management component, a task management component and an administrative interface for monitoring. Programming, however, follows the broad Map Reduce paradigm.
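To make the Map Reduce style of the parallel package concrete, here is a minimal sketch. It is not taken from the original experiments: the in-memory `chunks` list is a hypothetical stand-in for pieces of a larger data set, and the core count is an arbitrary assumption.

```r
library(parallel)

# Fork-based mclapply cannot use more than one core on Windows,
# so fall back to a single core there (assumption for portability).
cores <- if (.Platform$OS.type == "windows") 1L else 2L

# Hypothetical "chunks": the integers 1..100 cut into four pieces.
chunks <- split(1:100, rep(1:4, each = 25))

# Map step: each worker computes a partial sum for its chunk.
partial <- mclapply(chunks, function(x) sum(x), mc.cores = cores)

# Reduce step: combine the partial results in the calling process.
total <- Reduce(`+`, partial)
print(total)
```

The same two-step shape (a parallel map over chunk identifiers, then a sequential `Reduce`) is what the hand-written solutions later in this article follow.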

In the rest of this article, we assume that the reader has basic familiarity with the parallel package and proceed to compare ORCH and its approach with the parallel package. The goal of this comparison is to explain what it takes for a user to build a solution for their requirement using each of these technologies and also to understand the performance characteristics of these solutions.

We do this comparison using three concrete use cases: covariance matrix computation, sampling and partitioned linear model fitting. The exercise is designed to be repeatable, so you, the reader, can try this “at home”. We will demonstrate that ORCH is functionally and performance-wise superior to the available alternative of using R’s parallel package.

A six-node Oracle Big Data Appliance v2.1.1 cluster is used in the experiments. Each node in this test environment has 48GB RAM and 24 CPU cores.

## Covariance Matrix Computation

Computing covariance matrices is one of the most fundamental of statistical techniques.

In this use case, we have a single input file, “allnumeric_200col_10GB” (see the appendix on how to generate this data set), that is about 10GB in size and has a data matrix with about 3 million rows and 200 columns. The requirement is to compute the covariance matrix of this input matrix.

Since a single node in the test environment has 48GB RAM and the input file is only 10GB, we start with the approach of loading the entire file into memory and then computing the covariance matrix using R’s cov function.

    > system.time(m <- matrix(scan(file="/tmp/allnumeric_200col_10GB", what=0.0,
    +                               sep=","), ncol=200, byrow=TRUE))
    Read 611200000 items
       user  system elapsed
    683.159  17.023 712.527

    > system.time(res <- cov(m))
       user  system elapsed
    561.627   0.009 563.044

We observe that loading the data takes 712 seconds (vs. 563 seconds for the actual covariance computation) and dominates the cost. The load time would be even more dominant (relative to the total elapsed time) if the cov(m) computation were parallelized using mclapply from the parallel package.

Based on this, we see that for an efficient parallel solution, the main requirement is to parallelize the data loading. This requires that the single input file be split into multiple smaller files. **The parallel package does not offer any data management facilities**; hence this step has to be performed manually using a Linux command like split. Since there are 24 CPU cores, we split the input file into 24 smaller files.

    $ time split -l 127334 /tmp/allnumeric_200col_10GB

    real    0m54.343s
    user    0m3.598s
    sys     0m24.233s
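The R script in this section reads chunks named p1, p2, and so on, whereas split writes files named xaa, xab, ... by default. As an illustration only, the following sketch splits a file by line count from within R and writes chunks using the p-prefixed names. The helper `split_file` and the tiny temporary input are hypothetical, and this is not how the timings in this article were obtained.

```r
# Hypothetical helper: stream `src` and write consecutive groups of
# `lines_per_chunk` lines to files p1, p2, ... inside `out_dir`.
split_file <- function(src, out_dir, lines_per_chunk) {
  con <- file(src, open = "r")
  on.exit(close(con))
  id <- 0L
  repeat {
    lines <- readLines(con, n = lines_per_chunk)
    if (length(lines) == 0L) break       # end of file reached
    id <- id + 1L
    writeLines(lines, file.path(out_dir, paste0("p", id)))
  }
  id                                     # number of chunk files written
}

# Small stand-in for the real 10GB input: 10 lines of comma-separated values.
src <- tempfile("allnumeric_")
writeLines(sprintf("%d,%d", 1:10, 11:20), src)

out_dir <- tempfile("chunks_")
dir.create(out_dir)

n_chunks <- split_file(src, out_dir, lines_per_chunk = 4L)
chunk_files <- sort(list.files(out_dir))
print(chunk_files)
```

Reading through an open connection keeps only one chunk in memory at a time, which matters for an input of this size; the split remains a sequential step either way.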

Now we can run the R script. It expects the chunks in /home/oracle/anasrini/cov, named p1 through p24 (split's default output names are xaa, xab, and so on, so the files need to be renamed first):

    library(parallel)

    # Read one chunk of the input into a numeric matrix
    readInput <- function(id) {
      infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
      print(infile)
      m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)
      m
    }

    # Main MAPPER function
    compCov <- function(id) {
      m <- readInput(id)     # read the input
      cs <- colSums(m)       # column sums
      nr <- nrow(m)          # number of rows
      mtm <- crossprod(m)    # main cov portion: t(m) %*% m
      list(mat=mtm, colsum=cs, nrow=nr)
    }

    numfiles <- 24
    numCores <- 24

    # Map step
    system.time(mapres <- mclapply(seq_len(numfiles), compCov, mc.cores=numCores))

    # Reduce step
    system.time(xy <- Reduce("+", lapply(mapres, function(x) x$mat)))
    system.time(csf <- Reduce("+", lapply(mapres, function(x) x$colsum)))
    system.time(nrf <- Reduce("+", lapply(mapres, function(x) x$nrow)))

    sts <- csf %*% t(csf)
    m1 <- xy / (nrf - 1)
    m2 <- sts / (nrf * (nrf - 1))
    m3 <- 2 * sts / (nrf * (nrf - 1))
    covmat <- m1 + m2 - m3

        user   system  elapsed
    1661.196   21.209   77.781

We observe that the elapsed time (excluding the time to split the files) has now come down to 77 seconds. However, it took 54 seconds to split the input file into smaller files, making it a significant portion of the **total elapsed time** of 77+54 = **131 seconds**.
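As a sanity check on the map and reduce logic used for the covariance computation, the following sketch applies the same sufficient statistics (cross-products, column sums, row counts) to a small random matrix and compares the result with R's built-in cov. The matrix size and the four-way row split are arbitrary assumptions, and lapply stands in for mclapply to keep the example portable.

```r
set.seed(42)

# Small stand-in for the 3-million-row input: 600 rows, 5 columns.
m <- matrix(rnorm(600 * 5), ncol = 5)

# Row indices of four equal-sized chunks (plays the role of the 24 files).
chunks <- split(seq_len(nrow(m)), rep(1:4, each = 150))

# Map step: per-chunk cross-product, column sums and row count.
mapres <- lapply(chunks, function(rows) {
  part <- m[rows, , drop = FALSE]
  list(mat = crossprod(part), colsum = colSums(part), nrow = nrow(part))
})

# Reduce step: add up the per-chunk statistics.
xy  <- Reduce(`+`, lapply(mapres, function(x) x$mat))
csf <- Reduce(`+`, lapply(mapres, function(x) x$colsum))
nrf <- Reduce(`+`, lapply(mapres, function(x) x$nrow))

# cov = (X'X - s s' / n) / (n - 1), where s is the vector of column sums.
covmat <- (xy - tcrossprod(csf) / nrf) / (nrf - 1)

ok <- isTRUE(all.equal(covmat, cov(m), check.attributes = FALSE))
print(ok)
```

The single-expression form is algebraically the same as computing m1 + m2 - m3 in the script. Note that this one-pass formula can lose precision when column means are large relative to the column variances; that is a general property of the formula, not something measured here.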

Besides impacting performance, there are a number of more serious problems with having to deal with data management manually. We list a few of them here:

1) In other scenarios, with larger files or a larger number of chunks, placement of chunks also becomes a factor that influences I/O parallelism. Optimal placement of chunks of data over the available set of disks is a non-trivial problem.

2) Requirement of root access: optimal placement of file chunks on different disks often requires root access. For example, only root has permissions to create files on disks corresponding to the file systems mounted on /u03, /u04 etc. on an Oracle Big Data Appliance node.

3) When multiple nodes are involved in the computation, manually moving fragments of the original data to different nodes can drain productivity.

4) This form of split can only work in a static environment. In a real-world dynamic environment, information about other workloads and their resource utilization cannot be factored in by a human in any practical manner.

5) It requires an admin to provide user access to all nodes of the cluster in order to allow the user to move data to different nodes.

### ORCH-based solution

On the other hand, using ORCH, we can directly use the out-of-the-box support for multivariate analysis. Further, no manual steps related to data management (like splitting files and addressing chunk placement issues) are required since Hadoop (specifically HDFS) handles all those requirements seamlessly.

    > x <- hdfs.attach("allnumeric_200col_10GB")
    > system.time(res <- orch.cov(x))
       user  system elapsed
     18.179   3.991  85.640

Forty-two concurrent map tasks were involved in the computation above, as determined by Hadoop.

To conclude, we can see the following advantages of the ORCH based approach in this scenario:

1) No manual steps. Data management is handled transparently by HDFS.

2) Out-of-the-box support for cov. The distributed parallel algorithm is available out of the box and the user does not have to work it out from scratch.

3) Using ORCH we get performance comparable to that obtained through manual coding (85.6 seconds versus 77.8 seconds, or 131 seconds once the split is included) without any of the manual overheads.

## Sampling

We use the same single input file, “allnumeric_200col_10GB”, in this case as well. The requirement is to obtain a uniform random sample from the input data set. The size of the sample required is specified as a percentage of the input data set size.

Once again, for the solution using the parallel package, the input file has to be split into smaller files for better read parallelism.

    library(parallel)

    # Read one chunk of the input into a numeric matrix
    readInput <- function(id) {
      infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
      print(infile)
      system.time(m <- matrix(scan(file=infile, what=0.0, sep=","),
                              ncol=200, byrow=TRUE))
      m
    }

    # Main MAPPER function
    samplemap <- function(id, percent) {
      m <- readInput(id)      # read the input
      v <- runif(nrow(m))     # one uniform random number per row
      # Pick only those rows where random < percent*0.01
      keep <- which(v < percent*0.01)
      m1 <- m[keep, , drop=FALSE]
      m1
    }

    numfiles <- 24
    numCores <- 24

    # Map step
    percent <- 0.001
    system.time(mapres <- mclapply(seq_len(numfiles), samplemap, percent,
                                   mc.cores=numCores))

        user   system  elapsed
    1112.998   23.196   49.561
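The sampling rule in the mapper (keep a row when its uniform random draw falls below percent*0.01) can be tried on in-memory data. The sketch below is an illustration under assumed sizes: four hypothetical chunks of 2,500 rows each and a 10 percent sample, with lapply standing in for mclapply.

```r
set.seed(1)

# Hypothetical chunks: four 2500 x 3 matrices instead of 24 files on disk.
chunks <- lapply(1:4, function(i) matrix(rnorm(2500 * 3), ncol = 3))

# Keep each row independently with probability percent/100.
sample_chunk <- function(m, percent) {
  keep <- which(runif(nrow(m)) < percent * 0.01)
  m[keep, , drop = FALSE]
}

percent <- 10
parts <- lapply(chunks, sample_chunk, percent = percent)

# Combine the per-chunk samples into one sample matrix.
smp <- do.call(rbind, parts)
print(dim(smp))
```

Because every row is kept independently, the sample size is itself random: it is close to the requested percentage of the input on average, but not exactly equal to it.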

### ORCH based solution

    > x <- hdfs.attach("allnumeric_200col_10GB_single")
    > system.time(res <- orch.sample(x, percent=0.001))
       user  system elapsed
      8.173   0.704  33.590

**The ORCH based solution** out-performs the solution based on the parallel package (33.6 seconds versus 49.6 seconds elapsed, the latter not counting the time to split the input file). This is because orch.sample is implemented in Java and the read rates obtained by a Java implementation are superior to what can be achieved in R.

## Partitioned Linear Model Fitting

Partitioned Linear Model Fitting is a very popular use case. The requirement here is to fit separate linear models, one for each partition of the data. The data itself is partitioned based on a user-specified partitioning key.

For example, using the ONTIME data set, the user could specify destination city as the partitioning key, indicating the requirement for separate linear models (with, for example, ArrDelay as target), one per destination city.

### ORCH based solution

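    # `input` and `mapOut` are assumed to be defined beforehand
    # (their definitions are not shown in this post)
    dfs_res <- hadoop.run(
      data = input,
      # Mapper: emit each row keyed by destination city
      mapper = function(k, v) {
        orch.keyvals(v$Dest, v)
      },
      # Reducer: fit one linear model per destination city
      reducer = function(k, v) {
        lm_x <- lm(ArrDelay ~ DepDelay + Distance, v)
        orch.keyval(k, orch.pack(model=lm_x, count=nrow(v)))
      },
      config = new("mapred.config",
        job.name = "ORCH Partitioned lm by Destination City",
        map.output = mapOut,
        mapred.pristine = TRUE,
        reduce.output = data.frame(key="", model="packed")
      )
    )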

Notice that the Map Reduce framework is performing the partitioning. The mapper just picks out the partitioning key and the Map Reduce framework handles the rest. The linear model for each partition is then fitted in the reducer.

### parallel based solution

As in the previous use cases, for good read parallelism, the single input file needs to be split into smaller files. However, unlike the previous use cases, there is a twist here.

We noted that with the ORCH based solution it is the Map Reduce framework that does the actual partitioning. There is no such out-of-the-box feature available with a parallel package-based solution. There are two options:

1) Break up the file arbitrarily into smaller pieces for better read parallelism. Implement your own partitioning logic mimicking what the Map Reduce framework provides. Then fit linear models on each of these partitions in parallel.

OR

2) Break the file into smaller pieces such that each piece is a separate partition. Fit linear models on each of these partitions in parallel.

Neither of these options is easy, and both require a lot of user effort. The custom coding required for achieving parallel reads is significant.
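To give a feel for the fitting half of such a solution, here is a minimal sketch that partitions an in-memory data frame by key and fits one model per partition with mclapply. It deliberately skips the hard part discussed above (splitting and reading a large file in parallel). The synthetic `flights` data, its coefficients and the three destination codes are assumptions made up for illustration; only the column names follow the ONTIME example.

```r
library(parallel)

# Fork-based mclapply cannot use more than one core on Windows.
cores <- if (.Platform$OS.type == "windows") 1L else 2L

set.seed(7)
n <- 300

# Synthetic stand-in for the ONTIME data (values are invented).
flights <- data.frame(
  Dest     = sample(c("SFO", "JFK", "ORD"), n, replace = TRUE),
  DepDelay = rnorm(n, mean = 10, sd = 5),
  Distance = runif(n, min = 200, max = 2500)
)
flights$ArrDelay <- 2 + 0.9 * flights$DepDelay +
  0.001 * flights$Distance + rnorm(n)

# Partition step: one data frame per destination city.
parts <- split(flights, flights$Dest)

# Fit step: one linear model per partition, in parallel.
models <- mclapply(
  parts,
  function(d) lm(ArrDelay ~ DepDelay + Distance, data = d),
  mc.cores = cores
)

# Row count per partition, mirroring the `count` packed by the ORCH reducer.
counts <- vapply(parts, nrow, integer(1))
print(counts)
```

Here split() plays the role that the shuffle phase plays in Map Reduce, but it only works because the whole data set already sits in one process's memory.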

## Conclusion

ORCH provides a holistic approach to Big Data analytics in the R environment. By leveraging the Hadoop infrastructure, ORCH inherits several key components that are all required to address real-world analytics requirements. The rich set of out-of-the-box predictive analytic techniques, along with the possibility of authoring custom parallel distributed analytics using the framework (as demonstrated in the partitioned linear model fitting case), helps simplify the user’s task while meeting the performance and scalability requirements.

## Appendix: Data Generation

We show the steps required to generate the single input file “allnumeric_200col_10GB”.

Run the following in R:

    x <- orch.datagen(datasize=10*1024*1024*1024, numeric.col.count=200,
                      map.degree=40)
    hdfs.mv(x, "allnumeric_200col_10GB")

Then, from the Linux shell:

    hdfs dfs -rm -r -skipTrash /user/oracle/allnumeric_200col_10GB/__ORCHMETA__
    hdfs dfs -getmerge /user/oracle/allnumeric_200col_10GB /tmp/allnumeric_200col_10GB
