Introduction to sparklyr


Introduction

The programming language R has very powerful tools and functions for almost everything we want to do, such as wrangling, visualizing, modeling, etc. However, like most classical languages, R requires the whole dataset to be loaded into memory before doing anything with it. This is a big disadvantage when we deal with a large dataset on a less powerful machine: even small data manipulations become time consuming, and in some cases the data can exceed the memory size so that R fails even to load it.

For this type of data there are two widely used engines, hadoop and spark. Both rely on a distributed system that partitions the data across different storage locations and distributes the computations among different machines (computing clusters), or among the different CPU cores of a single machine.

Spark is more recent (2010) and is recognized to be faster than hadoop. Scala is its native language, but it also supports SQL and java. If you know neither spark nor hadoop, the obvious choice is spark. And if you are an R user who does not want to spend time learning the spark languages (scala or sql), the good news is that the sparklyr package (or sparkR) provides an R interface to spark, from which you can keep using most of your R code and functions from packages such as dplyr, etc.

In this article we will go step by step through some examples to learn how to use sparklyr.

Installing sparklyr

As with any R package, we call the function install.packages to install sparklyr. Before that, make sure you have java installed on your system, since scala code runs on the java virtual machine.

#install.packages("sparklyr")
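
If you are not sure whether java is available, a quick way to check from R is to ask the system for its version; a minimal sketch (the exact output depends on your installation):

# print the installed java version (if any) to the console
system("java -version")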

Installing spark

We deliberately installed sparklyr before spark, because sparklyr provides the function spark_install(), which downloads, installs, and configures spark in one step.

#spark_install()
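
If you need a specific version rather than the default, sparklyr can list the versions it knows about and install one explicitly; a small sketch (the version number below is only an example):

# list the spark versions that sparklyr can install
spark_available_versions()
# install a specific version (example value)
# spark_install(version = "2.4.3")
# check which versions are already installed locally
spark_installed_versions()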

Connecting to spark

Usually, spark is used to create clusters of multiple machines, either physical machines or virtual machines (in the cloud). However, it can also create a local cluster on a single machine, making use of the available CPU cores to speed up the data processing.

Wherever the cluster is created (locally or in the cloud), the data processing functions work in the same way; the only difference is how we create and interact with the cluster. That means we can get started on a local cluster and learn the basics of data science with spark via sparklyr: importing, analyzing, and visualizing data, and fitting machine learning models.

To connect to spark in local mode we use the function spark_connect as follows.

library(sparklyr)
library(tidyverse)
sc<-spark_connect(master = "local")
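
By default the local connection uses modest driver memory. If your data needs more, you can pass a configuration object to spark_connect; a minimal sketch (the memory value is just an example, adjust it to your machine):

conf <- spark_config()
# give the local driver more memory (example value)
conf["sparklyr.shell.driver-memory"] <- "4G"
# sc <- spark_connect(master = "local", config = conf)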

Importing data

If the data is built into R, we load it into spark memory using the function copy_to.

mydata<-copy_to(sc,airquality)

Then R can access this data through sparklyr; for example, we can use the dplyr function glimpse.

glimpse(mydata)
## Rows: ??
## Columns: 6
## Database: spark_connection
## $ Ozone   <int> 41, 36, 12, 18, NA, 28, 23, 19, 8, NA, 7, 16, 11, 14, 18, 1...
## $ Solar_R <int> 190, 118, 149, 313, NA, NA, 299, 99, 19, 194, NA, 256, 290,...
## $ Wind    <dbl> 7.4, 8.0, 12.6, 11.5, 14.3, 14.9, 8.6, 13.8, 20.1, 8.6, 6.9...
## $ Temp    <int> 67, 72, 74, 62, 56, 66, 65, 59, 61, 69, 74, 69, 66, 68, 58,...
## $ Month   <int> 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5,...
## $ Day     <int> 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, ...
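
The copied data is registered as a table in spark under the name of the original data frame. We can list the tables available through the connection and create a new reference to any of them with dplyr; a small sketch:

# list the tables registered in the spark connection
src_tbls(sc)
# create another reference to the airquality table
air_tbl <- dplyr::tbl(sc, "airquality")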

If the data is stored outside R, in whatever format, sparklyr provides functions to import it. For example, to load a csv file we use the function spark_read_csv, and for json we use spark_read_json. The list of all the sparklyr functions and their usage is available in the package reference documentation.

For illustration, we will load the creditcard data stored on my machine as follows.

card<-spark_read_csv(sc,"creditcard.csv")
sdf_dim(card)
## [1] 284807     31
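
spark_read_csv also accepts arguments to control how the file is parsed and cached. A sketch of the same call with the main options spelled out (the values shown are the usual defaults):

card <- spark_read_csv(
  sc,
  name = "card",             # name of the table registered in spark
  path = "creditcard.csv",   # path to the file (adjust to your machine)
  header = TRUE,             # first row contains the column names
  infer_schema = TRUE,       # let spark guess the column types
  delimiter = ",",
  memory = TRUE              # cache the table in spark memory
)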

As you can see, we have loaded two datasets, mydata and card, using the same connection sc.

If we want to see what is going on in spark, we call the function spark_web(), which opens the spark web interface in the browser.

#spark_web(sc)

Manipulating data

With the help of sparklyr, we can very easily access the data in spark memory using the dplyr functions. Let's apply some manipulations to the card data: for instance, filtering the data using the variable Time, then computing the mean of Amount for each class label in the variable Class.

card %>%
  filter(Time <= mean(Time,na.rm = TRUE))%>%
      group_by(Class)%>%
  summarise(Class_avg=mean(Amount,na.rm=TRUE))
## # Source: spark<?> [?? x 2]
##   Class Class_avg
##   <int>     <dbl>
## 1     0      89.0
## 2     1     117.
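
Behind the scenes, these dplyr verbs are translated into Spark SQL and executed inside spark. If you are curious about the translation, you can inspect it with show_query; a quick sketch:

card %>%
  filter(Time <= mean(Time, na.rm = TRUE)) %>%
  group_by(Class) %>%
  summarise(Class_avg = mean(Amount, na.rm = TRUE)) %>%
  show_query()   # print the generated SQL instead of collecting results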

As you can see, the output is now a very small table, which can be moved from spark memory into R memory for further analysis with the function collect. In other words, if you feel more at ease in R, then whenever a spark output is small enough to be handled by R, add collect at the end of your pipeline to bring the result into R. For example, we cannot apply the function plot directly to the table above, so we first pull it into R and then call plot as follows.

card %>%
  filter(Time <= mean(Time,na.rm = TRUE))%>%
      group_by(Class)%>%
  summarise(Class_avg=mean(Amount,na.rm=TRUE))%>%
  collect()%>%
  plot(col="red",pch=19,main = "Class average vs Class")

However, we can also plot sparklyr outputs without moving them into R memory by using the dbplot package, most of whose functions are supported by sparklyr. Let's, for example, plot the mean of Amount by Class for the card transactions whose Time is less than the mean.

library(dbplot)
card %>%
  filter(Time <= mean(Time,na.rm = TRUE))%>%
        dbplot_bar(Class,mean(Amount))
## Warning: Missing values are always removed in SQL.
## Use `mean(x, na.rm = TRUE)` to silence this warning
## This warning is displayed only once per session.

As we can see, the mean Amount of fraudulent transactions is higher than that of regular ones.
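
dbplot computes the aggregations inside spark and only brings back the summarized values needed for the plot. Other dbplot helpers work the same way; for instance, a histogram of Amount (a sketch, the binwidth is an arbitrary example):

card %>%
  dbplot_histogram(Amount, binwidth = 50)   # bins computed in spark, plotted with ggplot2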

Disconnecting

Each time you finish your work, remember to disconnect from spark to free your resources, as follows.

#spark_disconnect(sc)

Saving data

Sparklyr provides functions to save data directly from spark memory to disk. For example, to save data as a csv file we use the spark function spark_write_csv (other formats are supported as well, for example spark_write_parquet, etc.) as follows.

#spark_write_csv(card,"card.csv")
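
The other writers follow the same pattern; for instance, saving the same table in the parquet format, which is usually more compact and faster to re-read (a sketch, the output folder name is arbitrary):

# write the card table as parquet files into the folder "card_parquet"
# spark_write_parquet(card, "card_parquet")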

Example of modeling in spark

For machine learning models, spark has its own library, MLlib, which provides almost everything we need, so we do not need the caret package.

To illustrate how to fit a machine learning model, we train a logistic regression model to predict fraudulent cards from the card data.

First, let's split the data into a training set and a testing set; to do this we use the function sdf_random_split as follows.

partitions<-card%>%
  sdf_random_split(training=0.8,test=0.2,seed = 123)
train<-partitions$training
test<-partitions$test

Now we will use the train set to fit our model, and the test set to assess its performance.

model_in_spark<-train %>%
  ml_logistic_regression(Class~.)

We can get the summary of this model by typing its name.

model_in_spark
## Formula: Class ~ .
## 
## Coefficients:
##   (Intercept)          Time            V1            V2            V3 
## -8.305599e+00 -4.074154e-06  1.065118e-01  1.473891e-02 -8.426563e-03 
##            V4            V5            V6            V7            V8 
##  6.996793e-01  1.380980e-01 -1.217416e-01 -1.205822e-01 -1.700146e-01 
##            V9           V10           V11           V12           V13 
## -2.734966e-01 -8.277600e-01 -4.476393e-02  7.416858e-02 -2.828732e-01 
##           V14           V15           V16           V17           V18 
## -5.317753e-01 -1.221061e-01 -2.476344e-01 -1.591295e-03  3.403402e-02 
##           V19           V20           V21           V22           V23 
##  9.213132e-02 -4.914719e-01  3.863870e-01  6.407714e-01 -1.096256e-01 
##           V24           V25           V26           V27           V28 
##  1.366914e-01 -5.108841e-02  9.977837e-02 -8.384655e-01 -3.072630e-01 
##        Amount 
##  1.039041e-03
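
ml_logistic_regression also accepts MLlib hyperparameters if you want to tune the fit; for example, a regularized version of the same model (a sketch, the parameter values are only examples):

model_reg <- train %>%
  ml_logistic_regression(
    Class ~ .,
    reg_param = 0.01,        # amount of regularization (example value)
    elastic_net_param = 0,   # 0 = ridge (L2), 1 = lasso (L1)
    max_iter = 100           # maximum number of optimization iterations
  )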

Fortunately, sparklyr also supports the functions of the broom package, so we can get a nicer table using the function tidy.

library(broom)
tidy(model_in_spark)
## # A tibble: 31 x 2
##    features    coefficients
##    <chr>              <dbl>
##  1 (Intercept)  -8.31      
##  2 Time         -0.00000407
##  3 V1            0.107     
##  4 V2            0.0147    
##  5 V3           -0.00843   
##  6 V4            0.700     
##  7 V5            0.138     
##  8 V6           -0.122     
##  9 V7           -0.121     
## 10 V8           -0.170     
## # ... with 21 more rows

To evaluate the model performance we use the function ml_evaluate as follows

model_summary<-ml_evaluate(model_in_spark,train)
model_summary
## BinaryLogisticRegressionSummaryImpl 
##  Access the following via `$` or `ml_summary()`. 
##  - features_col() 
##  - label_col() 
##  - predictions() 
##  - probability_col() 
##  - area_under_roc() 
##  - f_measure_by_threshold() 
##  - pr() 
##  - precision_by_threshold() 
##  - recall_by_threshold() 
##  - roc() 
##  - prediction_col() 
##  - accuracy() 
##  - f_measure_by_label() 
##  - false_positive_rate_by_label() 
##  - labels() 
##  - precision_by_label() 
##  - recall_by_label() 
##  - true_positive_rate_by_label() 
##  - weighted_f_measure() 
##  - weighted_false_positive_rate() 
##  - weighted_precision() 
##  - weighted_recall() 
##  - weighted_true_positive_rate()

To extract a particular metric we use $. For example, we can extract the accuracy rate, the AUC, or the roc table.

model_summary$area_under_roc()
## [1] 0.9765234
model_summary$accuracy()
## [1] 0.999149
model_summary$roc()
## # Source: spark<?> [?? x 2]
##        FPR   TPR
##      <dbl> <dbl>
##  1 0       0    
##  2 0.00849 0.876
##  3 0.0185  0.898
##  4 0.0285  0.908
##  5 0.0386  0.917
##  6 0.0487  0.922
##  7 0.0587  0.922
##  8 0.0688  0.925
##  9 0.0788  0.929
## 10 0.0888  0.934
## # ... with more rows

We can retrieve this table into R to plot it with ggplot by using the function collect.

model_summary$roc()%>%
collect()%>%
ggplot(aes(FPR,TPR ))+
  geom_line(col="blue")+
  geom_abline(intercept = 0,slope = 1,col="red")+
  ggtitle("the roc of model_in_spark ")

A high accuracy rate on the training set can simply be the result of overfitting; the accuracy rate on the testing set is the more reliable one.

pred<-ml_evaluate(model_in_spark,test)
pred$accuracy()
## [1] 0.9994722
pred$area_under_roc()
## [1] 0.9692158

Finally, to get the predictions we use the function ml_predict.

pred<-ml_predict(model_in_spark,test)%>%
select(.,Class,prediction,probability_0,probability_1)
pred  
## # Source: spark<?> [?? x 4]
##    Class prediction probability_0 probability_1
##    <int>      <dbl>         <dbl>         <dbl>
##  1     0          0         1.00       0.000221
##  2     0          0         1.00       0.000441
##  3     0          0         1.00       0.000184
##  4     0          0         1.00       0.000490
##  5     0          0         1.00       0.000199
##  6     0          0         0.999      0.000708
##  7     0          0         1.00       0.000231
##  8     0          0         0.999      0.000640
##  9     0          0         1.00       0.000265
## 10     0          0         0.999      0.000720
## # ... with more rows
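
Since pred is still a spark table, we can summarize it on the spark side before bringing anything into R; for example, a quick confusion-matrix-style count of actual versus predicted classes (a small sketch):

pred %>%
  count(Class, prediction) %>%   # number of rows for each actual/predicted pair
  collect()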

Here too, we can use the function collect to plot the results.

pred%>%
  collect()%>%
  ggplot(aes(Class,prediction ))+
  geom_point(size=0.1)+
  geom_jitter()+
  ggtitle("Actual vs predicted")

Streaming

Among the most powerful features of spark is that it can handle streaming data very easily. To show that, let's use a simple example: we create a folder to hold the input of some data transformations and save the output in another folder, so that each time we add files to the first folder the transformations are executed automatically and the output is written to the second folder.

#dir.create("raw_data")

Once the folder is created, we split the card data into two parts. The first part is exported now to the folder raw_data, and then we define some operations using the spark functions stream_read_csv and stream_write_csv as follows.

# card1 <- card %>%
#   filter(Time <= mean(Time, na.rm = TRUE))
# write.csv(card1, "raw_data/card1.csv")
# stream <- stream_read_csv(sc, "raw_data/") %>%
#   select(Class, Amount) %>%
#   stream_write_csv("result/")

If we add the second part to the folder raw_data, the streaming process launches automatically and executes the above operations.

# card2 <- card %>%
#   filter(Time > mean(Time, na.rm = TRUE))
# write.csv(card2, "raw_data/card2.csv")
# dir("result", pattern = ".csv")

We stop the stream. We can also take a last look at the card data with sdf_describe, which computes summary statistics inside spark.

#stream_stop(stream)
sdf_describe(card)
## # Source: spark<?> [?? x 32]
##   summary Time  V1    V2    V3    V4    V5    V6    V7    V8    V9    V10  
##   <chr>   <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr> <chr>
## 1 count   2848~ 2848~ 2848~ 2848~ 2848~ 2848~ 2848~ 2848~ 2848~ 2848~ 2848~
## 2 mean    9481~ 1.75~ -8.2~ -9.6~ 8.32~ 1.64~ 4.24~ -3.0~ 8.81~ -1.1~ 7.09~
## 3 stddev  4748~ 1.95~ 1.65~ 1.51~ 1.41~ 1.38~ 1.33~ 1.23~ 1.19~ 1.09~ 1.08~
## 4 min     0     -56.~ -72.~ -48.~ -5.6~ -113~ -26.~ -43.~ -73.~ -13.~ -24.~
## 5 max     1727~ 2.45~ 22.0~ 9.38~ 16.8~ 34.8~ 73.3~ 120.~ 20.0~ 15.5~ 23.7~
## # ... with 20 more variables: V11 <chr>, V12 <chr>, V13 <chr>, V14 <chr>,
## #   V15 <chr>, V16 <chr>, V17 <chr>, V18 <chr>, V19 <chr>, V20 <chr>,
## #   V21 <chr>, V22 <chr>, V23 <chr>, V24 <chr>, V25 <chr>, V26 <chr>,
## #   V27 <chr>, V28 <chr>, Amount <chr>, Class <chr>

Now we disconnect

spark_disconnect(sc)
