Big Data Analytics in R – the tORCH has been lit!

May 22, 2013
By

(This article was first published on Oracle R Enterprise, and kindly contributed to R-bloggers)

This guest post from Anand Srinivasan compares performance of the Oracle R Connector for Hadoop with the R {parallel} package for covariance matrix computation, sampling, and parallel linear model fitting.

Oracle R Connector for Hadoop (ORCH) is a collection of R packages that enables Big Data analytics from the R environment. It enables a Data Scientist /Analyst to work on data straddling multiple data platforms (HDFS, Hive, Oracle Database, local files) from the comfort of the R environment and benefit from the R ecosystem.

ORCH provides:

1) Out of the box predictive analytic techniques for linear regression, neural networks for prediction, matrix completion using low rank matrix factorization, non-negative matrix factorization, kmeans clustering, principal components analysis and multivariate analysis. While all these techniques have R interfaces, they are implemented either in Java or in R as distributed parallel implementations leveraging all nodes of your Hadoop cluster

2) A general framework, where a user can use the R language to write custom logic executable in a distributed parallel manner using available compute and storage resources.

The main idea behind the ORCH architecture and its approach to Big Data analytics is to leverage the Hadoop infrastructure and thereby inherit all its advantages.

The crux of ORCH is read parallelization and robust methods over parallelized data. Efficient parallelization of reads is the single most important step necessary for Big Data Analytics because it is either expensive or impractical to load all available data in a single thread.

ORCH is often compared/contrasted with the other options available in R, in particular the popular open source R package called parallel. The parallel package provides a low-level infrastructure for “coarse-grained” distributed and parallel computation. While it is fairly general, it tends to encourage an approach that is based on using the aggregate RAM in the cluster as opposed to using the file system. Specifically, it lacks a data management component, a task management component and an administrative interface for monitoring. Programming, however, follows the broad Map Reduce paradigm.

In the rest of this article, we assume that the reader has basic familiarity with the parallel package and proceed to compare ORCH and its approach with the parallel package. The goal of this comparison is to explain what it takes for a user to build a solution for their requirement using each of these technologies and also to understand the performance characteristics of these solutions.

We do this comparison using three concrete use cases – covariance matrix computation, sampling and partitioned linear model fitting. The exercise is designed to be repeatable, so you, the reader, can try this “at home”. We will demonstrate that ORCH is functionally and performance-wise superior to the available alternative of using R’s parallel package.

A six node Oracle Big Data Appliance v2.1.1 cluster is used in the experiments. Each node in this test environment has 48GB RAM and 24 CPU cores.

Covariance Matrix Computation

Computing covariance matrices is one of the most fundamental of statistical techniques.

In this use case, we have a single input file, “allnumeric_200col_10GB” (see appendix on how to generate this data set), that is about 10GB in size and has a data matrix with about 3 million rows and 200 columns. The requirement is to compute the covariance matrix of this input matrix.

Since a single node in the test environment has 48GB RAM and the input file is only 10GB, we start with the approach of loading the entire file into memory and then computing the covariance matrix using R’s cov function.

> system.time(m <- matrix(scan(file="/tmp/allnumeric_200col_10GB",what=0.0, sep=","), ncol=200, byrow=TRUE))

user system elapsed

683.159 17.023 712.527

> system.time(res <- cov(m))

user system elapsed

561.627 0.009 563.044

We observe that the loading of data takes 712 seconds (vs. 563 seconds for the actual covariane computation) and dominates the cost. It would be even more pronounced (relative to the total elapsed time) if the cov(m) computation were parallelized using mclapply from the parallel package.

Based on this, we see that for an efficient parallel solution, the main requirement is to parallelize the data loading. This requires that the single input file be split into multiple smaller-sized files. The parallel package does not offer any data management facilities; hence this step has to be performed manually using a Linux command like split. Since there are 24 CPU cores, we split the input file into 24 smaller files.

time(split -l 127334 /tmp/allnumeric_200col_10GB)

real 0m54.343s

user 0m3.598s

sys 0m24.233s

Now, we can run the R script:

library(parallel)

infile <- file.path("/home/oracle/anasrini/cov",paste("p",id,sep=""))

print(infile)

m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)

m

}

# Main MAPPER function

compCov <- function(id) {

cs <- colSums(m)    # compute col sums, num rows

# compute main cov portion

nr <- nrow(m)

mtm <- crossprod(m)

list(mat=mtm, colsum=cs, nrow=nr)

}

numfiles <- 24

numCores <- 24

# Map step

system.time(mapres <- mclapply(seq_len(numfiles), compCov, mc.cores=numCores))

# Reduce step

system.time(xy <- Reduce("+", lapply(mapres, function(x) x$mat))) system.time(csf <- Reduce("+", lapply(mapres, function(x) x$colsum)))

