Using Spark from R for performance with arbitrary code – Part 3 – Using R to construct SQL queries and let Spark execute them

October 12, 2019

Introduction

In the previous part of this series, we looked at writing R functions that can be executed directly by Spark without serialization overhead, focusing on writing functions as combinations of dplyr verbs, and we investigated how the SQL is generated and the Spark plans are created.

In this third part, we will look at how to write R functions that generate SQL queries to be executed by Spark, how to execute them with DBI, and how to achieve lazy SQL statements that only get executed when needed. We also briefly show how to wrap these approaches into functions that can be combined with other Spark operations.

Preparation

The full setup of Spark and sparklyr is out of scope for this post; please check the previous one for setup instructions and a ready-made Docker image.

# Load packages
suppressPackageStartupMessages({
  library(sparklyr)
  library(dplyr)
  library(nycflights13)
})

# Prepare the data
weather <- nycflights13::weather %>%
  mutate(id = 1L:nrow(nycflights13::weather)) %>% 
  select(id, everything())

# Connect
sc <- sparklyr::spark_connect(master = "local")

# Copy the weather dataset to the instance
tbl_weather <- dplyr::copy_to(
  dest = sc, 
  df = weather,
  name = "weather",
  overwrite = TRUE
)
# Copy the flights dataset to the instance
tbl_flights <- dplyr::copy_to(
  dest = sc, 
  df = nycflights13::flights,
  name = "flights",
  overwrite = TRUE
)

R functions as Spark SQL generators

There are use cases where it is desirable to express the operations directly with SQL instead of combining dplyr verbs, for example when working within multi-language environments where re-usability is important. We can then send the SQL query directly to Spark to be executed. To create such queries, one option is to write R functions that work as query constructors.

Again using a very simple example, a naive implementation of column normalization could look as follows. Note that the use of SELECT * is discouraged and is used here for illustration purposes only:

# Construct a SQL query that adds a normalized version of a column to a table
normalize_sql <- function(df, colName, newColName) {
  paste0(
    "SELECT",
    "\n  ", df, ".*", ",",
    "\n  (", colName, " - (SELECT avg(", colName, ") FROM ", df, "))",
    " / ",
    "(SELECT stddev_samp(", colName, ") FROM ", df, ") as ", newColName,
    "\n", "FROM ", df
  )
}

Using the weather dataset would then yield the following SQL query when normalizing the temp column:

normalize_temp_query <- normalize_sql("weather", "temp", "normTemp")
cat(normalize_temp_query)
## SELECT
##   weather.*,
##   (temp - (SELECT avg(temp) FROM weather)) / (SELECT stddev_samp(temp) FROM weather) as normTemp
## FROM weather
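
Since the generator is a plain R function, it can be reused for other tables and columns. For instance, applying it to the dep_delay column of the flights dataset (the new column name is just illustrative) yields:

cat(normalize_sql("flights", "dep_delay", "normDepDelay"))
## SELECT
##   flights.*,
##   (dep_delay - (SELECT avg(dep_delay) FROM flights)) / (SELECT stddev_samp(dep_delay) FROM flights) as normDepDelay
## FROM flights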

Now that we have the query created, we can look at how to send it to Spark for execution.

Apache Spark and R logos

Executing the generated queries via Spark

Using DBI as the interface

The R package DBI provides an interface for communication between R and relational database management systems. We can simply use the dbGetQuery() function to execute our query, for instance:

res <- DBI::dbGetQuery(sc, statement = normalize_temp_query)
head(res)
##   id origin year month day hour  temp  dewp humid wind_dir wind_speed
## 1  1    EWR 2013     1   1    1 39.02 26.06 59.37      270   10.35702
## 2  2    EWR 2013     1   1    2 39.02 26.96 61.63      250    8.05546
## 3  3    EWR 2013     1   1    3 39.02 28.04 64.43      240   11.50780
## 4  4    EWR 2013     1   1    4 39.92 28.04 62.21      250   12.65858
## 5  5    EWR 2013     1   1    5 39.02 28.04 64.43      260   12.65858
## 6  6    EWR 2013     1   1    6 37.94 28.04 67.21      240   11.50780
##   wind_gust precip pressure visib           time_hour   normTemp
## 1       NaN      0   1012.0    10 2013-01-01 06:00:00 -0.9130047
## 2       NaN      0   1012.3    10 2013-01-01 07:00:00 -0.9130047
## 3       NaN      0   1012.5    10 2013-01-01 08:00:00 -0.9130047
## 4       NaN      0   1012.2    10 2013-01-01 09:00:00 -0.8624083
## 5       NaN      0   1011.9    10 2013-01-01 10:00:00 -0.9130047
## 6       NaN      0   1012.4    10 2013-01-01 11:00:00 -0.9737203

As we may have noticed from the way the result is printed, a standard data frame is returned, as opposed to the tibbles returned by most sparklyr operations.

It is important to note that dbGetQuery() automatically computes the results and collects them into the R session. This is in contrast with the dplyr approach, which constructs the query lazily and only collects the results into the R session when collect() is called, or computes them on the Spark side when compute() is called.
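
For comparison, here is a minimal sketch of the lazy dplyr counterpart, reusing the tbl_weather reference created above; the temporary table name passed to compute() is just illustrative:

# Build the normalization lazily with dplyr verbs - nothing is executed yet
normalized_lazy_dplyr <- tbl_weather %>%
  mutate(normTemp = (temp - mean(temp, na.rm = TRUE)) / sd(temp, na.rm = TRUE))

# Retrieve the full result into the R session as a tibble
normalized_collected <- normalized_lazy_dplyr %>% collect()

# Or materialize it as a temporary Spark table, keeping the data in Spark
normalized_computed <- normalized_lazy_dplyr %>% compute(name = "weather_normalized_dplyr")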

We will now examine two options for using the prepared query lazily, without collecting the results into the R session.

Invoking sql on a Spark session object

Without going into further detail on sparklyr's invoke() functionality, which we will focus on in the fourth installment of this series: if we want a “lazy” SQL statement that does not get automatically computed and collected when called from R, we can invoke the sql method on a SparkSession class object.

The method takes a string SQL query as input and processes it using Spark, returning the result as a Spark DataFrame. This gives us the ability to only compute and collect the results when desired:

# Use the query "lazily" without execution:
normalized_lazy_ds <- sc %>%
  spark_session() %>%
  invoke("sql",  normalize_temp_query)
normalized_lazy_ds
## 
##   org.apache.spark.sql.Dataset
##   [id: int, origin: string ... 15 more fields]
# Collect when needed:
normalized_lazy_ds %>% collect()
## # A tibble: 26,115 x 17
##       id origin  year month   day  hour  temp  dewp humid wind_dir
##    <int> <chr>  <int> <int> <int> <int> <dbl> <dbl> <dbl>    <dbl>
##  1     1 EWR     2013     1     1     1  39.0  26.1  59.4      270
##  2     2 EWR     2013     1     1     2  39.0  27.0  61.6      250
##  3     3 EWR     2013     1     1     3  39.0  28.0  64.4      240
##  4     4 EWR     2013     1     1     4  39.9  28.0  62.2      250
##  5     5 EWR     2013     1     1     5  39.0  28.0  64.4      260
##  6     6 EWR     2013     1     1     6  37.9  28.0  67.2      240
##  7     7 EWR     2013     1     1     7  39.0  28.0  64.4      240
##  8     8 EWR     2013     1     1     8  39.9  28.0  62.2      250
##  9     9 EWR     2013     1     1     9  39.9  28.0  62.2      260
## 10    10 EWR     2013     1     1    10  41    28.0  59.6      260
## # … with 26,105 more rows, and 7 more variables: wind_speed <dbl>,
## #   wind_gust <dbl>, precip <dbl>, pressure <dbl>, visib <dbl>,
## #   time_hour <dttm>, normTemp <dbl>
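
If we want to keep working with this result using dplyr verbs instead of collecting it, one option (a sketch; the table name is just illustrative) is to register the returned Spark DataFrame with sparklyr::sdf_register(), which gives us back a lazy tbl reference without moving any data into R:

# Register the Spark DataFrame as a temporary view and get a tbl back
tbl_normalized <- sparklyr::sdf_register(normalized_lazy_ds, name = "weather_normalized")

# Combine with further Spark operations, still lazily
tbl_normalized %>%
  filter(normTemp > 2) %>%
  count(origin)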

Using tbl with dbplyr’s sql

The above method gives us a reference to a Java object as a result, which might be less intuitive for R users to work with. We can also opt to use dbplyr’s sql() function in combination with tbl() to get a more familiar result.

Note that when printing the below normalized_lazy_tbl, the query gets partially executed to provide the first few rows. Only when collect() is called is the entire result set retrieved into the R session:

# Nothing is executed yet
normalized_lazy_tbl <- normalize_temp_query %>%
  dbplyr::sql() %>%
  tbl(sc, .)

# Print the first few rows
normalized_lazy_tbl
## # Source: spark [?? x 2]
##       id id_div_5
##    <int>    <dbl>
##  1     1        0
##  2     2        0
##  3     3        0
##  4     4        0
##  5     5        1
##  6     6        1
##  7     7        1
##  8     8        1
##  9     9        1
## 10    10        2
## # … with more rows

Even though the numeric values of the results are correct here, we may still notice that the class of the returned id_div_5 column is numeric rather than integer. Such is the life of developers using data processing interfaces.
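
Finally, as mentioned in the introduction, we can wrap this approach into a function that returns a lazy tbl, which can then be combined with other Spark operations. A minimal sketch building on the normalize_sql() function defined above (the wrapper name is just illustrative):

# A wrapper that builds the SQL and returns a lazy tbl referencing the result
normalize_tbl <- function(sc, df, colName, newColName) {
  normalize_sql(df, colName, newColName) %>%
    dbplyr::sql() %>%
    tbl(sc, .)
}

# Combine with other Spark operations, e.g. joining onto the flights data
normalize_tbl(sc, "weather", "temp", "normTemp") %>%
  select(origin, year, month, day, hour, normTemp) %>%
  inner_join(tbl_flights, by = c("origin", "year", "month", "day", "hour")) %>%
  head(10) %>%
  collect()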

When portability is important

Since the languages that provide interfaces to Spark are not limited to R, and multi-language setups are quite common, another reason to use SQL statements directly is the portability of such solutions. A SQL statement can be executed by the interfaces provided for other languages, such as Scala, Java, and Python, without the need to rely on R-specific packages such as dbplyr.
