A guide to retrieval and processing of data from relational database systems using Apache Spark and JDBC with R and sparklyr

[This article was first published on Jozef's Rblog, and kindly contributed to R-bloggers].

Introduction

The {sparklyr} package lets us connect to and use Apache Spark for high-performance, highly parallelized, and distributed computations. We can also use Spark’s capabilities to improve and streamline our data processing pipelines, as Spark supports reading from and writing to many popular sources such as Parquet and ORC, as well as most database systems via JDBC drivers.

In this post, we will explore using R to perform data loads to Spark and optionally R from relational database management systems such as MySQL, Oracle, and MS SQL Server and show how such processes can be simplified. We will also provide reproducible code via a Docker image, such that interested readers can experiment with it easily.

Getting test data into a MySQL database

If you are interested only in the Spark loading part, feel free to skip this section.

For a fully reproducible example, we will use a local MySQL server instance, which is very accessible thanks to its open-source nature. We will use the {DBI} and {RMySQL} packages to connect to the server directly from R and populate a database with data provided by the {nycflights13} package that we will later use for our Spark loads.

Let us write the flights data frame into the MySQL database using {DBI} and call the newly created table test_table:

test_df <- nycflights13::flights

# Create a connection to database `testdb`
con <- DBI::dbConnect(
  drv = RMySQL::MySQL(),
  host = "localhost",
  dbname = "testdb",
  user = "rstudio",
  password = "pass"
)

# Write our `test_df` into a table called `test_table`
DBI::dbWriteTable(con, "test_table", test_df, overwrite = TRUE)

# Close the connection
DBI::dbDisconnect(con)

Now we have our table available and we can focus on the main part of the article.

Using JDBC to connect to database systems from Spark

Getting a JDBC driver and using it with Spark and sparklyr

Since Spark runs via a JVM, the natural way to establish connections to database systems is using Java Database Connectivity (JDBC). To do that, we will need a JDBC driver which will enable us to interact with the database system of our choice. For this example, we are using MySQL, but we provide details on other RDBMS later in the article.

Downloading and extracting the connector jar

With a bit of online search, we can download the driver and extract the contents of the zip file:

mkdir -p $HOME/jars
wget -q -t 3 \
  -O $HOME/jars/mysql-connector.zip \
  https://cdn.mysql.com/Downloads/Connector-J/mysql-connector-java-8.0.21.zip 
unzip -q -o \
  -d $HOME/jars \
  $HOME/jars/mysql-connector.zip

The file we are most interested in for our use case is the .jar file that contains the classes necessary to establish the connection. Using R, we can locate the extracted jar file(s), for example using the dir() function:

jars <- dir("~/jars", pattern = "jar$", recursive = TRUE, full.names = TRUE)
basename(jars)
## [1] "mysql-connector-java-8.0.21.jar"

Connecting using the jar

Next, we need to tell {sparklyr} to use that resource when establishing a Spark connection. One way is to add a sparklyr.jars.default element holding the paths to the necessary jar files to the config list, and then establish the Spark connection using that config:

config <- list(sparklyr.jars.default = jars)
sc <- sparklyr::spark_connect("local", config = config)

Retrieving data from a database with sparklyr

With the Spark connection established, we can connect to our MySQL database from Spark and retrieve the data. {sparklyr} provides a handy spark_read_jdbc() function for this exact purpose. The API maps closely to the Scala API, but it is not very explicit in how to set up the connection. The key here is the options argument to spark_read_jdbc(), which will specify all the connection details we need.

Setting the options argument of spark_read_jdbc()

First, let us create a jdbcConnectionOpts list with the basic connection properties. These are the connection URL and the driver. Below we also explicitly specify the user and password, but these can usually also be provided as part of the URL:

# Connection options
jdbcConnectionOpts <- list(
  url = "jdbc:mysql://localhost:3306/testdb",
  driver = "com.mysql.cj.jdbc.Driver",
  user = "rstudio", 
  password = "pass"
)
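Hard-coding credentials is fine for a throwaway container, but in shared code we might prefer to read them from environment variables. Below is a minimal sketch of that idea. The helper make_connection_opts() and the variable names MYSQL_USER and MYSQL_PASSWORD are assumptions made for illustration; neither Spark nor {sparklyr} reads them on its own.

```r
# Hypothetical helper: build the connection options, taking credentials
# from environment variables when they are set.
# MYSQL_USER and MYSQL_PASSWORD are assumed names chosen for this sketch;
# the fallbacks are the demo credentials used in this article.
make_connection_opts <- function(url, driver) {
  list(
    url = url,
    driver = driver,
    user = Sys.getenv("MYSQL_USER", unset = "rstudio"),
    password = Sys.getenv("MYSQL_PASSWORD", unset = "pass")
  )
}

jdbcConnectionOptsEnv <- make_connection_opts(
  url = "jdbc:mysql://localhost:3306/testdb",
  driver = "com.mysql.cj.jdbc.Driver"
)

# Inspect the element names without printing the password
names(jdbcConnectionOptsEnv)
```

The resulting list has the same shape as jdbcConnectionOpts above, so it could be passed to spark_read_jdbc() in the same way.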

The last bit of information we need to provide is the identification of the data we want to extract once the connection is established. For this, we can use one of two options:

  • dbtable - in case we want to create a Spark DataFrame by extracting contents of a specific table
  • query - in case we want to create a Spark DataFrame by executing a SQL query
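Spark does not allow dbtable and query to be specified at the same time, so it can help to build this part of the options through a small guard. The following is only a sketch; make_data_opts() is a hypothetical helper, not part of {sparklyr}.

```r
# Hypothetical helper: return a list with exactly one of `dbtable`
# or `query`, failing early in R instead of later inside Spark.
make_data_opts <- function(dbtable = NULL, query = NULL) {
  # TRUE when both are missing or both are supplied
  if (is.null(dbtable) == is.null(query)) {
    stop("Specify exactly one of `dbtable` or `query`.")
  }
  if (!is.null(dbtable)) list(dbtable = dbtable) else list(query = query)
}

make_data_opts(dbtable = "test_table")
make_data_opts(query = "SELECT year, month FROM test_table")
```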

Loading a specific database table

First, let us load the database table that we populated with the flights data earlier and named test_table. Putting it all together, we load the data using spark_read_jdbc():

# Other options specific to the action
jdbcDataOpts <- list(dbtable = "test_table")

# Use spark_read_jdbc() to load the data
test_tbl <- sparklyr::spark_read_jdbc(
  sc = sc,
  name = "test_table",
  options = append(jdbcConnectionOpts, jdbcDataOpts),
  memory = FALSE
)

# Print some records
test_tbl
## # Source: spark<test_table> [?? x 20]
##    row_names  year month   day dep_time sched_dep_time dep_delay arr_time
##    <chr>     <dbl> <dbl> <dbl>    <dbl>          <dbl>     <dbl>    <dbl>
##  1 1          2013     1     1      517            515         2      830
##  2 2          2013     1     1      533            529         4      850
##  3 3          2013     1     1      542            540         2      923
##  4 4          2013     1     1      544            545        -1     1004
##  5 5          2013     1     1      554            600        -6      812
##  6 6          2013     1     1      554            558        -4      740
##  7 7          2013     1     1      555            600        -5      913
##  8 8          2013     1     1      557            600        -3      709
##  9 9          2013     1     1      557            600        -3      838
## 10 10         2013     1     1      558            600        -2      753
## # … with more rows, and 12 more variables: sched_arr_time <dbl>,
## #   arr_delay <dbl>, carrier <chr>, flight <dbl>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <chr>

We provided the following arguments:

  • sc is the Spark connection that we established using the config that includes necessary jars
  • name is a character string with the name to be assigned to the newly generated table within Spark SQL, not the name of the source table we want to read from our database
  • options is a list with both the connection options and the data-related options, so we use append() to combine the jdbcConnectionOpts and jdbcDataOpts lists into one
  • memory is a logical that tells Spark whether we want to cache the table into memory. A bit more on that and some performance implications below
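A short note on combining the option lists: append() simply concatenates them, so a name that occurs in both lists would end up duplicated. The sketch below uses shortened stand-in lists and a made-up table name other_table to show the difference from modifyList(), which replaces elements by name.

```r
# Shortened stand-ins for the option lists used in this article
connection_opts <- list(
  url = "jdbc:mysql://localhost:3306/testdb",
  user = "rstudio"
)
data_opts <- list(dbtable = "test_table")

# append() concatenates the two lists, the same as c()
combined <- append(connection_opts, data_opts)
identical(combined, c(connection_opts, data_opts))

# modifyList() replaces elements by name, which avoids a duplicated
# `dbtable` entry when we want to point at a different table.
# "other_table" is a hypothetical name used only for illustration.
overridden <- modifyList(combined, list(dbtable = "other_table"))
overridden$dbtable
```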

Executing a query instead

We mentioned above that apart from just loading a table, we can also choose to execute a SQL query and use its result as the source for our Spark DataFrame. Here is a simple example of that.

# Use `query` instead of `dbtable`
jdbcDataOpts <- list(
  query = "SELECT * FROM test_table WHERE tailnum = 'N14228'"
)

# Use spark_read_jdbc() to load the data
test_qry <- sparklyr::spark_read_jdbc(
  sc = sc,
  name = "test_table",
  options = append(jdbcConnectionOpts, jdbcDataOpts),
  memory = FALSE
)

# Print some records
test_qry
## # Source: spark<test_table> [?? x 20]
##    row_names  year month   day dep_time sched_dep_time dep_delay arr_time
##    <chr>     <dbl> <dbl> <dbl>    <dbl>          <dbl>     <dbl>    <dbl>
##  1 1          2013     1     1      517            515         2      830
##  2 6570       2013     1     8     1435           1440        -5     1717
##  3 7111       2013     1     9      717            700        17      812
##  4 7349       2013     1     9     1143           1144        -1     1425
##  5 10593      2013     1    13      835            824        11     1030
##  6 13775      2013     1    16     1829           1730        59     2117
##  7 18967      2013     1    22     1902           1808        54     2214
##  8 19417      2013     1    23     1050           1056        -6     1143
##  9 19648      2013     1    23     1533           1529         4     1641
## 10 21046      2013     1    25      724            720         4     1000
## # … with more rows, and 12 more variables: sched_arr_time <dbl>,
## #   arr_delay <dbl>, carrier <chr>, flight <dbl>, tailnum <chr>,
## #   origin <chr>, dest <chr>, air_time <dbl>, distance <dbl>, hour <dbl>,
## #   minute <dbl>, time_hour <chr>

Note that the only element that changed is the jdbcDataOpts list, which now contains a query element instead of a dbtable element.
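On Spark versions that predate the query option (as far as we know, it was added in Spark 2.4), a commonly used alternative is to pass a parenthesized subquery with an alias through dbtable. A sketch of that approach follows; the helper as_dbtable_subquery() and the alias subq are illustrative assumptions.

```r
# Hypothetical helper: wrap a SELECT statement so that it can be passed
# through the `dbtable` option as an aliased subquery.
# The alias is written without the AS keyword, since Oracle does not
# accept AS for table aliases.
as_dbtable_subquery <- function(sql, alias = "subq") {
  paste0("(", sql, ") ", alias)
}

jdbcDataOptsSubquery <- list(
  dbtable = as_dbtable_subquery(
    "SELECT * FROM test_table WHERE tailnum = 'N14228'"
  )
)
jdbcDataOptsSubquery$dbtable
```

The resulting list can be combined with the connection options in the same way as the dbtable and query variants shown above.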

Other RDBM Systems

Our toy example with MySQL worked fine, but in practice, we might need to access data in other popular RDBM systems, such as Oracle, MS SQL Server, and others. The pattern shown above stays the same, however, as the API design does not depend on the system in question.

In general, we will need 3 elements to successfully connect:

  1. A JDBC driver specified and the resources provided to {sparklyr} in the config argument of spark_connect(), usually in the form of paths to .jar files containing the necessary resources
  2. A connection URL that will depend on the system and other setup specifics
  3. Last but not least, all the technical and infrastructural prerequisites such as credentials with the proper access rights, the host being accessible from the Spark cluster, etc.
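The first two elements can be sanity-checked from R before a Spark connection is even attempted. The following is a rough sketch with a hypothetical helper, check_jdbc_setup(); it does not cover the third element, such as access rights or network reachability.

```r
# Hypothetical pre-flight check, meant to run before spark_connect():
# verifies that the jar files exist and that the URL looks like a JDBC URL.
check_jdbc_setup <- function(jars, url) {
  problems <- character(0)
  if (length(jars) == 0) {
    problems <- c(problems, "no jar files supplied")
  }
  missing_jars <- jars[!file.exists(jars)]
  if (length(missing_jars) > 0) {
    problems <- c(problems, paste("jar not found:", missing_jars))
  }
  if (!startsWith(url, "jdbc:")) {
    problems <- c(problems, "connection URL does not start with 'jdbc:'")
  }
  problems
}

# An empty character vector means that no problems were detected
check_jdbc_setup(
  jars = dir("~/jars", pattern = "jar$", recursive = TRUE, full.names = TRUE),
  url = "jdbc:mysql://localhost:3306/testdb"
)
```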

Below are some examples of setups that we have used successfully in the past.

Oracle

Oracle JDBC Driver

The drivers can be downloaded (after login) from Oracle’s website, and the driver class name is usually "oracle.jdbc.driver.OracleDriver". Make sure you use the version appropriate for your database and Java environment.

Using fully qualified host identification

hostName <- "0.0.0.0"
portNumber <- "1521"
serviceName <- "service_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "oracle.jdbc.driver.OracleDriver",
  fetchsize = "100000",
  url = paste0(
    "jdbc:oracle:thin:@//",
    hostName, ":", portNumber,
    "/", serviceName
  )
)

Using tnsnames.ora

The tnsnames.ora file is a configuration file that contains network service names mapped to connect descriptors for the local naming method, or net service names mapped to listener protocol addresses. With this in place, we can use just the service name instead of fully qualified host, port, and service identification, for example:

serviceName <- "service_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "oracle.jdbc.driver.OracleDriver",
  fetchsize = "100000",
  url = paste0("jdbc:oracle:thin:@", serviceName)
)

Parsing special data types

Note that the JDBC driver on its own may not be enough to parse all data types in an Oracle database. For instance, parsing the XMLType will very likely require xmlparserv2.jar and xdb.jar along with the proper ojdbc*.jar.

MS SQL Server

MS SQL Server JDBC Driver

The drivers for different JRE versions can be downloaded from the Download Microsoft JDBC Driver for SQL Server website. Again, make sure that the JRE version matches the one you use in your environments.

MS SQL Server connection options

serverName <- "0.0.0.0"
portNumber  <- "1433"
databaseName <- "db_name"

jdbcConnectionOpts <- list(
  user = "username",
  password = "password",
  driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver",
  fetchsize = "100000",
  url = paste0(
    "jdbc:sqlserver://",
    serverName, ":", portNumber,
    ";databaseName=", databaseName
  )
)
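The connection URLs for the three systems covered in this article differ only in their prefix and separators, so they can be gathered into one small function. This is merely a sketch; jdbc_url() is a hypothetical helper that reproduces the URL formats shown above.

```r
# Hypothetical helper collecting the URL formats used in this article.
jdbc_url <- function(system = c("mysql", "oracle", "sqlserver"),
                     host, port, database) {
  system <- match.arg(system)
  switch(
    system,
    mysql = paste0("jdbc:mysql://", host, ":", port, "/", database),
    # For Oracle, `database` stands for the service name
    oracle = paste0("jdbc:oracle:thin:@//", host, ":", port, "/", database),
    sqlserver = paste0(
      "jdbc:sqlserver://", host, ":", port, ";databaseName=", database
    )
  )
}

jdbc_url("mysql", "localhost", 3306, "testdb")
## [1] "jdbc:mysql://localhost:3306/testdb"
jdbc_url("sqlserver", "0.0.0.0", 1433, "db_name")
## [1] "jdbc:sqlserver://0.0.0.0:1433;databaseName=db_name"
```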

Even more RDBM Systems

Logos of R, sparklyr, Spark and selected RDBMS systems

Vlad Mihalcea wrote a very useful article on JDBC Driver Connection URL strings which has the connection URL details for several other common database systems.

Some notes on performance

The memory argument

The memory argument to spark_read_jdbc() can prove very important when performance is of interest. What happens when using the default memory = TRUE is that the table in the Spark SQL context is cached using CACHE TABLE and a SELECT count(*) FROM query is executed on the cached table. This forces Spark to perform the action of loading the entire table into memory.

Depending on our use case, it might be much more beneficial to use memory = FALSE and only cache into Spark memory the parts of the table (or processed results) that we need, as the most time-costly operations usually are data transfers over the network. Transferring as little data as possible from the database into Spark memory may bring significant performance benefits.

This is a bit difficult to show with our toy example, as everything is physically happening inside the same container (and therefore the same file system), but differences can be observed even with this setup and our small dataset:

microbenchmark::microbenchmark(
  times = 10,
  setup = {
    library(dplyr)
    library(sparklyr)
    sparklyr::spark_disconnect_all()
    sc <- sparklyr::spark_connect("local", config = config)
  },
  
  # with memory=TRUE (the default)
  eager = {
    one <- sparklyr::spark_read_jdbc(
      sc = sc,
      name = "test",
      options = append(jdbcConnectionOpts, list(dbtable = "test_table"))
    ) %>%
      filter(tailnum == "N14228") %>%
      select(tailnum, distance) %>%
      compute("test")
  },

  # with memory=FALSE
  lazy = {
    two <- sparklyr::spark_read_jdbc(
      sc = sc,
      name = "test",
      options = append(jdbcConnectionOpts, list(dbtable = "test_table")),
      memory = FALSE
    ) %>% 
      filter(tailnum == "N14228") %>%
      select(tailnum, distance) %>%
      compute("test")
  }
)
# Unit: seconds
#  expr       min       lq     mean   median       uq      max neval
# eager 15.460844 16.24838 17.07560 17.03592 17.88299 18.73005    10
#  lazy  9.821039 10.12435 10.40718 10.42766 10.70024 10.97283    10

We see that the “lazy” approach, which does not cache the entire table into memory, yielded the result in roughly 39% less time by median (about 10.4 versus 17.0 seconds). This is of course by no means a representative benchmark for real-life data loads, but it can provide some insight into optimizing the loads.
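As a quick check of the relative difference, the arithmetic can be reproduced from the median timings in the benchmark output above. The variable names in this small sketch are our own.

```r
# Median timings in seconds, copied from the benchmark output above
median_eager <- 17.03592
median_lazy <- 10.42766

# Share of time saved by the lazy approach, in percent
time_saved_pct <- (median_eager - median_lazy) / median_eager * 100

# How many times faster the lazy approach ran
speedup <- median_eager / median_lazy

round(time_saved_pct, 1)
## [1] 38.8
round(speedup, 2)
## [1] 1.63
```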

Partitioning

Partitioning the data can bring a very significant performance boost and we will look into setting it up and optimizing it in detail in a separate article.

Running the code in this article

If you have Docker available, running the following should yield a Docker container with RStudio Server exposed on port 8787, so you can open your web browser at http://localhost:8787 to access it and experiment with the code. The user name is rstudio and the password is whatever you set via the PASSWORD variable below (pass in this example):

docker run -d -p 8787:8787 -e PASSWORD=pass jozefhajnala/jozefio
