Big Data Analytics in R – the tORCH has been lit!
This guest post from Anand Srinivasan compares the performance of Oracle R Connector for Hadoop with the R {parallel} package for covariance matrix computation, sampling, and partitioned linear model fitting.
Oracle R Connector for Hadoop (ORCH) is a collection of R packages that enables Big Data analytics from the R environment. It lets a data scientist or analyst work on data straddling multiple data platforms (HDFS, Hive, Oracle Database, local files) from the comfort of the R environment and benefit from the R ecosystem.
ORCH provides:
1) Out-of-the-box predictive analytic techniques for linear regression, neural networks for prediction, matrix completion using low-rank matrix factorization, non-negative matrix factorization, k-means clustering, principal components analysis and multivariate analysis. While all these techniques have R interfaces, they are implemented either in Java or in R as distributed parallel implementations that leverage all nodes of your Hadoop cluster.
2) A general framework in which a user can write custom logic in R that executes in a distributed, parallel manner using the available compute and storage resources.
The main idea behind the ORCH architecture and its approach to Big Data analytics is to leverage the Hadoop infrastructure and thereby inherit all its advantages.
The crux of ORCH is read parallelization and robust methods over parallelized data. Efficient parallelization of reads is the single most important step in Big Data analytics, because it is either expensive or impractical to load all available data in a single thread.
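To make read parallelization concrete, here is a minimal sketch that uses mclapply from the parallel package to read several chunk files concurrently and stack them into one matrix. The chunk files, their sizes and the read_chunk helper are hypothetical stand-ins created on the fly. mclapply relies on forking, so on Windows only mc.cores = 1 is allowed.

```r
library(parallel)

# Hypothetical setup: write a 400 x 10 numeric matrix as 4 CSV chunk files,
# standing in for the pieces of a large split input file
set.seed(1)
full <- matrix(runif(4000), ncol = 10)
chunk_dir <- tempfile("chunks")
dir.create(chunk_dir)
row_groups <- split(seq_len(nrow(full)), rep(1:4, each = 100))
paths <- character(0)
for (i in seq_along(row_groups)) {
  p <- file.path(chunk_dir, paste0("p", i))
  write.table(full[row_groups[[i]], ], p, sep = ",",
              row.names = FALSE, col.names = FALSE)
  paths <- c(paths, p)
}

# Read one chunk file into a numeric matrix (one matrix row per line)
read_chunk <- function(path, ncol = 10) {
  matrix(scan(path, what = 0.0, sep = ",", quiet = TRUE),
         ncol = ncol, byrow = TRUE)
}

# Each worker process reads a different file; results come back in order
cores <- if (.Platform$OS.type == "windows") 1L else 2L
pieces <- mclapply(paths, read_chunk, mc.cores = cores)
combined <- do.call(rbind, pieces)
```

With a single file, the same read can only ever use one thread, which is why the experiments below first split the input.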
ORCH is often compared and contrasted with the other options available in R, in particular the popular open source R package called “parallel”. The parallel package provides low-level infrastructure for “coarse-grained” distributed and parallel computation. While it is fairly general, it tends to encourage an approach based on using the aggregate RAM in the cluster rather than the file system. Specifically, it lacks a data management component, a task management component and an administrative interface for monitoring. Programming, however, follows the broad MapReduce paradigm.
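A minimal sketch of that MapReduce style with the parallel package: mclapply maps a function over chunks in parallel, and Reduce combines the partial results. The toy data and the map_fn helper are illustrative assumptions, not part of ORCH or of the experiments below.

```r
library(parallel)

# Toy data split into 4 chunks (stand-ins for per-file pieces)
set.seed(42)
x <- rnorm(10000)
chunks <- split(x, rep(1:4, length.out = length(x)))

# Map: each worker returns partial results for its own chunk
map_fn <- function(v) c(sum = sum(v), n = length(v))

cores <- if (.Platform$OS.type == "windows") 1L else 2L
partials <- mclapply(chunks, map_fn, mc.cores = cores)

# Reduce: add the partial results element-wise, then finish the computation
totals <- Reduce(`+`, partials)
overall_mean <- unname(totals["sum"] / totals["n"])
```

The key design point is that the map step returns small summaries rather than data, so the reduce step stays cheap no matter how large each chunk is.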
In the rest of this article, we assume that the reader has basic familiarity with the parallel package, and we compare ORCH and its approach with it. The goal of this comparison is to explain what it takes for a user to build a solution for their requirements using each of these technologies, and to understand the performance characteristics of these solutions.
We make this comparison using three concrete use cases: covariance matrix computation, sampling and partitioned linear model fitting. The exercise is designed to be repeatable, so you, the reader, can try this “at home”. We will demonstrate that ORCH is superior, both functionally and in performance, to the available alternative of using R’s parallel package.
A six-node Oracle Big Data Appliance v2.1.1 cluster is used in the experiments. Each node in this test environment has 48GB RAM and 24 CPU cores.
Covariance Matrix Computation
Computing covariance matrices is one of the most fundamental statistical techniques.
In this use case, we have a single input file, “allnumeric_200col_10GB” (see the appendix for how to generate this data set), that is about 10GB in size and holds a data matrix with about 3 million rows and 200 columns. The requirement is to compute the covariance matrix of this input matrix.
Since a single node in the test environment has 48GB RAM and
the input file is only 10GB, we start with the approach of loading the entire
file into memory and then computing the covariance matrix using R’s cov function.
> system.time(m <- matrix(scan(file="/tmp/allnumeric_200col_10GB", what=0.0,
+                                sep=","), ncol=200, byrow=TRUE))
Read 611200000 items
   user  system elapsed
683.159  17.023 712.527
> system.time(res <- cov(m))
   user  system elapsed
561.627   0.009 563.044
We observe that loading the data takes 712 seconds (vs. 563 seconds for the actual covariance computation) and dominates the cost. Loading would dominate even more (relative to the total elapsed time) if the cov(m) computation were parallelized using mclapply from the parallel package.
Based on this, we see that the main requirement for an efficient parallel solution is to parallelize the data loading. This requires splitting the single input file into multiple smaller files. The parallel package does not offer any data management facilities, so this step has to be performed manually using a Linux command like split. Since there are 24 CPU cores, we split the input file into 24 smaller files.
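The line count passed to split (127334) follows from the item count that scan() reported above. A quick check in R, assuming one matrix row per line of the input file:

```r
# Figures taken from the scan() output in the single-threaded run
items  <- 611200000   # "Read 611200000 items"
ncols  <- 200         # columns per row
nfiles <- 24          # one file per CPU core

nrows <- items / ncols                        # total rows (= lines) in the file
lines_per_file <- ceiling(nrows / nfiles)     # value to pass to split -l
n_pieces <- ceiling(nrows / lines_per_file)   # files split will produce
# split -l gives every file lines_per_file lines except the last one
last_file_lines <- nrows - (n_pieces - 1) * lines_per_file
```

Rounding up rather than down matters: with floor() the file would be cut into 25 pieces, one more than there are cores.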
$ time split -l 127334 /tmp/allnumeric_200col_10GB
real    0m54.343s
user    0m3.598s
sys     0m24.233s
Now we can run the R script:
library(parallel)

# Read split piece number 'id' into a numeric matrix. Assumes the pieces
# produced by split were renamed p1 ... p24 in this directory.
readInput <- function(id) {
  infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
  print(infile)
  m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)
  m
}

# Main MAPPER function: per-chunk statistics needed for the covariance
compCov <- function(id) {
  m <- readInput(id)    # read the input
  cs <- colSums(m)      # column sums
  nr <- nrow(m)         # number of rows
  mtm <- crossprod(m)   # t(m) %*% m, the main cov portion
  list(mat=mtm, colsum=cs, nrow=nr)
}

numfiles <- 24
numCores <- 24

# Map step
system.time(mapres <- mclapply(seq_len(numfiles), compCov, mc.cores=numCores))

# Reduce step: sum each statistic across the chunks
system.time(xy <- Reduce("+", lapply(mapres, function(x) x$mat)))
system.time(csf <- Reduce("+", lapply(mapres, function(x) x$colsum)))
system.time(nrf <- Reduce("+", lapply(mapres, function(x) x$nrow)))

# m1 + m2 - m3 simplifies to X'X/(n-1) - s s'/(n(n-1)), s = column sums
sts <- csf %*% t(csf)
m1 <- xy / (nrf - 1)
m2 <- sts / (nrf * (nrf - 1))
m3 <- 2 * sts / (nrf * (nrf - 1))
covmat <- m1 + m2 - m3

    user   system  elapsed
1661.196   21.209   77.781
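Why does this work? The covariance matrix needs only three quantities that can be summed across chunks: X'X (crossprod), the vector of column sums s and the row count n, since cov(X) = (X'X - s s'/n) / (n - 1); the m1 + m2 - m3 expression in the script reduces to exactly this. The following sketch checks the identity on a small synthetic matrix split into four row chunks, with lapply standing in for mclapply:

```r
set.seed(7)
X <- matrix(rnorm(3000), ncol = 6)   # 500 rows x 6 columns

# Partition the rows into 4 chunks, as the split files would be
idx <- split(seq_len(nrow(X)), rep(1:4, length.out = nrow(X)))
chunks <- lapply(idx, function(i) X[i, , drop = FALSE])

# Map: per-chunk statistics (the same quantities compCov returns)
stats <- lapply(chunks, function(m)
  list(mat = crossprod(m), colsum = colSums(m), nrow = nrow(m)))

# Reduce: sum each statistic across chunks
xy  <- Reduce(`+`, lapply(stats, `[[`, "mat"))
csf <- Reduce(`+`, lapply(stats, `[[`, "colsum"))
n   <- Reduce(`+`, lapply(stats, `[[`, "nrow"))

# cov = (X'X - s s' / n) / (n - 1); tcrossprod(csf) is the outer product s s'
covmat <- (xy - tcrossprod(csf) / n) / (n - 1)
```

Because each chunk contributes only a 200 x 200 matrix, a 200-vector and a count in the real experiment, the reduce step is negligible compared with reading the data.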
We observe that the elapsed time (excluding the time to split the files) has now come down to 77 seconds. However, splitting the input file into smaller files took 54 seconds, a significant portion of the total elapsed time of 77 + 54 = 131 seconds.
Besides impacting performance, having to deal with data management manually causes a number of more serious problems. We list a few of them here:
1) In other scenarios, with larger files or a larger number of chunks, the placement of chunks also becomes a factor that influences I/O parallelism. Optimal placement of chunks of data over the available set of disks is a non-trivial problem.
2) Requirement of root access: optimal placement of file chunks on different disks often requires root access. For example, only root has permission to create files on the disks corresponding to the file systems mounted on /u03, /u04, etc. on an Oracle Big Data Appliance node.
3) When multiple nodes are involved in the computation, manually moving fragments of the original data to different nodes can drain productivity.
4) This form of split can only work in a static environment. In a real-world dynamic environment, a human cannot practically factor in information about other workloads and their resource utilization.
5) It requires an administrator to give the user access to all nodes of the cluster so that the user can move data to different nodes.
ORCH-based solution
In contrast, with ORCH we can directly use the out-of-the-box support for multivariate analysis. Further, no manual data management steps (like splitting files and addressing chunk placement issues) are required, since Hadoop (specifically HDFS) handles all those requirements seamlessly.
> x <- hdfs.attach("allnumeric_200col_10GB")
> system.time(res <- orch.cov(x))
   user  system elapsed
 18.179   3.991  85.640
Forty-two concurrent map tasks, as determined by Hadoop, were involved in the computation above.
To conclude, the ORCH-based approach has the following advantages in this scenario:
1) No manual steps. Data management is handled completely and transparently by HDFS.
2) Out-of-the-box support for cov. The distributed parallel algorithm is available out of the box, so the user does not have to work it out from scratch.
3) Using ORCH, we get performance comparable to that of the hand-coded solution (85.6 seconds elapsed, versus 77.8 seconds for the parallel script plus 54 seconds to split the input file) without any of the manual overhead.
Sampling
We use the same single input file, “allnumeric_200col_10GB”, in this case as well. The requirement is to obtain a uniform random sample from the input data set, with the size of the sample specified as a percentage of the input data set size.
Once again, for the solution using the parallel package, the input file has to be split into smaller files for better read parallelism.
library(parallel)

# Read split piece number 'id' into a numeric matrix
readInput <- function(id) {
  infile <- file.path("/home/oracle/anasrini/cov", paste("p", id, sep=""))
  print(infile)
  system.time(m <- matrix(scan(file=infile, what=0.0, sep=","),
                          ncol=200, byrow=TRUE))
  m
}

# Main MAPPER function: keep each row with probability percent/100
samplemap <- function(id, percent) {
  m <- readInput(id)      # read the input
  v <- runif(nrow(m))     # one uniform draw per row
  # Pick only those rows where random < percent*0.01
  keep <- which(v < percent*0.01)
  m1 <- m[keep, , drop=FALSE]
  m1
}

numfiles <- 24
numCores <- 24

# Map step
percent <- 0.001
system.time(mapres <- mclapply(seq_len(numfiles), samplemap, percent,
                               mc.cores=numCores))
    user   system  elapsed
1112.998   23.196   49.561
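The sampling mapper keeps each row independently with probability percent/100, so the combined sample size is random, with expected value percent/100 times the number of rows; with percent = 0.001 that is about 30 of the roughly 3 million rows. The per-chunk samples in mapres still have to be stacked into one matrix. Here is a small sketch on synthetic in-memory chunks, where sample_chunk is a hypothetical variant of samplemap without the file read. For reproducible parallel random streams you could additionally call RNGkind("L'Ecuyer-CMRG") and set.seed() before mclapply.

```r
library(parallel)

set.seed(123)
# Hypothetical stand-ins for the split files: 4 chunks of 25000 x 5
chunks <- lapply(1:4, function(i) matrix(runif(25000 * 5), ncol = 5))

# Same per-row Bernoulli selection as samplemap, minus the file read
sample_chunk <- function(m, percent) {
  keep <- runif(nrow(m)) < percent * 0.01
  m[keep, , drop = FALSE]
}

percent <- 1   # keep about 1% of the rows
cores <- if (.Platform$OS.type == "windows") 1L else 2L
pieces <- mclapply(chunks, sample_chunk, percent, mc.cores = cores)

# Reduce step: stack the per-chunk samples into one matrix
smp <- do.call(rbind, pieces)
expected <- percent * 0.01 * sum(vapply(chunks, nrow, integer(1)))
```

Bernoulli selection needs no knowledge of the total row count, which is what lets every chunk be sampled independently and in parallel.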
ORCH-based solution
> x <- hdfs.attach("allnumeric_200col_10GB_single")
> system.time(res <- orch.sample(x, percent=0.001))
   user  system elapsed
  8.173   0.704  33.590
The ORCH-based solution outperforms the solution based on the parallel package (33.6 versus 49.6 seconds elapsed, the latter not counting the time needed to split the input file). This is because orch.sample is implemented in Java, and the read rates obtained by a Java implementation are superior to what can be achieved in R.
Partitioned Linear Model Fitting
Partitioned linear model fitting is a very popular use case. The requirement here is to fit a separate linear model for each partition of the data, where the data is partitioned on a user-specified partitioning key.
For example, using the ONTIME data set, the user could specify destination city as the partitioning key, indicating that separate linear models (with, for example, ArrDelay as the target) are required, one per destination city.
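ORCH-based solution
# Not shown in this excerpt: 'input' (the ONTIME data set in HDFS) and
# 'mapOut' (metadata describing the mapper's output).
dfs_res <- hadoop.run(
  data = input,
  # Mapper: emit each row keyed by its destination city
  mapper = function(k, v) {
    orch.keyvals(v$Dest, v)
  },
  # Reducer: receives all rows for one destination and fits the model
  reducer = function(k, v) {
    lm_x <- lm(ArrDelay ~ DepDelay + Distance, v)
    orch.keyval(k, orch.pack(model=lm_x, count=nrow(v)))
  },
  config = new("mapred.config",
    job.name = "ORCH Partitioned lm by Destination City",
    map.output = mapOut,
    mapred.pristine = TRUE,
    reduce.output = data.frame(key="", model="packed")
  )
)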
Notice that the MapReduce framework performs the partitioning. The mapper just emits each row under its partitioning key, and the MapReduce framework handles the rest. The linear model for each partition is then fitted in the reducer.
parallel package-based solution
As in the previous use cases, the single input file needs to be split into smaller files for good read parallelism. Unlike the previous use cases, however, there is a twist here.
We noted that with the ORCH-based solution, it is the MapReduce framework that does the actual partitioning. A solution based on the parallel package has no such out-of-the-box feature. There are two options:
1) Break up the file arbitrarily into smaller pieces for better read parallelism, implement your own partitioning logic mimicking what the MapReduce framework provides, and then fit linear models on each of these partitions in parallel.
OR
2) Break the file into smaller pieces such that each piece is a separate partition, and then fit linear models on each of these partitions in parallel.
Neither option is easy; both require a lot of user effort. The custom coding required to achieve parallel reads is significant.
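To give a sense of the custom code involved, here is a minimal in-memory sketch of option 1: rows are grouped by key within each arbitrary chunk (map), rows with the same key are gathered across chunks (shuffle), and one lm is fitted per key in parallel (reduce). The flights data frame is synthetic, with only the column names borrowed from the ORCH example. A real solution would also have to read the chunk files in parallel and move data between nodes, which this sketch leaves out.

```r
library(parallel)

set.seed(99)
# Hypothetical ONTIME-like data with the columns used in the ORCH example
n <- 3000
flights <- data.frame(
  Dest     = sample(c("BOS", "ORD", "SFO"), n, replace = TRUE),
  DepDelay = rnorm(n, 10, 5),
  Distance = runif(n, 200, 2500)
)
flights$ArrDelay <- 0.9 * flights$DepDelay + 0.001 * flights$Distance + rnorm(n)

# Stand-in for arbitrarily split input files: 4 row chunks
chunks <- split(flights, rep(1:4, length.out = n))

# "Map": within each chunk, group rows by the partitioning key
by_key <- lapply(chunks, function(d) split(d, d$Dest))

# "Shuffle": gather the rows for each key from every chunk
keys <- sort(unique(unlist(lapply(by_key, names))))
partitions <- lapply(setNames(keys, keys), function(k)
  do.call(rbind, lapply(by_key, function(p) p[[k]])))

# "Reduce": fit one model per partition in parallel
cores <- if (.Platform$OS.type == "windows") 1L else 2L
models <- mclapply(partitions, function(d)
  list(model = lm(ArrDelay ~ DepDelay + Distance, data = d), count = nrow(d)),
  mc.cores = cores)
```

Even at this toy scale, the shuffle step is hand-written bookkeeping that the MapReduce framework performs implicitly in the ORCH version.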
Conclusion
ORCH provides a holistic approach to Big Data analytics in the R environment. By leveraging the Hadoop infrastructure, ORCH inherits several key components that are all required to address real-world analytics requirements.
The rich set of out-of-the-box predictive analytic techniques, together with the possibility of authoring custom parallel distributed analytics using the framework (as demonstrated in the partitioned linear model fitting case), simplifies the user’s task while meeting performance and scalability requirements.
Appendix – Data Generation
We show the steps required to generate the single input file “allnumeric_200col_10GB”.
Run the following in R:
x <- orch.datagen(datasize=10*1024*1024*1024, numeric.col.count=200,
                  map.degree=40)
hdfs.mv(x, "allnumeric_200col_10GB")
Then, from the Linux shell:
hdfs dfs -rm -r -skipTrash /user/oracle/allnumeric_200col_10GB/__ORCHMETA__
hdfs dfs -getmerge /user/oracle/allnumeric_200col_10GB /tmp/allnumeric_200col_10GB
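If you do not have ORCH available and only want to try the parallel-package side of the experiments at a smaller scale, a plain-R stand-in for the data generation step might look like the sketch below. make_numeric_csv is a hypothetical helper, and since we don't know which value distribution orch.datagen uses, uniform random numbers are an assumption. It writes the layout the scripts above expect: comma-separated numbers, no header, one matrix row per line.

```r
# Hypothetical small-scale stand-in for orch.datagen(): writes an
# n_rows x n_cols matrix of uniform numbers as a header-less CSV
make_numeric_csv <- function(path, n_rows, n_cols = 200) {
  m <- matrix(runif(n_rows * n_cols), ncol = n_cols)
  write.table(m, path, sep = ",", row.names = FALSE, col.names = FALSE)
  invisible(path)
}

set.seed(2024)
f <- make_numeric_csv(tempfile(fileext = ".csv"), n_rows = 1000)

# Read it back the same way the covariance example reads its input
m <- matrix(scan(f, what = 0.0, sep = ",", quiet = TRUE),
            ncol = 200, byrow = TRUE)
```

Scaling n_rows up (and splitting the result as described in the covariance section) lets you reproduce the shape of the experiment on a single machine, though not the cluster timings.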