In this post, we'll look at how easy it is to manipulate Hive tables using the Oracle R connectors for Hadoop (ORCH, now known as Oracle R Advanced Analytics for Hadoop, ORAAH). We will use the weblog data from Athens Datathon 2015, which we have already loaded into a Hive table named weblogs, as described in more detail in our previous post. Along the way, we will report on the map-reduce (MR) jobs triggered by the commands we issue in R, in order to give a more thorough picture of what is happening behind the scenes.
We first have to load the required library, and connect to Hive:
> library(ORCH)
Loading required package: OREbase
Attaching package: ‘OREbase’
The following objects are masked from ‘package:base’:
    cbind, data.frame, eval, interaction, order, paste, pmax, pmin, rbind, table
Loading required package: OREstats
Loading required package: MASS
Loading required package: ORCHcore
Loading required package: rJava
Oracle R Connector for Hadoop 2.5.0 (rev. 307)
Info: using native C base64 encoding implementation
Info: Hadoop distribution is Cloudera's CDH v5.4.0
Info: using auto-detected ORCH HAL v4.2
Info: HDFS workdir is set to "/user/oracle"
Warning: mapReduce checks are skipped due to "ORCH_MAPRED_CHECK"=FALSE
Warning: HDFS checks are skipped due to "ORCH_HDFS_CHECK"=FALSE
Info: Hadoop 2.6.0-cdh5.4.0 is up
Info: Sqoop 1.4.5-cdh5.4.0 is up
Info: OLH 3.4.0 is up
Info: loaded ORCH core Java library "orch-core-2.5.0-mr2.jar"
Loading required package: ORCHstats
> ore.connect(type="HIVE")
In contrast to what happens with an Oracle database connection, the command ore.ls() for a Hive connection initially gives no results; we first have to explicitly sync the specific Hive tables we are going to work with (in our case weblogs) using ore.sync(), and then call ore.attach() without arguments:
> ore.ls()
character(0)
> ore.sync(table="weblogs")
> ore.ls()
[1] "weblogs"
> ore.attach()
Let's suppose now that we want to create two new tables from weblogs: the first one will include only the actions of “legitimate” registered users, i.e. all actions with a non-empty value of registereduserid, excluding the anomalous user with registereduserid = "cdc75a725963530a598e71c70677926b9a5f1bc1" (see this post for details); the second table will contain only the actions of this phantom user, possibly in order to examine them in more detail further downstream in our data pipeline. Let us start with the first table:
> anom <- 'cdc75a725963530a598e71c70677926b9a5f1bc1'
> reg <- weblogs[weblogs$registereduserid != '' & weblogs$registereduserid != anom,] # triggers 2 MR jobs, gives no. of columns in RStudio viewer (13)
We can see that the command above looks indistinguishable from a similar command we would use for an ordinary R dataframe; behind the scenes, it triggers 2 MR jobs, and it also returns the number of columns of the respective ore.frame item. We can verify that no new MR job is triggered if we ask for the number of columns, but a new MR job is indeed triggered to return the number of rows (records):
> ncol(reg) # no new MR job triggered
[1] 13
> class(reg)
[1] "ore.frame"
attr(,"package")
[1] "OREbase"
> nrow(reg) # triggers MR job
[1] 418329
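For comparison, here is a minimal sketch of the same two row filters applied to an ordinary in-memory data frame, using base R only. The table weblogs_toy and the id "phantom" are hypothetical stand-ins invented for illustration; they are not part of the Datathon data, and no Hive connection or MR job is involved.

```r
# Hypothetical toy stand-in for the weblogs table (base R, in memory)
weblogs_toy <- data.frame(
  registereduserid = c("", "u1", "phantom", "u2", "", "phantom"),
  action           = c("csa", "view", "csa", "search", "view", "csa"),
  stringsAsFactors = FALSE
)
anom_toy <- "phantom"  # hypothetical id playing the role of the anomalous user

# Same filter shape as in the post: non-empty user id, anomalous user excluded
reg_toy <- weblogs_toy[weblogs_toy$registereduserid != "" &
                       weblogs_toy$registereduserid != anom_toy, ]

# Complementary filter: only the anomalous user's actions
phantom_toy <- weblogs_toy[weblogs_toy$registereduserid == anom_toy, ]

nrow(reg_toy)      # 2 rows: u1 and u2
nrow(phantom_toy)  # 2 rows: both "phantom" records
```

The indexing expression has the same form as the one issued against the ore.frame above; the difference is only in where the work happens.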
We can now proceed to create a new Hive table, weblogs_registered, containing only the actions of the “legitimate” registered users, and verify its creation as follows:
> ore.create(reg, table='weblogs_registered') # triggers 2 MR jobs
> ore.exists("weblogs_registered")
[1] TRUE
We can repeat the above procedure, this time to create our second table, weblogs_phantom, containing the actions of our anomalous user:
> phantom <- weblogs[weblogs$registereduserid == anom,] # triggers 2 MR jobs
> nrow(phantom) # triggers MR job
[1] 147638
> ore.create(phantom, table='weblogs_phantom') # triggers 2 MR jobs
Notice that the tables we create during a session are automatically added to our workspace, without our having to invoke any additional command:
> ore.ls()
[1] "weblogs" "weblogs_phantom" "weblogs_registered"
Since we have created a Hive table with only ~ 150,000 rows which should be manageable in-memory, let’s pull it to a regular R dataframe for further detailed processing:
> phantom_df <- ore.pull(weblogs_phantom) # triggers MR job
Warning message:
ORE object has no unique key - using random order
> class(phantom_df)
[1] "data.frame"
> head(phantom_df)
  sttp                    usersessionid                         registereduserid                                     pathurl
1  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/287321.json
2  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/419492.json
3  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1    /comparelists/292/toggle_item/93301.json
4  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/344422.json
5  800 af1594b83274edbd633ce613464fffc9 cdc75a725963530a598e71c70677926b9a5f1bc1   /comparelists/292/toggle_item/344423.json
6  800 138552666a0d0216373941df880d8cce cdc75a725963530a598e71c70677926b9a5f1bc1 /comparelists/1384/toggle_item/3290612.json
  action            datetime query categoryid shopid productid refurl sku_id results
1    csa 2013-09-22 00:05:36
2    csa 2013-09-22 00:05:54
3    csa 2013-09-22 00:05:58
4    csa 2013-09-22 00:06:09
5    csa 2013-09-22 00:07:23
6    csa 2013-09-22 00:08:55
On second thought, we realize that we don't really need the registereduserid field in the weblogs_phantom table (it consists of just a single value); so, as a last demonstration, we will:
- Drop the existing Hive table
- Remove the registereduserid column from our local R dataframe
- Upload the updated R dataframe to Hive as a new weblogs_phantom table
Here are the necessary R commands:
> ore.drop(table="weblogs_phantom")
> ore.exists("weblogs_phantom")
[1] FALSE
> phantom_df$registereduserid <- NULL
> ncol(phantom_df)
[1] 12
> ore.create(phantom_df, table='weblogs_phantom') # no MR job triggered
> ore.ls()
[1] "weblogs" "weblogs_phantom" "weblogs_registered"
> ncol(weblogs_phantom) # no MR job triggered
[1] 12
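The local column-dropping step is plain base R and can be tried without any Hive connection. The sketch below uses a hypothetical three-row data frame (phantom_toy_df, with made-up paths) and adds a small guard, which is our own assumption rather than something the post does: the column is dropped only if it really holds a single distinct value.

```r
# Hypothetical stand-in for the pulled phantom_df (values invented for illustration)
phantom_toy_df <- data.frame(
  registereduserid = rep("phantom", 3),
  pathurl          = c("/a.json", "/b.json", "/c.json"),
  action           = "csa",
  stringsAsFactors = FALSE
)

# A column with a single distinct value carries no information, so drop it
if (length(unique(phantom_toy_df$registereduserid)) == 1) {
  phantom_toy_df$registereduserid <- NULL  # same idiom as in the post
}

names(phantom_toy_df)  # "pathurl" "action"
```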
As we can see, no more MR jobs are triggered in the whole process.
What if the number of records of our phantom user were so large that they could not fit in memory? In such a case we could not pull the data into a local R dataframe, as we have done above. Nevertheless, we can still perform this operation (i.e. selecting a subset of rows and columns) simply by using the overloaded subset method of Oracle R on ore.frames, in a manner practically indistinguishable from a similar operation on ordinary R dataframes:
> ore.drop(table="weblogs_phantom")
> cc <- colnames(weblogs)
> cc
 [1] "sttp" "usersessionid" "registereduserid" "pathurl" "action" "datetime"
 [7] "query" "categoryid" "shopid" "productid" "refurl" "sku_id"
[13] "results"
> phantom <- subset(weblogs, registereduserid == anom, select=cc[cc != "registereduserid"]) # triggers 2 MR jobs
> ncol(phantom) # no MR job triggered
[1] 12
> nrow(phantom) # triggers MR job
[1] 147638
> class(phantom)
[1] "ore.frame"
attr(,"package")
[1] "OREbase"
> ore.create(phantom, table = 'weblogs_phantom')
> ore.ls()
[1] "weblogs" "weblogs_phantom" "weblogs_registered"
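To make the comparison with ordinary dataframes concrete, here is a minimal base R sketch of the same subset() call, with a row condition plus a select vector built from the column names. The data frame weblogs_toy2 and its values are hypothetical; only the shape of the call mirrors the one above.

```r
# Hypothetical toy table with three of the weblogs columns (base R, in memory)
weblogs_toy2 <- data.frame(
  sttp             = 800,
  registereduserid = c("", "u1", "phantom", "phantom"),
  pathurl          = c("/a", "/b", "/c", "/d"),
  stringsAsFactors = FALSE
)
anom_toy <- "phantom"  # hypothetical id playing the role of the anomalous user

# Keep every column except registereduserid
cc_toy <- colnames(weblogs_toy2)

# Row condition and column selection in a single call
phantom_sub <- subset(weblogs_toy2,
                      registereduserid == anom_toy,
                      select = cc_toy[cc_toy != "registereduserid"])

dim(phantom_sub)       # 2 rows, 2 columns
colnames(phantom_sub)  # "sttp" "pathurl"
```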
We have demonstrated how to perform elementary ETL in Hive by creating, dropping, and altering tables, without a single statement of HiveQL, by utilizing the Oracle R connectors for Hadoop. We claim that the code snippets presented above do indeed have a ‘pure-R’ feel, and regular R users should find them very familiar and intuitive.
> ore.disconnect()
> sessionInfo()
Oracle Distribution of R version 3.1.1 (--)
Platform: x86_64-unknown-linux-gnu (64-bit)
locale:
 [1] LC_CTYPE=en_US.UTF-8 LC_NUMERIC=C LC_TIME=C LC_COLLATE=C LC_MONETARY=C
 [6] LC_MESSAGES=C LC_PAPER=C LC_NAME=C LC_ADDRESS=C LC_TELEPHONE=C
[11] LC_MEASUREMENT=C LC_IDENTIFICATION=C
attached base packages:
[1] stats graphics grDevices utils datasets methods base
other attached packages:
[1] ORCH_2.5.0 ORCHstats_2.5.0 ORCHcore_2.5.0 rJava_0.9-7 OREstats_1.4.1 MASS_7.3-33 OREbase_1.4.1
loaded via a namespace (and not attached):
[1] tools_3.1.1
The post Manipulating Hive tables with Oracle R connectors for Hadoop appeared first on Nodalpoint.