system.time(nrf <- Reduce("+", lapply(mapres, function(x) x$nrow))) sts <- csf %*% t(csf) m1 <- xy / (nrf -1) m2 <- sts / (nrf * (nrf-1)) m3 <- 2 * sts / (nrf * (nrf-1)) covmat <- m1 + m2 - m3 user system elapsed 1661.196 21.209 77.781 We observe that the elapsed time (excluding time to split the files) has now come down to 77 seconds. However, it took 54 seconds for splitting the input file into smaller files, making it a significant portion of the total elapsed time of 77+54 = 131 seconds. Besides impacting performance, there are a number of more serious problems with having to deal with data management manually. We list a few of them here: 1) In other scenarios, with larger files or larger number of chunks, placement of chunks also becomes a factor that influences I/O parallelism. Optimal placement of chunks of data over the available set of disks is a non-trivial problem 2) Requirement of root access – Optimal placement of file chunks on different disks often requires root access. For example, only root has permissions to create files on disks corresponding to the File Systems mounted on /u03, /u04 etc on an Oracle Big Data Appliance node 3) When multiple nodes are involved in the computation, moving fragments of the original data into different nodes manually can drain productivity 4) This form of split can only work in a static environment – in a real-world dynamic environment, information about other workloads and their resource utilization cannot be factored in a practical manner by a human 5) Requires admin to provide user access to all nodes of the cluster in order to allow the user to move data to different nodes ORCH-based solution On the other hand, using ORCH, we can directly use the out of the box support for multivariate analysis. Further, no manual steps related to data management (like splitting files and addressing chunk placement issues) are required since Hadoop (specifically HDFS) handles all those requirements seamlessly. >x <- hdfs.attach("allnumeric_200col_10GB") > system.time(res <- orch.cov(x)) user system elapsed 18.179 3.991 85.640 Forty-two concurrent map tasks were involved in the computation above as determined by Hadoop. To conclude, we can see the following advantages of the ORCH based approach in this scenario : 1) No manual steps. Data Management completely handled transparently by HDFS 2) Out of the box support for cov. The distributed parallel algorithm is available out of the box and the user does not have to work it out from scratch 3) Using ORCH we get comparable performance to that obtained through manual coding without any of the manual overheads Sampling We use the same single input file, “allnumeric_200col_10GB” in this case as well. The requirement is to obtain a uniform random sample from the input data set. The size of the sample required is specified as a percentage of the input data set size. Once again for the solution using the parallel package, the input file has to be split into smaller sized files for better read parallelism. library(parallel) # Read the data readInput <- function(id) { infile <- file.path("/home/oracle/anasrini/cov", paste("p",id,sep="")) print(infile) system.time(m <- matrix(scan(file=infile, what=0.0, sep=","), ncol=200, byrow=TRUE)) m } # Main MAPPER function samplemap <- function(id, percent) { m <- readInput(id) # read the input v <- runif(nrow(m)) # Generate runif # Pick only those rows where random < percent*0.01 keep <- which(v < percent*0.01) m1 <- m[keep,,drop=FALSE] m1 } numfiles <- 24 numCores <- 24 # Map step percent <- 0.001 system.time(mapres <- mclapply(seq_len(numfiles), samplemap, percent, mc.cores=numCores)) user system elapsed 1112.998 23.196 49.561 ORCH based solution >x <- hdfs.attach("allnumeric_200col_10GB_single") >system.time(res <- orch.sample(x, percent=0.001)) user system elapsed 8.173 0.704 33.590 The ORCH based solution out-performs the solution based on the parallel package. This is because orch.sample is implemented in Java and the read rates obtained by a Java implementation are superior to what can be achieved in R. Partitioned Linear Model Fitting Partitioned Linear Model Fitting is a very popular use case. The requirement here is to fit separate linear models, one for each partition of the data. The data itself is partitioned based on a user-specified partitioning key. For example, using the ONTIME data set, the user could specify destination city as the partitioning key indicating the requirement for separate linear models (with, for example, ArrDelay as target), 1 per destination city. ORCH based solution dfs_res <- hadoop.run( data = input, mapper = function(k, v) { orch.keyvals(v$Dest, v) },

reducer = function(k, v) {

lm_x <- lm(ArrDelay ~ DepDelay + Distance, v)

orch.keyval(k, orch.pack(model=lm_x, count = nrow(v)))

},

config = new("mapred.config",

job.name = "ORCH Partitioned lm by Destination City",

map.output = mapOut,

mapred.pristine = TRUE,

reduce.output = data.frame(key="", model="packed"),

)

)

Notice that the Map Reduce framework is performing the partitioning. The mapper just picks out the partitioning key and the Map Reduce framework handles the rest. The linear model for each partition is then fitted in the reducer.

parallel based solution

As in the previous use cases, for good read parallelism, the single input file needs to be split into smaller files. However, unlike the previous use cases, there is a twist here.

We noted that with the ORCH based solution it is the Map Reduce framework that does the actual partitioning. There is no such out of the box feature available with a parallel package-based solution. There are two options:

1) Break up the file arbitrarily into smaller pieces for better read parallelism. Implement your own partitioning logic mimicking what the Map Reduce framework provides. Then fit linear models on each of these partitions in parallel.

OR

2) Break the file into smaller pieces such that each piece is a separate partition. Fit linear models on each of these partitions in parallel

Both of these options are not easy and require a lot of user effort. The custom coding required for achieving parallel reads is significant.

Conclusion

ORCH provides a holistic approach to Big Data Analytics in the R environment. By leveraging the Hadoop infrastructure, ORCH inherits several key components that are all required to address real world analytics requirements.

The rich set of out-of-the-box predictive analytic techniques along with the possibility of authoring custom parallel distributed analytics using the framework (as demonstrated in the partitioned linear model fitting case) helps simplify the user’s task while meeting the performance and scalability requirements.

Appendix – Data Generation

We show the steps required to generate the single input file “allnumeric_200col_10GB”.

Run the following in R:

x <- orch.datagen(datasize=10*1024*1024*1024, numeric.col.count=200,

map.degree=40)

hdfs.mv(x, "allnumeric_200col_10GB")

Then, from the Linux shell:

hdfs dfs –rm –r –skipTrash /user/oracle/allnumeric_200col_10GB/__ORCHMETA__

hdfs dfs –getmerge /user/oracle/allnumeric_200col_10GB /tmp/allnumeric_200col_10GB