Read from hdfs with R. Brief overview of SparkR.
Disclaimer: originally I planned to write a post about R functions/packages that allow reading data from hdfs (with benchmarks), but in the end it became more of an overview of SparkR capabilities.
Nowadays working with “big data” almost always means working with the Hadoop ecosystem. A few years ago this also meant that you would have to be a good Java programmer to work in such an environment – even a simple word-count program took several dozen lines of code. But 2-3 years ago things changed, thanks to Apache Spark with its concise (but powerful!) functional-style API. It is written in Scala, but also has Java, Python and, recently, R APIs.
Spark
I started to use Spark more than 2 years ago (and have used it a lot). In most cases I use Scala because:
- JVM native
- the only fully featured API – RDD-level operations, MLlib, GraphX, etc.
- nice REPL
- Scala is well suited for data munging – a good tradeoff between complexity and efficiency.
During this period I tried SparkR several times, but until version 1.6 it had too many rough edges. Starting from 1.6 it became a really useful tool for simple manipulations on Spark data frames. Unfortunately we still do not have R user-defined functions, so SparkR functionality is limited to built-in functions. A common pipeline for a data scientist can be the following:
- read data from hdfs
- do some data wrangling (join/filter/etc.)
- optionally take a subset/sample and collect the data to a local R session for exploratory analysis and model fitting.
Let's have a closer look at these steps.
Reading data from hdfs
Files in hdfs are usually stored in the following formats:
- plain txt/csv/json files
- sequence files. You can think of them as serialized Java objects. They have become less popular in recent years, and they are not portable (custom readers are needed), so I do not find them interesting for this post.
- avro (row-based)
- parquet (column-based)
- orc (column-based)
The good news is that Spark (and SparkR!) can read json, parquet and orc with the built-in read.df function, and csv and avro with read.df plus the spark-csv and spark-avro Spark packages.
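A minimal sketch of reading a few of these sources with SparkR 1.6 (the paths and the spark-csv version string are illustrative):

library(SparkR)

# spark-csv / spark-avro have to be pulled in as Spark packages
sc <- sparkR.init(sparkPackages = "com.databricks:spark-csv_2.10:1.3.0")
sqlContext <- sparkRSQL.init(sc)

# built-in sources
df_json    <- read.df(sqlContext, "/tmp/events.json",    source = "json")
df_parquet <- read.df(sqlContext, "/tmp/events.parquet", source = "parquet")

# csv via the spark-csv package
df_csv <- read.df(sqlContext, "/tmp/events.csv",
                  source = "com.databricks.spark.csv",
                  header = "true", inferSchema = "true")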
Data wrangling
SparkR allows you to perform dplyr-style manipulations on Spark data frames. See the official DataFrame and SparkR documents for details. I would also like to highlight that the package provides a quite comprehensive set of methods for manipulations on Spark data frames, including functions for:
- data frame operations: join, filter, group_by, sample
- date / time manipulations
- string manipulations, regular expressions
- general math / statistical functions like sin, cos, mean, etc.
See the full list of functions in the package documentation; a small example of this style is shown below.
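A sketch of such manipulations, assuming df is a Spark DataFrame obtained via read.df (the column names here are made up for illustration):

# keep only "ok" rows, then aggregate per user
ok  <- filter(df, df$status == "ok")
agg <- summarize(group_by(ok, ok$user_id),
                 n_events  = count(ok$event_id),
                 avg_value = mean(ok$value))
head(arrange(agg, desc(agg$n_events)))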
Collecting data to local R session
However, if you need to perform more complex manipulations or fit some model, you may need to collect the data to a local R session (and take a sample if the size is too big). And here you can be unpleasantly surprised – collecting even a small 50mb data frame can take minutes (see the example below). The current mechanism of serialization / deserialization between R and the JVM was designed primarily for exchanging meta-information (like function calls), not data. See this JIRA ticket for details. Hopefully this issue will be fixed in the next release.
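In the meantime it helps to sample on the Spark side first, so that less data crosses the R-JVM boundary. A minimal sketch, again assuming df is a Spark DataFrame (the fraction is arbitrary):

# sample ~10% of rows on the cluster and collect only that subset locally
local_df <- collect(sample(df, withReplacement = FALSE, fraction = 0.1))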
Examples and timings
First of all we need several things to be installed:
- hadoop. I have it installed at /opt/hadoop-2.6.0.
- Spark and SparkR – just download a prebuilt version and unpack it; /opt/spark-1.6.0-bin-hadoop2.6 in my case.
Setting up SparkR on YARN
At work I have a YARN cluster and a client machine with RStudio Server from which I usually work. To make SparkR work with RStudio Server you should set up several environment variables – SPARK_HOME, YARN_CONF_DIR, etc. You can follow the official manual, but doing this each time makes me sad. The simpler way is to add these variables to the ~/.Renviron or {R_HOME}/etc/Renviron.site (for system-wide options) files. Here are my configs:
SPARK_HOME=/opt/spark-1.6.0-bin-hadoop2.6
R_LIBS_SITE=${R_LIBS_SITE}:${SPARK_HOME}/R/lib
YARN_CONF_DIR=/opt/hadoop-2.6.0/etc/hadoop
JAVA_HOME=/usr/java/jdk170_64_45
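With these variables in place, a SparkR session on YARN can be started directly from RStudio Server. A minimal sketch (the resource settings are illustrative):

library(SparkR)

# connect to the YARN cluster in client mode; adjust resources to your cluster
sc <- sparkR.init(master = "yarn-client",
                  sparkEnvir = list(spark.executor.memory = "2g",
                                    spark.executor.instances = "4"))
sqlContext <- sparkRSQL.init(sc)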
Reading from hdfs to local R session
For benchmarks we will generate a small data frame with 1M rows:
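Something along these lines (the exact column layout is an assumption, not the original benchmark code):

library(data.table)

n  <- 1e6
dt <- data.table(id    = seq_len(n),
                 value = rnorm(n),
                 group = sample(letters, n, replace = TRUE))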
Now we will save it to disk and copy to hdfs:
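For example (paths are illustrative):

# write to local disk and push the file into hdfs
write.csv(dt, "/tmp/dt.csv", row.names = FALSE)
system("/opt/hadoop-2.6.0/bin/hadoop fs -put -f /tmp/dt.csv /tmp/dt.csv")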
Now let's try to read it with SparkR and collect it to the local R session:
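Roughly like this, assuming the SparkR session from above with spark-csv added via sparkPackages:

# read the csv back through the spark-csv source and collect it locally
df <- read.df(sqlContext, "/tmp/dt.csv",
              source = "com.databricks.spark.csv",
              header = "true", inferSchema = "true")

system.time(local_df <- collect(df))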
More than 2 minutes! So, at least until the next release, we should avoid using collect for any medium-to-large data frames.
Alternatives
data.table
Here my favourite package comes in – data.table and its fread function. I believe many data.table users don't know that fread input can be not only a file name, but also a unix pipe!
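So the file can be streamed straight out of hdfs and parsed on the fly; a sketch with my paths:

library(data.table)

# fread treats a string containing a shell command as a pipe to read from
system.time(
  dt <- fread("/opt/hadoop-2.6.0/bin/hadoop fs -text /tmp/dt.csv")
)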
This takes only 4 seconds! Another great thing is that the fs -text command can automatically choose the codec for decompressing files:
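So a compressed copy can be read with exactly the same call (the gzipped file name is illustrative):

# fs -text picks the codec from the file itself, no extra options needed
dt_gz <- fread("/opt/hadoop-2.6.0/bin/hadoop fs -text /tmp/dt.csv.gz")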
dataconnector
One drawback of data.table::fread is that it can parse only flat files. Spark data frames can consist of nested columns (like an R data frame with columns of type list). For such (usually rare) cases we can save the data frame in orc format and then read it with the dataconnector::orc2dataframe function.
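A sketch of that path – the URL scheme and any extra arguments of orc2dataframe depend on the dataconnector version and on how webhdfs is exposed on your cluster, so treat this as an assumption rather than a recipe:

library(dataconnector)

# hypothetical namenode host and webhdfs port; adjust to your cluster
df_nested <- orc2dataframe("hdfs://namenode:50070/tmp/df.orc")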
dataconnector is a new package developed by the HP Vertica Analytics Team (probably initially for working with DistributedR) and unfortunately not well known yet. But it is incredibly useful – it allows you to:
- read orc and csv files from the local file system or hdfs (hopefully we will eventually get parquet support as well);
- write arbitrary R objects directly to hdfs.
Another nice thing is that it doesn't require hadoop or Java/rJava!
Other options
- rhdfs and ravro packages by RevolutionAnalytics. I have never tried them, so I can't say anything.
- h2o::h2o.importFile, but it can be tricky to set up h2o in hdfs-client mode (a minimal sketch follows below).
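For reference, a minimal h2o sketch – it assumes the h2o instance has been started with access to the cluster's hdfs, which is exactly the tricky part, and the path is illustrative:

library(h2o)

h2o.init()
# the hdfs:// URL only resolves if h2o can see the cluster
hex <- h2o.importFile("hdfs://namenode/tmp/dt.csv")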
What tools do you use? Please share your experience in the comments.