A new ‘boto3’ Amazon Athena client wrapper with dplyr async query support

[This article was first published on R – rud.is, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

A previous post explored how to deal with Amazon Athena queries asynchronously. The function presented is a beast, though it is on purpose (to provide options for folks).

In reality, nobody really wants to use rJava wrappers much anymore and dealing with icky Python library calls directly just feels wrong, plus Python functions often return truly daft/ugly data structures. R users deserve better than that.

R is not a first-class citizen in Amazon-land and while the cloudyr project does a fine job building native-R packages for various Amazon services, the fact remains that the official Amazon SDKs are in other languages. The reticulate package provides an elegant interface to Python so it seemed to make sense to go ahead and wrap the boto3 Athena client into something more R-like and toss in the collect_async() function for good measure.

Dependencies

I forced a dependency on Python 3.5 because friends don’t let friends rely on dated, fragmented ecosystems. Python versions can gracefully (mostly) coexist so there should be no pain/angst associated with keeping an updated Python 3 environment around. As noted in the package, I highly recommend adding RETICULATE_PYTHON=/usr/local/bin/python3 to your R environment (~/.Renviron is a good place to store it) since it will help reticulate find the proper target.

If boto3 is not installed, you will need to do pip3 install boto3 to ensure you have the necessary Python module available and associated with your Python 3 installation.

It may seem obvious, but an Amazon AWS account is also required and you should be familiar with the Athena service and AWS services in general. Most of the roto.athena functions have a set of optional parameters:

  • aws_access_key_id
  • aws_secret_access_key
  • aws_session_token
  • region_name
  • profile_name

Ideally, these should be in setup in the proper configuration files and you should let boto3 handle the details of retrieving them. One parameter you will see used in many of my examples is `profile_name = “personal”. I have numerous AWS accounts and manage them via the profile ids. By ensuring the AWS configuration files are thoroughly populated, I avoid the need to load and pass around the various keys and/or tokens most AWS SDK API calls require. You can read more about profile management in the official docs: 1, 2.

Usage

The project README and package manual pages are populated and have a smattering of usage examples. It is likely you will really just want to execute a manually prepared SQL query and retrieve the results or do the dplyr dance and collect the results asynchronously. We’ll cover both of those use-cases now, starting with a manual SQL query.

If you have not deleted it, your Athena instance comes with a sampledb that contains an elb_logs table. We’ll use that for our example queries. First, let’s get the packages we’ll be using out of the way:

library(odbc)
library(DBI) # for dplyr access later
library(odbc) # for dplyr access later
library(roto.athena) # hrbrmstr/roto.athena on gh or gl
library(tidyverse) # b/c it rocks

Now, we’ll prepare and execute the query. This is a super-simple one:

query <- "SELECT COUNT(requestip) AS ct FROM elb_logs"

start_query_execution(
  query = query,
  database = "sampledb",
  output_location = "s3://aws-athena-query-results-redacted",
  profile = "personal"
) -> qex_id

The qex_id contains the query execution id. We can pass that along to get information on the status of the query:

get_query_execution(qex_id, profile = "personal") %>%
  glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id  <chr> "7f8d8bd6-9fe6-4a26-a021-ee10470c1048"
## $ query               <chr> "SELECT COUNT(requestip) AS ct FROM elb_logs"
## $ output_location     <chr> "s3://aws-athena-query-results-redacted/7f...
## $ database            <chr> "sampledb"
## $ state               <chr> "RUNNING"
## $ state_change_reason <chr> NA
## $ submitted           <chr> "2018-07-20 11:06:06.468000-04:00"
## $ completed           <chr> NA
## $ execution_time_ms   <int> NA
## $ bytes_scanned       <dbl> NA

If the state is not SUCCEEDED then you’ll need to be patient before trying to retrieve the results.

get_query_results(qex_id, profile = "personal")
## # A tibble: 1 x 1
##   ct             
##   
## 1 4229

Now, we’ll use dplyr via the Athena ODBC driver:

DBI::dbConnect(
  odbc::odbc(),
  driver = "/Library/simba/athenaodbc/lib/libathenaodbc_sbu.dylib",
  Schema = "sampledb",
  AwsRegion = "us-east-1",
  AwsProfile = "personal",
  AuthenticationType = "IAM Profile",
  S3OutputLocation = "s3://aws-athena-query-results-redacted"
) -> con

elb_logs <- tbl(con, "elb_logs")

I've got the ODBC DBI fragment in a parameterized RStudio snippet and others may find that as a time-saver if you're not doing that already.

Now to build and submit the query:

mutate(elb_logs, tsday = substr(timestamp, 1, 10)) %>%
  filter(tsday == "2014-09-29") %>%
  select(requestip, requestprocessingtime) %>%
  collect_async(
    database = "sampledb",
    output_location = "s3://aws-athena-query-results-redacted",
    profile_name = "personal"
  ) -> qex_id

As noted in the previous blog post, collect_async() turn the dplyr chain into a SQL query then fires off the whole thing to start_query_execution() for you and returns the query execution id:

get_query_execution(qex_id, profile = "personal") %>%
  glimpse()
## Observations: 1
## Variables: 10
## $ query_execution_id  <chr> "95bd158b-7790-42ba-aa83-e7436c3470fe"
## $ query               <chr> "SELECT \"requestip\", \"requestprocessing...
## $ output_location     <chr> "s3://aws-athena-query-results-redacted/95...
## $ database            <chr> "sampledb"
## $ state               <chr> "RUNNING"
## $ state_change_reason <chr> NA
## $ submitted           <chr> "2018-07-20 11:06:12.817000-04:00"
## $ completed           <chr> NA
## $ execution_time_ms   <int> NA
## $ bytes_scanned       <dbl> NA

Again, you'll need to be patient and wait for the state to be SUCCEEDED to retrieve the results.

get_query_results(qex_id, profile = "personal")
## # A tibble: 774 x 2
##    requestip       requestprocessingtime
##    <chr>                           <dbl>
##  1 255.48.150.122              0.0000900
##  2 249.213.227.93              0.0000970
##  3 245.108.120.229             0.0000870
##  4 241.112.203.216             0.0000940
##  5 241.43.107.223              0.0000760
##  6 249.117.98.137              0.0000830
##  7 250.134.112.194             0.0000630
##  8 250.200.171.222             0.0000540
##  9 248.193.76.218              0.0000820
## 10 250.57.61.131               0.0000870
## # ... with 764 more rows

You can also use the query execution id to sync the resultant CSV from S3. Which one is more performant is definitely something you'll need to test since it varies with AWS region, result set size, your network connection and other environment variables. One benefit of using get_query_results() is that it uses the column types to set the data frame column types appropriately (I still need to setup a full test of all possible types so not all are handled yet).

Kick the tyres

The package is up on both GitLab and GitHub and any and all feedback (i.e. Issues) or tweaks (i.e. PRs) are most welcome.

To leave a comment for the author, please follow the link and comment on their blog: R – rud.is.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)