Scaling H2O analytics with AWS and p(f)urrr (Part 3)
H2O + AWS + purrr (Part III)
This is the final installment of a three-part series that looks at how we can leverage AWS, H2O and purrr in R to build analytical pipelines. In Part I, I looked at starting up the environment through the EC2 dashboard on AWS’ website. In Part II, we saw how purrr can drive model training through H2O’s awesome API. Here are the links to these first two posts:
For this final blog post, I will look at how we can use boto3 to start up an EC2 instance programmatically and then connect to it through furrr. I will spin up a spot EC2 instance and show you two types of calculations:
- Simple sum function
- H2O training function
Do note that I will not show how to set up reticulate (which we need for boto3), but here is a good place to start if you are struggling.
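That said, as a rough sketch, one way to make boto3 visible to reticulate is via a dedicated virtualenv. This is just one possible setup and an assumption on my part (the environment name "r-boto3" is made up); your Python configuration may differ:

library(reticulate)

# One-off: create a small virtualenv and install boto3 into it
virtualenv_create("r-boto3")
virtualenv_install("r-boto3", "boto3")

# Point reticulate at it and confirm boto3 can be found
use_virtualenv("r-boto3", required = TRUE)
py_module_available("boto3")  # should return TRUE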
The packages we will use in this post:
library(tidyverse)
library(reticulate)
library(furrr)
library(glue)
library(tictoc)  # used to capture function execution time
Use boto3 to start an EC2 instance
To start up an instance remotely, you need to specify your AWS credentials somewhere. I find the easiest way to do this is to place the credentials in your R environment using usethis::edit_r_environ(). You will need to specify three separate variables:
- AWS_ACCESS_KEY_ID=XXXXXXXXXXX
- AWS_SECRET_ACCESS_KEY=XXXXXXXXXXXXXX
- AWS_DEFAULT_REGION=us-east-2
To generate these, log into your AWS dashboard, go to the IAM (Identity and Access Management) dashboard and select the Users tab. There, add a user and grant the administrative rights that you want the user to have. Remember to restart R once you have filled in the access key information in the .Renviron file for it to take effect.
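As a quick sketch of that flow (the usethis call opens the file for you; checking the values afterwards is just my own habit, not part of the original post):

# Opens .Renviron so you can paste in the three AWS_* lines, then restart R
usethis::edit_r_environ()

# After the restart, confirm the keys are visible to the session
Sys.getenv(c("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "AWS_DEFAULT_REGION")) != ""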
At this point, those familiar with the cloudyr suite are probably asking: “This is exactly the same as library(aws.ec2), so why use boto3?” Well, to be honest, I used aws.ec2 for a while, but I prefer spot instances, which the current version of aws.ec2 does not support. In addition, I found that boto3 has some other functionality which I prefer. For a full list of boto3 functions for interacting with an EC2 instance, have a look at the reference manual.
To import Python’s boto3, we use reticulate::import:
boto <- import("boto3")

# Import EC2 client
ec2 <- boto$client('ec2')
ec2
The ec2 variable you see here is of class <botocore.client.EC2>. It is what we will use to interact with AWS in a programmatic fashion. Next, let’s kick off a new spot instance. I will use the c4.4xlarge instance for the remainder of the post:
- 16 cores
- 30GB RAM
- EBS Only
- Spot Price: $0.1448/h
Note that I also specify the ImageId that I want to use. If you remember from Part I and Part II, I built an AMI that already has H2O and caret installed: ami-0157656a8c5b46458. The function has multiple other options you can read up on in the reference manual. If you have a preferred SecurityGroupIds, you can also add that into the request as I did here.
response <- ec2$request_spot_instances(
  InstanceCount = 1L,
  LaunchSpecification = list(
    ImageId = 'ami-0157656a8c5b46458',
    InstanceType = 'c4.4xlarge',
    SecurityGroupIds = list("sg-0147497a63ft8ae432"),
    Placement = list(AvailabilityZone = "us-east-2a")
  ),
  SpotPrice = '1',
  Type = 'one-time'
)
The response variable returns a ton of information. The best way I found to deal with this is just to write a small function which captures the most useful information.
ec2_info <- function(req_id, type){

  ec2_all <- ec2$describe_instances(Filters = list(
    list(Name = "spot-instance-request-id", Values = list(req_id))))$Reservations[[1]]

  if(type == "ip") return(ec2_all$Instances[[1]]$PublicIpAddress)
  if(type == "instance_id") return(ec2_all$Instances[[1]]$InstanceId)
  if(type == "cores") return(ec2_all$Instances[[1]]$CpuOptions$CoreCount)
}
This function takes a request_id as input and will return either the ip, the instance_id or the number of cores. It passes the id to a very useful filtering function in the EC2 module: describe_instances. This function can take several kinds of inputs, but I prefer the request_id.
To get the request_id, we can use purrr:
xtract_reqid <- function(response){
  tibble(req_id = response$SpotInstanceRequests %>%
           map_chr(~.x %>% .$SpotInstanceRequestId))
}

req_id <- response %>% xtract_reqid()

instance_information <- req_id %>%
  mutate(
    ip = map_chr(req_id, ec2_info, "ip"),
    instance_id = map_chr(req_id, ec2_info, "instance_id"),
    cores = map_dbl(req_id, ec2_info, "cores") * 2
  )

instance_information

# Closing instance
# ec2$terminate_instances(InstanceIds = list(instance_information$instance_id))
Now that we have all the pieces, we can conduct some calculations using these machines. We will use furrr to connect. I will show you how to connect to the EC2 instance using a Windows example, the reason being that I found it a bit trickier than working on Linux. To see how to connect when operating from a Linux machine, see the reference by Davis Vaughan and Matt Dancho.
Remember to specify where your private key file is located. If you don’t know what a private key is, have a look at the first post for some help.
# req_id$req_id is the request-id string the describe_instances filter expects
public_ip <- ec2_info(req_id$req_id, type = "ip")

ssh_private_key_file <- "C:/Users/User/your_keypair.ppk"
A very important change to take note of here is that I use Plink to connect. I find the Windows connection a bit unstable and finicky, so I do recommend you work off a Linux machine if possible.
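Plink ships with the PuTTY suite. A quick way to check that R can actually find it before attempting the connection (my own habit, not from the original post):

# Should return the full path to plink.exe rather than ""
Sys.which("plink")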
# Connect!
cl <- future::makeClusterPSOCK(

  ## Public IP number of EC2 instance
  public_ip,

  ## User name (always 'ubuntu')
  user = "ubuntu",

  ## Use private SSH key registered with AWS
  rshcmd = c("plink", "-ssh", "-i", ssh_private_key_file),

  ## Set up .libPaths() for the 'ubuntu' user and
  ## install future/purrr/furrr packages
  rscript_args = c(
    "-e", shQuote("local({p <- Sys.getenv('R_LIBS_USER'); dir.create(p, recursive = TRUE, showWarnings = FALSE); .libPaths(p)})"),
    "-e", shQuote("install.packages(c('future', 'furrr'))")
  ),

  dryrun = FALSE,
  verbose = TRUE,
  homogeneous = FALSE
)
The makeClusterPSOCK function will become your workhorse function for connecting to remote servers. The biggest changes you will make will be in the rscript_args argument of the function. As you will see later, we can add commands to this argument to initiate the H2O server we want to use for estimation.
To confirm that the processes are running in parallel and that we gain speed, we will first run the sequence sequentially and then in parallel. Something that was made clear to me by Davis Vaughan is why we use nested future_maps: once you start spinning up more than one instance, you can shard the data across servers and then distribute it over the cores on each machine. So imagine you want to train three models: rf, gbm and deeplearning. You can then spin up three different EC2 instances and distribute the training of the models across multiple machines. But let’s first try to get basic arithmetic going.
f <- function(x){
  x <- rnorm(x)
  rm(x)
  return("success")
}

plan(list(tweak(cluster, workers = cl), tweak(sequential)))

tic()
x <- future_map(1, ~{
  rep(1e6, 500) %>%
    future_map(~f(.x))
})
sec_time <- toc()
The sequential method takes around 40 seconds to complete. Let’s use the plan function to do the next estimation in parallel:
plan(list(tweak(cluster, workers = cl), tweak(multiprocess, workers = 15)))

tic()
x <- future_map(1, ~{
  rep(1e6, 500) %>%
    future_map(~f(.x))
})
par_time <- toc()
The time difference is staggering: 8 seconds in parallel. We can confirm that the process is running in parallel by SSHing into the remote machine and using htop.
Using H2O with a remote server
It’s all good and well doing random number generation using a large machine, but in the end we would most likely want to redirect our computational burden to the remote server, especially for machine learning estimations.
To do this we need to change one small piece of the connection function, makeClusterPSOCK. I wrap the makeClusterPSOCK function so that it is easier to use. This new function is called h2o.makeEC2ClusterPsock and it will be our primary function to start and connect to the remote server, initializing h2o as it connects.
h2o.makeEC2ClusterPsock <- function(public_ip, ssh_private_key_file,
                                    max_mem_size = "10g", dryrun = FALSE, h2o = TRUE){

  if(h2o){
    r_command <- c(
      "-e", shQuote(".libPaths('/home/rstudio/R/x86_64-pc-linux-gnu-library/3.5')"),
      "-e", shQuote(glue("if (!require('furrr')) install.packages('furrr');library(h2oEnsemble);system('pkill java');h2o.init(nthreads = -1, max_mem_size = '{max_mem_size}');h2o.removeAll();gc()"))
    )
  } else {
    r_command <- c(
      "-e", shQuote(".libPaths('/home/rstudio/R/x86_64-pc-linux-gnu-library/3.5')"),
      "-e", shQuote("if (!require('furrr')) install.packages('furrr')")
    )
  }

  makeClusterPSOCK(
    # Public IP number of EC2 instance
    workers = public_ip,

    # User name (always 'ubuntu')
    user = "ubuntu",

    # Use private SSH key registered with AWS
    rshcmd = c("plink", "-ssh", "-i", ssh_private_key_file),

    # Set up .libPaths() for the 'ubuntu' user, install furrr and
    # initialize the H2O server
    rscript_args = r_command,

    # Switch this to TRUE to see the code that is run on the workers without
    # making the connection
    dryrun = dryrun,
    verbose = TRUE
  )
}
We also need to remember to send the data to the remote server so it can be used in the estimation. This is where furrr and futures come in. I am going to use the Higgs dataset from Part II in the following example. It is important to coerce the data with the as.h2o function inside a future_map, as this is what sends the data to the remote server.
train <- read_csv("https://s3.amazonaws.com/erin-data/higgs/higgs_train_5k.csv") %>%
  mutate(response = as.factor(response)) %>%
  list

test <- read_csv("https://s3.amazonaws.com/erin-data/higgs/higgs_test_5k.csv") %>%
  mutate(response = as.factor(response)) %>%
  list

cl <- h2o.makeEC2ClusterPsock(
  public_ip = public_ip,
  max_mem_size = "120g",
  ssh_private_key_file = ssh_private_key_file,
  dryrun = FALSE,
  h2o = TRUE
)

plan(cluster, workers = cl)

training_df <- tibble(model = "gbm", train, test) %>%
  mutate(h2o_frame_name = "train.hex",
         h2o_frame_name_test = "test.hex") %>%
  ##### USE FUTURE MAP TO SEND THE TRAINING DF TO THE REMOTE SERVER
  mutate(train = future_map2(train, h2o_frame_name,
                             ~as.h2o(.x, destination_frame = .y))) %>%
  mutate(test = future_map2(test, h2o_frame_name_test,
                            ~as.h2o(.x, destination_frame = .y)))
We can confirm that the H2O server is running by using htop again. This time we should see a lot of java processes running. If we inspect training_df, you will notice that the class of the training and testing frames has been transformed to H2OFrame. These frames are now sitting on the remote server. How cool is that?
training_df

# A tibble: 1 x 4
#   train            test             h2o_frame_name h2o_frame_name_test
#   <list>           <list>           <chr>          <chr>
# 1 <S3: H2OFrame>   <S3: H2OFrame>   train.hex      test.hex
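If you want an extra sanity check from the R side, one option (a small sketch of my own, not from the original post, and it assumes the h2o package is also loaded locally, as it is for the as.h2o calls above) is to ask the remote H2O instance which frames it knows about by running h2o.ls() inside a future:

# Runs on the EC2 worker because plan(cluster, workers = cl) is active;
# h2o.ls() there talks to the H2O server started on that machine.
remote_frames <- future_map(1, ~ h2o.ls())
remote_frames  # should mention train.hex and test.hex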
The last step is to train this bad boy. I need to use the future_map family of functions to send the train commands to the remote server. The way to do this is to write a wrapper function for training a specific model.
h2o.model <- function(model, training, validation, Y, X, folds = 2){

  if(model == "gbm"){
    return(h2o.gbm(y = Y,
                   x = X,
                   training_frame = training,
                   validation_frame = validation,
                   ntrees = 100,
                   stopping_metric = "AUTO",
                   stopping_tolerance = 0.01,
                   max_depth = 10,
                   min_rows = 2,
                   learn_rate = 0.1,
                   stopping_rounds = 2,
                   learn_rate_annealing = 0.99,
                   nfolds = folds))
  }
}

ml.Res <- training_df %>%
  mutate(trained_models = future_pmap(list(model, train, test),
                                      ~h2o.model(..1,
                                                 training = ..2,
                                                 validation = ..3,
                                                 Y = "response",
                                                 X = setdiff(names(..2), "response"),
                                                 folds = 5)))

ml.Res$trained_models
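If you then want to pull a summary metric back to your local session, a small sketch along the following lines should work (my addition, not part of the original post; it leans on the cross-validation folds set above and again pushes the work to the remote server with furrr):

# Extract the cross-validated AUC on the worker, where the H2O server
# holding the models lives, and return just the number.
ml.Res %>%
  mutate(cv_auc = future_map_dbl(trained_models, ~ h2o.auc(.x, xval = TRUE)))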
Your model has now been trained using H2O and AWS on a 16-core, 30GB machine, all from your local workstation! This exercise can obviously be expanded for much more intensive testing and experimentation. But for now, let’s leave it there.
Always always always remember to shut down the instance/s after you are finished!
ec2$terminate_instances(InstanceIds = list(instance_information$instance_id))
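If you want to double-check the shutdown from R, something along these lines (a hypothetical check, using the same describe_instances call as before) should report the state moving to shutting-down and then terminated:

status <- ec2$describe_instances(
  InstanceIds = list(instance_information$instance_id)
)

# Expect "shutting-down" shortly after the call, then "terminated"
status$Reservations[[1]]$Instances[[1]]$State$Name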
Conclusion
If you have gotten this far over the last few weeks, I hope that you have learned as much as I did on how to scale your analytics to almost any size. The power to harness really impressive machines for any estimation problem has changed the way I approach analytics and research in general. The problem is no longer bounded by the size of the machine I have, but rather my experimental design.
I am hoping to talk about this idea of AWS + R at SatRday in Johannesburg, South Africa on the 6th of April. So if you are in the area, remember to buy some tickets and see how one can expand this even further!