Data Parallelism Using Oracle R Enterprise

[This article was first published on Oracle R Enterprise, and kindly contributed to R-bloggers.]

Modern computer processors are adequately optimized for many statistical calculations, but large data operations may require hours or days to return a result.  Oracle R Enterprise (ORE), a set of R packages designed to process large data computations in Oracle Database, can run many R operations in parallel, significantly reducing processing time. ORE supports parallelism through the transparency layer, where the database is used as a computational engine, and embedded R execution, where R scripts can be executed in a data parallel manner.

The backbone of parallel computing is breaking down a resource intensive computation into chunks that can be performed independently, while maintaining a framework that allows for the results of those independent computations to be combined.  Writing parallel code is typically trickier than writing serial code, but this is simplified using ORE, as there is no need for the user to create worker instances or combine results. Using the transparency layer, users simply execute their ORE code and the database implicitly manages the entire process, returning results for further processing.
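For contrast, here is a minimal sketch of the manual bookkeeping described above, written with base R's parallel package rather than ORE. The workload, the worker count and the helper name chunk_sum are illustrative assumptions and are not part of ORE.

```r
library(parallel)

# Hypothetical workload: sum of squares over a numeric vector.
x <- as.numeric(1:100000)

# 1. Break the computation into independent chunks.
n_workers <- 2
chunks <- split(x, cut(seq_along(x), n_workers, labels = FALSE))

# 2. Create worker instances by hand.
cl <- makeCluster(n_workers)

# 3. Run the same function on each chunk in a separate R process.
chunk_sum <- function(v) sum(v^2)
partial <- parLapply(cl, chunks, chunk_sum)

# 4. Release the workers and combine the partial results.
stopCluster(cl)
total <- Reduce(`+`, partial)

# The combined result matches the serial computation.
stopifnot(isTRUE(all.equal(total, sum(x^2))))
```

Steps 2 and 4 are the parts that the transparency layer handles implicitly, so ORE user code contains only the equivalent of the computation itself.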

With ORE, each R function invocation that operates on an ORE object, such as ore.frame, is translated to a SQL statement behind the scenes. This SQL, which may be stacked after several function invocations, undergoes optimization and parallelization when parsed and executed. This technique enables deferred evaluation, but that’s a topic for another blog. Depending on the resource requirements of the statement, the database decides if it should leverage parallel execution. 

For embedded R execution, database degree of parallelism settings help determine the number of parallel R engines to start. When data parallel functions execute in parallel, each unit of work is sent to a different R external process, or extproc, at the database server. The results are automatically collated and returned as R-proxy objects, e.g., ore.frame objects, in the R interface and SQL objects in the SQL interface, which can be processed further in R or by SQL functions. The SQL functions enable the operationalization or productization of R scripts as part of a database-based application, in what we refer to as “lights out” mode.

In the ORE Transparency Layer, where the database executes SQL generated from overloaded R functions, parallelism is automatic, assuming the database or table is configured for parallelism. Parallel computations in the transparency layer are ideal for bigger data where functionality exists in the database.

Using Embedded R Script Execution, parallelism is enabled for row, group and index operations if specified using a function parameter or parallel cursor hint:

  • ore.groupApply and rqGroupEval* split the data into grouped partitions and invoke the R function on each partition in a separate engine at the database server.
  • ore.rowApply and rqRowEval split the data into row chunks and invoke the R function on each chunk in a separate engine at the database server.
  • ore.indexApply runs an R function x times, with each iteration of the function invoked in a separate engine at the database server.
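The three partitioning patterns in this list can be illustrated locally with base R. This is only a serial analogy on the built-in mtcars data. It does not call ORE, and the chunk size of 10 rows and the squaring function are arbitrary choices for illustration.

```r
# Built-in data set used purely for illustration.
dat <- mtcars

# Group partitions (the ore.groupApply pattern): one piece per value of cyl.
by_group <- split(dat, dat$cyl)
group_means <- sapply(by_group, function(d) mean(d$mpg))

# Row chunks (the ore.rowApply pattern): pieces of at most 10 rows each.
chunk_id <- ceiling(seq_len(nrow(dat)) / 10)
by_rows <- split(dat, chunk_id)
chunk_sizes <- sapply(by_rows, nrow)

# Index runs (the ore.indexApply pattern): call a function a fixed number
# of times, passing only the iteration number.
index_results <- sapply(1:4, function(i) i^2)

print(group_means)
print(chunk_sizes)
print(index_results)
```

In the embedded R setting, each element of by_group or by_rows, or each iteration index, would correspond to one unit of work handed to an R engine at the database server.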

With embedded R execution, the expectation is that the database server machine has greater RAM and CPU capacity than the user’s client machine. Executing R scripts at the server will therefore allow larger data sets to be processed by an individual R engine.

In addition, users can include contributed R packages in their embedded R scripts. Consider an example using a sample of the airline on-time performance data from the Research and Innovative Technology Administration (RITA), which coordinates the U.S. Department of Transportation (DOT) research programs. The data sample consists of 220K records of U.S. domestic commercial flights between 1987 and 2008.

We use the R interface to embedded R to partition the airline data table (ONTIME_S) by the DAYOFWEEK variable, fit a linear model using the biglm package, and then combine the results. Note: To run this example, the biglm package must be installed on both the database server and client machine.

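res <- ore.groupApply(ONTIME_S,
               INDEX = ONTIME_S$DAYOFWEEK,   # one partition per day of the week
               parallel = TRUE,              # allow parallel R engines at the server
               function(dat) {
                 # Runs once per partition in a server-side R engine
                 library(biglm)
                 library(ORE)
                 ore.connect("rquser", "orcl", "localhost", "rquser")
                 biglm(ARRDELAY ~ DEPDELAY + DISTANCE, dat)
               })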

R> summary(res$Monday)
Large data regression model: biglm(ARRDELAY ~ DEPDELAY + DISTANCE, dat)
Sample size =  31649
               Coef    (95%     CI)     SE     p
(Intercept)  0.5177  0.2295  0.8058 0.1441 3e-04
DEPDELAY     0.9242  0.9178  0.9305 0.0032 0e+00
DISTANCE    -0.0014 -0.0017 -0.0011 0.0002 0e+00

The call to ore.groupApply uses Oracle Database to partition the ONTIME_S table by the categories in the DAYOFWEEK variable. Each category is sent to an R engine at the database server machine to apply the R function in parallel. The individual category results are combined in the returned result. Using embedded R alleviates the typical memory problems associated with running R serially because each R engine holds only a single partition, or day of the week, in memory. On a Linux server with 8 GB RAM and 4 CPUs, fitting the model in parallel by setting parallel = TRUE in the call to ore.groupApply reduces the processing time from approximately 30 seconds to 10 seconds.
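The partition, fit and combine flow can be tried locally without a database. The sketch below is a serial analogy that substitutes lm() for biglm and the built-in mtcars data for ONTIME_S, so the variables and the model formula are assumptions chosen only to make it self-contained.

```r
# Partition the data by a grouping column, then fit one model per partition.
fits <- lapply(split(mtcars, mtcars$cyl),
               function(dat) lm(mpg ~ wt + hp, data = dat))

# The result is a named list with one fitted model per group.
print(names(fits))

# Combine the per-group coefficients into a single table for comparison.
coef_table <- t(sapply(fits, coef))
print(coef_table)
```

Because each call to the fitting function sees only its own partition, no single R process needs to hold the whole table, which is the property the embedded R example relies on.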

If the goal is to integrate the model results as an operationalized process, we can use rqGroupEval, the SQL interface equivalent to ore.groupApply.  We create a script to set up the structure of the input and grouping column and then run the script in SQL. The nature of pipelined table functions requires that we explicitly represent the type of the result, captured in the package, and create a function that includes the column used for partitioning explicitly.

# setup  

SQL> CREATE PACKAGE airlinePkg AS
  TYPE cur IS REF CURSOR RETURN ONTIME_S%ROWTYPE;
END airlinePkg;
/

Package created.

SQL> CREATE FUNCTION ontimeGroupEval(
  inp_cur  airlinePkg.cur,
  par_cur  SYS_REFCURSOR, 
  out_qry  VARCHAR2,
  grp_col  VARCHAR2,
  exp_txt  CLOB)
RETURN SYS.AnyDataSet
PIPELINED PARALLEL_ENABLE (PARTITION inp_cur BY HASH (DAYOFWEEK))
CLUSTER inp_cur BY (DAYOFWEEK)
USING rqGroupEvalImpl;
/

# model build

alter table ONTIME_S parallel;

SQL> begin
  sys.rqScriptCreate('GroupingExample',
 'function(dat) {
      library(biglm)
      library(ORE)
      ore.connect("rquser", "orcl", "localhost", "rquser")
      result <- biglm(ARRDELAY ~ DISTANCE + DEPDELAY, dat)
      result}');
end;
/

create table ONTIME_LM as
select *
  from table(ontimeGroupEval(
         cursor(select /*+ parallel(ONTIME_S)*/ *
                  from ONTIME_S),
         NULL, NULL, 'DAYOFWEEK', 'GroupingExample'));

begin
  sys.rqScriptDrop('GroupingExample');
end;
/


We use a parallel hint on the cursor that is the input to our rqGroupEval function to enable Oracle Database to use parallel R engines. In this case, using the same Linux server, the processing time is reduced from approximately 25 seconds to 7 seconds as we used 7 parallel R engines (one for each day of the week) across a single server. Of course, a real-world scenario may utilize hundreds of parallel engines across many servers, returning results on large amounts of data in a short period of time.

Additional details on how parallel execution works in Oracle Database can be found here. We encourage you to download Oracle software for evaluation from the Oracle Technology Network. See these links for R-related software: Oracle R Distribution, Oracle R Enterprise, ROracle, Oracle R Connector for Hadoop. As always, we welcome comments and questions on the Oracle R Forum.

*To enable execution of an R script in the SQL interface, ORE provides variants of ore.doEval, ore.tableApply, ore.rowApply and ore.groupApply in SQL. These functions are rqEval, rqTableEval, rqRowEval and rqGroupEval. The ore.groupApply feature does not have a direct parallel in the SQL interface. We refer to rqGroupEval as a concept; however, there is specific code required to enable this feature. This is highlighted in the second example.
