sparklyr 0.6

July 30, 2017

We’re excited to announce a new release of the sparklyr package, available on CRAN today! sparklyr 0.6 introduces new features to:

  • Distribute R computations using spark_apply() to execute arbitrary R code across your Spark cluster. You can now use all of your favorite R packages and functions in a distributed context.
  • Connect to External Data Sources using spark_read_source(), spark_write_source(), spark_read_jdbc() and spark_write_jdbc().
  • Use the Latest Frameworks including dplyr 0.7, DBI 0.7, RStudio 1.1 and Spark 2.2.

and several improvements across:

  • Spark Connections add a new Databricks connection that enables using sparklyr in Databricks through mode="databricks", add support for YARN cluster mode through master="yarn-cluster", and improve connection speed (a brief connection sketch follows this list).
  • Dataframes add support for sdf_pivot(), sdf_broadcast(), cbind(), rbind(), sdf_separate_column(), sdf_bind_cols(), sdf_bind_rows(), sdf_repartition() and sdf_num_partitions().
  • Machine Learning adds support for multinomial regression in ml_logistic_regression(), a weights.column argument for generalized linear regression, ml_model_data(), and a new ft_count_vectorizer() function for use with ml_lda().
  • Many other improvements: initial broom support for ml_linear_regression() and ml_generalized_linear_regression(); dplyr support for the %like%, %rlike% and %regexp% operators; a new download_scalac() helper that installs the Scala compilers required to develop sparklyr extensions; simpler Hive database management through tbl_change_db() and src_databases() to switch between and list Hive databases; and a joint effort between RStudio and Microsoft on a cross-platform Spark installer at github.com/rstudio/spark-install.
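
As a quick, hedged illustration of the connection and dplyr improvements above (a sketch rather than a definitive recipe: the YARN master string, the "default" Hive database and the iris_yarn table name are assumptions about your environment):

library(sparklyr)
library(dplyr)

# Connect in YARN cluster mode (new in this release); assumes SPARK_HOME and
# the Hadoop/YARN configuration are already available on this machine.
sc <- spark_connect(master = "yarn-cluster")
# Inside Databricks, the post describes connecting through mode = "databricks".

# New Hive database helpers:
src_databases(sc)              # list the available Hive databases
tbl_change_db(sc, "default")   # switch to the (assumed) "default" database

# dplyr now translates the %like%, %rlike% and %regexp% operators:
sdf_copy_to(sc, iris, "iris_yarn") %>%
  filter(Species %like% "%setosa%")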

Additional changes and improvements can be found in the sparklyr NEWS file.

Updated documentation and examples are available at spark.rstudio.com. For questions or feedback, please feel free to open a sparklyr GitHub issue or ask a sparklyr question on Stack Overflow.

Distributed R

sparklyr 0.6 provides support for executing distributed R code through spark_apply(). For instance, after connecting and copying some data:

library(sparklyr)
sc <- spark_connect(master = "local")
iris_tbl <- sdf_copy_to(sc, iris)

We can apply an arbitrary R function, say jitter(), to each column, across all rows, as follows:

iris_tbl %>% spark_apply(function(e) sapply(e[,1:4], jitter))
## # Source:   table [?? x 4]
## # Database: spark_connection
##    Sepal_Length Sepal_Width Petal_Length Petal_Width
##           <dbl>       <dbl>        <dbl>       <dbl>
##  1     5.102223    3.507372     1.406654   0.1990680
##  2     4.900148    3.002006     1.396052   0.2002922
##  3     4.699807    3.204126     1.282652   0.2023850
##  4     4.618854    3.084675     1.508538   0.2119644
##  5     4.985322    3.596079     1.388837   0.1846146
##  6     5.381947    3.881051     1.686600   0.3896673
##  7     4.613713    3.400265     1.404120   0.2954829
##  8     4.995116    3.408897     1.493193   0.1945901
##  9     4.418538    2.916306     1.392230   0.1976161
## 10     4.891340    3.096591     1.498078   0.1174069
## # ... with 140 more rows

You can also group by columns and apply an operation over each group of rows; for instance, to fit a linear regression within each group:

spark_apply(
  iris_tbl,
  function(e) broom::tidy(lm(Petal_Width ~ Petal_Length, e)),
  names = c("term", "estimate", "std.error", "statistic", "p.value"),
  group_by = "Species"
)
## # Source:   table [?? x 6]
## # Database: spark_connection
##      Species         term    estimate  std.error  statistic      p.value
##        <chr>        <chr>       <dbl>      <dbl>      <dbl>        <dbl>
## 1 versicolor  (Intercept) -0.08428835 0.16070140 -0.5245029 6.023428e-01
## 2 versicolor Petal_Length  0.33105360 0.03750041  8.8279995 1.271916e-11
## 3  virginica  (Intercept)  1.13603130 0.37936622  2.9945505 4.336312e-03
## 4  virginica Petal_Length  0.16029696 0.06800119  2.3572668 2.253577e-02
## 5     setosa  (Intercept) -0.04822033 0.12164115 -0.3964146 6.935561e-01
## 6     setosa Petal_Length  0.20124509 0.08263253  2.4354220 1.863892e-02

Packages can be used inside the applied function since they are automatically distributed to the worker nodes; however, spark_apply() requires R to be installed on each worker node.
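
As a small illustration, here is a sketch that calls a function from another package inside the applied function; dplyr is used only as an example of a worker-side dependency, and the Petal_Ratio column is made up:

iris_tbl %>%
  spark_apply(function(e) dplyr::mutate(e, Petal_Ratio = Petal_Length / Petal_Width))

Please refer to Distributing R Computations for additional information and examples.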

External Data Sources

sparklyr 0.6 adds support for connecting Spark to databases. This feature is useful if your Spark environment is separate from your data environment, or if you use Spark to access multiple data sources. You can use spark_read_source() and spark_write_source() with any data connector available in Spark Packages. Alternatively, you can use spark_read_jdbc() and spark_write_jdbc() with a JDBC driver to reach almost any data source.

For example, you can connect to Cassandra using spark_read_source(). Notice that the Cassandra connector version needs to match the Spark version as defined in their version compatibility section.

config <- spark_config()
config[["sparklyr.defaultPackages"]] <- c(
   "datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11")

sc <- spark_connect(master = "local", config = config)

spark_read_source(sc, "emp",
  "org.apache.spark.sql.cassandra",
  list(keyspace = "dev", table = "emp"))

To connect to MySQL, one can download the MySQL connector and use spark_read_jdbc() as follows:

config <- spark_config()
config$`sparklyr.shell.driver-class-path` <- 
  "~/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar"

sc <- spark_connect(master = "local", config = config)

spark_read_jdbc(sc, "person_jdbc",  options = list(
  url = "jdbc:mysql://localhost:3306/sparklyr",
  user = "root", password = "",
  dbtable = "person"))
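
Writing back works analogously; the following is a sketch of spark_write_jdbc() in which the person_copy table and the argument names (assumed to mirror spark_read_jdbc()) are illustrative only:

spark_write_jdbc(
  dplyr::tbl(sc, "person_jdbc"),
  name = "person_copy",
  options = list(
    url = "jdbc:mysql://localhost:3306/sparklyr",
    user = "root", password = "",
    dbtable = "person_copy"))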

See also crassy, a sparklyr extension being developed to read data from Cassandra with ease.

Dataframe Functions

sparklyr 0.6 includes many improvements for working with DataFrames. Here are some highlights, illustrated with two small example tables:

x_tbl <- sdf_copy_to(sc, data.frame(a = c(1,2,3), b = c(2,3,4))) 
y_tbl <- sdf_copy_to(sc, data.frame(b = c(3,4,5), c = c("A","B","C")))

Pivoting DataFrames

It is now possible to pivot (i.e. cross tabulate) one or more columns using sdf_pivot().

sdf_pivot(y_tbl, b ~ c, list(b = "count"))
## # Source:   table [?? x 4]
## # Database: spark_connection
##       b     A     B     C
##   <dbl> <dbl> <dbl> <dbl>
## 1     4   NaN     1   NaN
## 2     3     1   NaN   NaN
## 3     5   NaN   NaN     1

Binding Rows and Columns

Binding DataFrames by rows and columns is supported through sdf_bind_rows() and sdf_bind_cols():

sdf_bind_rows(x_tbl, y_tbl)
## # Source:   table [?? x 3]
## # Database: spark_connection
##       a     b     c
##   <dbl> <dbl> <chr>
## 1     1     2  <NA>
## 2     2     3  <NA>
## 3     3     4  <NA>
## 4   NaN     3     A
## 5   NaN     4     B
## 6   NaN     5     C
sdf_bind_cols(x_tbl, y_tbl)
## # Source:   lazy query [?? x 4]
## # Database: spark_connection
##       a   b.x   b.y     c
##   <dbl> <dbl> <dbl> <chr>
## 1     1     2     3     A
## 2     3     4     5     C
## 3     2     3     4     B
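
Repartitioning DataFrames

The release also adds sdf_repartition() and sdf_num_partitions(), listed in the improvements above; a minimal sketch, where the target of 4 partitions is arbitrary:

sdf_num_partitions(x_tbl)   # number of partitions backing x_tbl
x_tbl %>%
  sdf_repartition(4) %>%
  sdf_num_partitions()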

Separating Columns

Separate lists into columns with ease. This is especially useful when working with model predictions that are returned as lists instead of scalars. In this example, each element in the probability column contains two items. We can use sdf_separate_column() to isolate the item that corresponds to the probability that vs equals one.

library(magrittr)

mtcars[, c("vs", "mpg")] %>%
  sdf_copy_to(sc, .) %>% 
  ml_logistic_regression("vs", "mpg") %>%
  sdf_predict() %>%
  sdf_separate_column("probability", list("P[vs=1]" = 2))
## # Source:   table [?? x 7]
## # Database: spark_connection
##       vs   mpg id58fb64e07a38 rawPrediction probability prediction
##    <dbl> <dbl>          <dbl>        <list>      <list>      <dbl>
##  1     0  21.0              0     <dbl [2]>   <dbl [2]>          1
##  2     0  21.0              1     <dbl [2]>   <dbl [2]>          1
##  3     1  22.8              2     <dbl [2]>   <dbl [2]>          1
##  4     1  21.4              3     <dbl [2]>   <dbl [2]>          1
##  5     0  18.7              4     <dbl [2]>   <dbl [2]>          0
##  6     1  18.1              5     <dbl [2]>   <dbl [2]>          0
##  7     0  14.3              6     <dbl [2]>   <dbl [2]>          0
##  8     1  24.4              7     <dbl [2]>   <dbl [2]>          1
##  9     1  22.8              8     <dbl [2]>   <dbl [2]>          1
## 10     1  19.2              9     <dbl [2]>   <dbl [2]>          0
## # ... with 22 more rows, and 1 more variables: `P[vs=1]` <dbl>

Machine Learning

Multinomial Regression

sparklyr 0.6 adds support for multinomial regression for Spark 2.1.0 or higher:

iris_tbl %>%
  ml_logistic_regression("Species", features = c("Sepal_Length", "Sepal_Width"))
## Call: Species ~ Sepal_Length + Sepal_Width
## 
## Coefficients:
##      (Intercept) Sepal_Length Sepal_Width
## [1,]   -201.5540     73.19269   -59.83942
## [2,]   -214.6001     75.09506   -59.43476
## [3,]    416.1541   -148.28775   119.27418
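
sparklyr 0.6 also adds ml_model_data() to retrieve the Spark DataFrame associated with a fitted model, along with a weights.column argument for generalized linear regression. A brief sketch of the former, reusing the model above:

model <- iris_tbl %>%
  ml_logistic_regression("Species", features = c("Sepal_Length", "Sepal_Width"))

# Retrieve the Spark DataFrame the model was fit on
ml_model_data(model)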

Improved Text Mining with LDA

ft_tokenizer() was introduced in sparklyr 0.5; sparklyr 0.6 adds ft_count_vectorizer() and the vocabulary.only argument to simplify fitting LDA models with ml_lda():

library(janeaustenr)
lines_tbl <- sdf_copy_to(sc, austen_books()[c(1,3),])

lines_tbl %>%
  ft_tokenizer("text", "tokens") %>%
  ft_count_vectorizer("tokens", "features") %>%
  ml_lda("features", k = 4)
## An LDA model fit on 1 features
## 
## Topics Matrix:
##           [,1]      [,2]      [,3]      [,4]
## [1,] 0.8996952 0.8569472 1.1249431 0.9366354
## [2,] 0.9815787 1.1721218 1.0795771 0.9990090
## [3,] 1.1738678 0.8600233 0.9864862 0.9573433
## [4,] 0.8951603 1.0730703 0.9562389 0.8899160
## [5,] 1.0528748 1.0291708 1.0699833 0.8731401
## [6,] 1.1857015 1.0441299 1.0987713 1.1247574
## 
## Estimated Document Concentration:
## [1] 13.52071 13.52172 13.52060 13.51963

The vocabulary can be printed with:

lines_tbl %>%
  ft_tokenizer("text", "tokens") %>%
  ft_count_vectorizer("tokens", "features", vocabulary.only = TRUE)
## [1] "jane"        "sense"       "austen"      "by"          "sensibility"
## [6] "and"

That’s all for now, disconnecting:

spark_disconnect(sc)
