R in big data pipeline

August 15, 2015

(This article was first published on Opiate for the masses, and kindly contributed to R-bloggers)

R is my favorite tool for research. There are still quite a few things that only R can do, or that are quicker and easier to do with R.

Unfortunately, a lot of people think R becomes less powerful at the production stage, where you really need to make sure everything runs as planned against incoming big data.

Personally, I think what makes R special in the data field is its ability to make friends with many other tools. R can easily call on JavaScript for data visualization, node.js for interactive web apps, and data pipeline tools and databases for production-ready big data systems.

In this post I address how to use R reliably in combination with other tools in a big data pipeline without losing its awesomeness.

You’ll see how to include R in luigi, a lightweight Python workflow management library. That way you can keep R’s awesomeness in a complex pipeline while leaving the heavy big data tasks to other, more appropriate tools.

I’m not covering luigi basics in this post. Please refer to the luigi website if necessary.

Simple pipeline

Here is a very simple example:

  • HiveTask1: Wait for external hive data task (table named “externaljob” partitioned by timestamp)

  • RTask: Run awesome R code as soon as pre-aggregation finishes

  • HiveTask2: Upload the result back to Hive as soon as the above job finishes (table named “awesome” partitioned by timestamp)

and you want to run this job every day in an easily debuggable fashion, with a fancy workflow UI.

That’s super easy, just run

python awesome.py --HiveTask1-timestamp 2015-08-20

This runs the Python file awesome.py. The --HiveTask1-timestamp 2015-08-20 option sets 2015-08-20 as the timestamp argument of the HiveTask1 class.

Yay, all the above tasks are now connected in the luigi task UI!
Notice our workflow goes from bottom to top.
You can see there is an error in the very first HiveTask2 but this is just by design.



Let’s take a look at awesome.py:

import luigi
from luigi.file import LocalTarget
from luigi.hive import HiveTableTarget
from luigi.contrib.hdfs import HdfsTarget
import subprocess
import sys

# External job: luigi does not run it, it only waits for the HDFS partition to appear
class HiveTask1(luigi.ExternalTask):
    timestamp = luigi.DateParameter(is_global=True)
    def output(self):
        return HdfsTarget('/user/storage/externaljob/timestamp=%s' % self.timestamp.strftime('%Y%m%d'))

# Runs the R script once the external partition exists
class RTask(luigi.Task):
    timestamp = HiveTask1.timestamp
    def requires(self):
        return HiveTask1()
    def run(self):
        subprocess.call('Rscript awesome.R %s' % self.timestamp.strftime('%Y%m%d'),shell=True)
    def output(self):
        return LocalTarget('awesome_is_here_%s.txt' % self.timestamp.strftime('%Y%m%d'))

# Loads the R output back into Hive
class HiveTask2(luigi.Task):
    # needed here too, because run() and output() use self.timestamp
    timestamp = HiveTask1.timestamp
    def requires(self):
        return RTask()
    def run(self):
        subprocess.call('Rscript update2hive.R %s' % self.timestamp.strftime('%Y%m%d'),shell=True)
    def output(self):
        return HdfsTarget('/user/hive/warehouse/awesome/timestamp=%s' % self.timestamp.strftime('%Y%m%d'))

if __name__ == '__main__':
    # Assumed entry point: the listing breaks off here. The command line above
    # names no task, so the last task in the chain is taken as the main task.
    luigi.run(main_task_cls=HiveTask2)

Basically, the file contains only three classes (HiveTask1, RTask and HiveTask2), and their dependencies are specified by

def requires(self):
    return TASKNAME()

luigi checks the dependencies and outputs of each step, so it checks the existence of:

  • '/user/storage/externaljob/timestamp=%s' % self.timestamp.strftime('%Y%m%d')
  • 'awesome_is_here_%s.txt' % self.timestamp.strftime('%Y%m%d')
  • '/user/hive/warehouse/awesome/timestamp=%s' % self.timestamp.strftime('%Y%m%d')
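The idea behind this check can be illustrated without luigi at all. The following is a toy sketch, not luigi's implementation: the ToyTask class, the build helper and the file names are made up for illustration. A task counts as done when its output file exists, and a task runs only after everything it requires is done.

```python
import os
import tempfile


class ToyTask(object):
    """Hypothetical stand-in for a luigi task: done when its output file exists."""

    def __init__(self, name, workdir, requires=None):
        self.name = name
        self.path = os.path.join(workdir, name + '.txt')
        self.required = requires

    def complete(self):
        return os.path.exists(self.path)

    def run(self):
        # A real task would call Rscript or hive here; the toy just writes a marker file.
        with open(self.path, 'w') as handle:
            handle.write('done\n')


def build(task, log):
    """Run upstream tasks first, and skip any task whose output already exists."""
    if task.complete():
        return
    if task.required is not None:
        build(task.required, log)
    task.run()
    log.append(task.name)


workdir = tempfile.mkdtemp()
first = ToyTask('externaljob', workdir)
second = ToyTask('awesome_is_here', workdir, requires=first)
third = ToyTask('awesome', workdir, requires=second)

run_log = []
build(third, run_log)
print(run_log)    # ['externaljob', 'awesome_is_here', 'awesome']

rerun_log = []
build(third, rerun_log)
print(rerun_log)  # [] because every output already exists
```

One difference from the real pipeline: the toy runs its first task itself, whereas an ExternalTask such as HiveTask1 has no run step and luigi only waits for its output to show up.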

The most important thing here is using Python’s subprocess module with shell=True, so you can run your R file:

def run(self):
    subprocess.call('Rscript YOUR_R_FILE',shell=True)
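One caveat worth knowing: subprocess.call returns the exit status instead of raising, so a crashing R script does not by itself raise an exception inside run(). Below is a minimal sketch of a stricter variant using subprocess.check_call, which is a swapped-in alternative and not what awesome.py does. The helper name run_script is hypothetical, and the Python interpreter stands in for Rscript so that the example runs on a machine without R.

```python
import subprocess
import sys


def run_script(command):
    """Run a command given as a list of arguments; raise if it exits non-zero."""
    # Passing a list (with the default shell=False) avoids shell quoting issues.
    subprocess.check_call(command)


# In the pipeline this would be something like ['Rscript', 'awesome.R', '20150820'].
run_script([sys.executable, '-c', 'print("pretend this is R")'])

failed = False
exit_status = None
try:
    # Simulate a script that crashes with exit status 3.
    run_script([sys.executable, '-c', 'import sys; sys.exit(3)'])
except subprocess.CalledProcessError as error:
    failed = True
    exit_status = error.returncode

print(failed, exit_status)
```

With this variant a failing script surfaces as a Python exception, which is usually what you want a workflow tool to see.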

The timestamp argument you gave at the very beginning is stored as a global parameter named timestamp (well, this is not necessarily the coolest option)

timestamp = luigi.DateParameter(is_global=True)

and can be used in other tasks by

timestamp = HiveTask1.timestamp

Moreover, you can pass timestamp to R file by

'Rscript awesome.R %s' % self.timestamp.strftime('%Y%m%d')
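To see which string actually reaches R, the formatting can be tried in isolation. This sketch uses only the standard library; the variable names are illustrative, and the strptime line merely mimics how the YYYY-MM-DD value from the command line becomes a date object.

```python
import datetime

# The value given on the command line, in the YYYY-MM-DD form used earlier.
cli_value = '2015-08-20'
timestamp = datetime.datetime.strptime(cli_value, '%Y-%m-%d').date()

# The same expression as in RTask.run().
command = 'Rscript awesome.R %s' % timestamp.strftime('%Y%m%d')
print(command)  # Rscript awesome.R 20150820
```

So the R script receives 20150820, without dashes.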

Then let’s take a look at awesome.R

args <- commandArgs(TRUE)
X <- as.character(args[1])
# awesome.py passes the date as %Y%m%d (e.g. 20150820), so parse it in that format
timestamp <- format(as.Date(X,"%Y%m%d"),"%Y%m%d")



On the R side, you can receive the timestamp argument you passed from Python with

args <- commandArgs(TRUE)
X <- as.character(args[1])

Similarly, update2hive.R can look like

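library(infuser)  # provides infuse()
args <- commandArgs(TRUE)
X <- as.character(args[1])
timestamp <- X  # already in %Y%m%d form when called from awesome.py
# fill the timestamp placeholder in the HQL template
temp <- list.files(pattern='TEMPLATE.hql')
Q <- infuse(temp, timestamp=timestamp,verbose=T)
# write the rendered query to the file that hive executes below
fileConn <- file('FINALHQL.hql')
writeLines(Q, fileConn)
close(fileConn)
system('hive -f FINALHQL.hql')
#this file updates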

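One last thing you might like to do is to set up a cron job.

0 1 * * * python awesome.py --HiveTask1-timestamp `date --date='+1 days' +\%Y-\%m-\%d`

This one, for example, runs the whole thing at 1 a.m. every day. Note that inside a crontab a literal % has to be escaped as \%, otherwise cron treats it as a line break.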
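If you would rather compute the date in Python than in the shell, the backtick expression has a short standard-library equivalent. This is a sketch only; the helper name tomorrow_string is an assumption and not part of the original code.

```python
import datetime


def tomorrow_string(today=None):
    """Return tomorrow's date as YYYY-MM-DD, like date --date='+1 days' +%Y-%m-%d."""
    if today is None:
        today = datetime.date.today()
    return (today + datetime.timedelta(days=1)).strftime('%Y-%m-%d')


print(tomorrow_string(datetime.date(2015, 8, 19)))   # 2015-08-20
print(tomorrow_string(datetime.date(2015, 12, 31)))  # 2016-01-01
```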


In this post I’ve shown a simple example of how to quickly convert your research R project into a solid, deployable product.
This is not limited to simple R-Hive integration: you can let R, Spark, databases, Stan/BUGS, H2O, Vowpal Wabbit and millions of other data tools dance together as you wish, and you’ll see that R still plays a central role in the play.


The full code is available from here.

R in big data pipeline was originally published by Kirill Pomogajko at Opiate for the masses on August 16, 2015.
