I’ve been having some great fun parallelizing R code on Amazon’s cloud. Now that things are chugging away nicely, it’s time to document my foibles so I can remember not to fall into the same pits of despair again.
The goal was to perform lots of trials of a randomized statistical simulation. The jobs were independent and fairly chunky, taking from a couple of minutes up to 90 minutes or so. Each simulation returned a couple dozen numbers. We worked our way up to running a few thousand simulations at a time on 32 EC2 nodes.
The helpful folks of the Bioconductor group put together a CloudFormation template and AMI: Bioconductor parallel cluster with SSH. It’s a great way to get started hacking R in the cloud. All it takes is a mouse-click and an Amazon account.
Using the parallel package
Here’s a quick example using the parallel package to start up a cluster and fool around a bit.
library(parallel)
help(package=parallel)
## create the cluster passing an IP address for
## the head node
## hostname -i works on Linux, but not on BSD
## descendants (like OS X)
cl
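The listing above is cut off right where the cluster gets created. Here is a minimal sketch of the same workflow that runs on a single machine. Two local workers stand in for EC2 nodes, and one_trial is a hypothetical stand-in for one trial of the randomized simulation. clusterSetRNGStream gives each worker its own reproducible random-number stream, which matters when every job draws random numbers. On EC2, you would pass the worker host names instead of 2 (worker_hosts in the comment is a placeholder), with master set to the head node's IP address.

```r
library(parallel)

## Hypothetical stand-in for one trial of the randomized simulation;
## it returns a small named vector of numbers.
one_trial <- function(i) {
  x <- rnorm(100)
  c(trial = i, mean = mean(x), sd = sd(x))
}

## Two local workers for illustration. On EC2 this would be something like
## makePSOCKcluster(worker_hosts, master = "<head node IP>").
cl <- makePSOCKcluster(2)

## Independent, reproducible random-number streams, one per worker.
clusterSetRNGStream(cl, iseed = 42)

results <- parLapply(cl, 1:8, one_trial)
stopCluster(cl)

## One row per trial, one column per returned number.
res <- do.call(rbind, results)
print(res)
```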
That was easy. But, it wouldn’t be any fun if a few things didn’t go wrong.
Why 32 machines?
As we scaled up to larger runs, I started hitting limits. The first was the number of machines Amazon lets you start up.
According to the email Amazon sent me: “Spot Instances, which have an instance limit that is typically five times larger than your On-Demand Instance limit (On-Demand default is 20)…”
You can get the limits raised, but I’m a total cheapskate anyway. So I hacked Dan’s CloudFormation template to use spot instances, adding a “SpotPrice” property in the right place. Spot instances can go away at any time, but they’re so much cheaper that it’s worth dealing with that.
Then I promptly hit another limit:
Error in socketConnection ... all connections are in use
Apparently, R has a built-in limit on the number of connections it can have at once, and that includes socket connections. In src/main/connections.c, it says:
#define NCONNECTIONS 128 /* snow needs one per slave node */
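Sadly, my lust for more cores is doomed to be thwarted, for now. R pre-allocates three of those 128 slots for stdin, stdout and stderr. That means a single head node can drive a little under 128 workers, which works out to roughly 32 instances with 4 cores apiece. Bummer.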
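Before asking for a big cluster, it can help to check how many connection slots the head node's R session actually has left. Here is a small sketch that assumes the compile-time default limit of 128. connection_headroom is a hypothetical helper name, not part of R. It relies on showConnections(all = TRUE), which lists every allocated connection, including the three standard ones.

```r
## Rough headroom check before building a big cluster, assuming R's
## default limit of 128 connections (NCONNECTIONS in connections.c).
connection_headroom <- function(limit = 128L) {
  ## showConnections(all = TRUE) lists every allocated connection,
  ## including the three pre-allocated ones (stdin, stdout, stderr).
  limit - nrow(showConnections(all = TRUE))
}

n_workers <- 32L * 4L   # 32 instances x 4 cores
if (n_workers > connection_headroom()) {
  message("Only ", connection_headroom(), " connection slots free; ",
          "cannot open ", n_workers, " worker sockets")
}
```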
Usually, the worker processes came up fine. But sometimes they couldn’t connect back to the head node. The symptom of this problem is that makePSOCKcluster hangs.
I ran ps -aexww on one of the workers and stared at the following gobbledygook for a while:
/usr/local/lib/R/bin/exec/R --slave --no-restore -e parallel:::.slaveRSOCK() --args MASTER=ip-10-4-215-155 PORT=10187 OUT= TIMEOUT=2592000 METHODS=TRUE XDR=TRUE SED=/bin/sed R_INCLUDE_DIR=/usr/local/lib/R/include R_DEFAULT_PACKAGES=datasets,utils,grDevices,graphics,stats SHELL=/bin/bash SSH_CLIENT=10.4.215.155 46154 22 USER=ubuntu LD_LIBRARY_PATH=/usr/local/lib/R/lib:/usr/local/lib:/usr/lib/jvm/java-6-sun-126.96.36.199/jre/lib/amd64/server:/usr/lib/jvm/java-6-sun-188.8.131.52/jre/lib/amd64 PATH=/usr/local/sbin:/usr/local/bin:/usr/sbin:/usr/bin:/sbin:/bin:/usr/games MAIL=/var/mail/ubuntu PWD=/home/ubuntu R_SHARE_DIR=/usr/local/lib/R/share LANG=en_US.UTF-8 R_ARCH= HOME=/home/ubuntu SHLVL=1 LOGNAME=ubuntu LC_CTYPE=en_US.UTF-8 SSH_CONNECTION=10.4.215.155 46154 10.158.53.86 22 R_HOME=/usr/local/lib/R R_DOC_DIR=/usr/local/lib/R/doc
I confirmed that I could connect to the dud machines manually, and also from there back to the head node, like so:
ssh -i ~/.ssh/id_rsa [email protected]
The problem survived rebooting the dud node, and even terminating it. Seemingly at random, somewhere between none and 3 machines out of 32 would turn out to be duds. How irritating!
Luckily, Dan from the Bioconductor group found the problem. If you know where to look, you can even see it in the aforementioned gobbledygook. The parameter MASTER=ip-10-4-215-155 means the worker has to resolve the head node’s hostname, and that apparently sometimes fails. (See the notes under master in the docs for makePSOCKcluster.)
The fix is to pass the head node’s IP address as the master argument to makePSOCKcluster, neatly bypassing any name-resolution tar pit.
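Here is one way to get that IP address from within R. It is a hedged sketch: pick_ipv4 and head_node_ip are hypothetical helpers, and they assume a Linux head node where hostname -i prints one or more addresses. The helper skips loopback addresses such as 127.0.1.1, which some Ubuntu setups report and which remote workers cannot reach.

```r
## Hypothetical helper: pick a usable IPv4 address from the output of
## 'hostname -i' (Linux; may list several addresses), skipping loopback
## addresses such as 127.0.1.1 that remote workers cannot reach.
pick_ipv4 <- function(text) {
  addrs <- unlist(strsplit(trimws(text), "[[:space:]]+"))
  ipv4 <- addrs[grepl("^[0-9]{1,3}(\\.[0-9]{1,3}){3}$", addrs)]
  ipv4 <- ipv4[!startsWith(ipv4, "127.")]
  if (length(ipv4) == 0L) stop("no non-loopback IPv4 address found")
  ipv4[[1L]]
}

## Ask the OS for the head node's addresses and pick one.
head_node_ip <- function() {
  pick_ipv4(system("hostname -i", intern = TRUE))
}

pick_ipv4("fe80::1 10.4.215.155")   # "10.4.215.155"

## On the head node (not run here; 'worker_hosts' is a placeholder):
## cl <- parallel::makePSOCKcluster(worker_hosts, master = head_node_ip())
```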
Huge props to Dan for figuring that out and giving me a serious case of geek envy.
The LB in parLapplyLB stands for load balancing. It uses a simple and sensible strategy: give each worker one job, then when a worker is finished, give it another job, until all the jobs are assigned.
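To make "one job at a time" versus "split everything up front" concrete, here is a toy model that ignores communication overhead. The job durations are made up for illustration. dynamic_makespan mimics the load-balancing strategy described above. static_makespan mimics parLapply, which splits the input into one contiguous chunk per worker using splitIndices from the parallel package.

```r
library(parallel)

## Toy model of parLapplyLB-style dynamic scheduling: whichever worker
## becomes idle first takes the next unstarted job. 'durations' are
## hypothetical job run times; communication overhead is ignored.
dynamic_makespan <- function(durations, n_workers) {
  free_at <- numeric(n_workers)      # time at which each worker goes idle
  for (d in durations) {
    w <- which.min(free_at)          # first worker to become idle
    free_at[w] <- free_at[w] + d
  }
  max(free_at)
}

## Toy model of static scheduling, as in parLapply: the jobs are split up
## front into one contiguous chunk per worker.
static_makespan <- function(durations, n_workers) {
  chunks <- splitIndices(length(durations), n_workers)
  max(vapply(chunks, function(i) sum(durations[i]), numeric(1)))
}

## One 90-minute job followed by five 2-minute jobs, on two workers.
durations <- c(90, 2, 2, 2, 2, 2)
dynamic_makespan(durations, 2)  # 90
static_makespan(durations, 2)   # 94
```

In the static case, the second worker finishes its three short jobs after 6 minutes and then sits idle. Meanwhile, two short jobs wait behind the 90-minute job on the first worker. Greedy dynamic scheduling is not optimal in general, but it avoids that particular trap. One hedged historical note: if I remember the R 3.5.0 NEWS correctly, it lists a bug fix making parLapplyLB and parSapplyLB actually do load balancing. Earlier versions split the input into one chunk per worker before handing the chunks out, which behaves much like the static model above.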
I think I saw cases where workers sat idle while some jobs had not yet started. The only way that could happen is if those jobs had already been assigned to a busy worker.
The code suggests that shouldn’t be possible, but I have a theory. makePSOCKcluster has an outfile option, and outfile="" sends the workers’ stdout and stderr back to the head node. I thought that might be handy for debugging.
Next, consider the call stack for parLapplyLB (down is deeper):
- recvOneData / recvOneData.SOCKcluster
One could imagine the following. A chatty, long-running worker sends output back to the head node via outfile="", which makes its socket readable before its job is done. So another job gets submitted to that worker. Other workers then become available and go idle for lack of work, because that work has already been hogged (but not started) by the chatty worker.
If it’s only a weird interaction between outfile="" and parLapplyLB, it’s not that big of a deal. A more unfortunate property of parLapplyLB is what happens when a worker goes away, say because a connection is dropped or a spot instance is terminated. parLapplyLB then bombs out with a socket error, and all work on all workers is lost. Doh!
For this reason, I had the workers write out checkpoints, which I periodically collected onto the head node. That way, getting a return value back from parLapplyLB wasn’t all that critical. And that brings me to the topic of automation.
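Here is a minimal sketch of the checkpointing idea, though not necessarily how I actually wired it up. Each job writes its result to a file on the worker, and a rerun skips any job whose file already exists. run_with_checkpoint, simulate_one and the directory path are hypothetical names. The wrapper's argument is called job_fun rather than fun so that it doesn't collide with parLapplyLB's own fun argument.

```r
## Hypothetical wrapper: run one job, saving its result to a checkpoint
## file on the worker's disk, and skip jobs whose checkpoint already
## exists. If parLapplyLB dies halfway, a rerun only redoes missing jobs.
run_with_checkpoint <- function(job_id, job_fun, checkpoint_dir) {
  path <- file.path(checkpoint_dir, sprintf("job-%05d.rds", job_id))
  if (file.exists(path)) return(readRDS(path))
  result <- job_fun(job_id)
  ## Write under a temporary name, then rename, so a half-written file is
  ## never mistaken for a finished checkpoint.
  tmp <- paste0(path, ".tmp")
  saveRDS(result, tmp)
  file.rename(tmp, path)
  result
}

## Usage on the cluster (not run here; simulate_one and the directory are
## placeholders, and the directory must exist on every worker):
## parLapplyLB(cl, seq_len(3000), run_with_checkpoint,
##             job_fun = simulate_one, checkpoint_dir = "/home/ubuntu/checkpoints")
```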
Automation is great. Don’t tell anyone, but I take something of a lazy approach to automation: starting with a hacky mess that just barely works with lots of manual intervention and gradually refining it as needed, in the general direction of greater automation and more robust error handling.
Here are a few half-hearted attempts:
- In lieu of a progress bar, I hacked up a little R script to process checkpoints and cough up summary statistics.
- I pulled some Ruby code out of Dan’s bootstrap script that queries the EC2 API for the currently running worker nodes.
- A little snippet of Python run from cron grabs checkpoint files from the workers every 20 minutes.
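A checkpoint-summarizing script along the lines of the first item might look like the sketch below. It assumes the checkpoints have already been copied to one directory on the head node. It also assumes each one is a job-NNNNN.rds file holding a single named numeric vector. The function name and file layout are hypothetical.

```r
## Hypothetical progress report over checkpoint files gathered onto the
## head node, assuming each job-NNNNN.rds file holds one named numeric
## vector of results (a couple dozen numbers per simulation).
checkpoint_progress <- function(checkpoint_dir, n_jobs) {
  files <- list.files(checkpoint_dir, pattern = "^job-[0-9]+\\.rds$",
                      full.names = TRUE)
  done <- length(files)
  cat(sprintf("%d of %d jobs finished (%.1f%%)\n",
              done, n_jobs, 100 * done / n_jobs))
  if (done > 0L) {
    results <- do.call(rbind, lapply(files, readRDS))
    print(summary(results))   # per-column summary statistics
  }
  invisible(done)
}

## Usage (placeholder path and job count):
## checkpoint_progress("~/checkpoints", 3000)
```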
All this is closer to a hacky mess than clean automation. A lot of babysitting is still required.
Features I’d like to see
- shared EBS volume (via NFS?) for packages, checkpoints and results
- a queuing system that doesn’t require persistent socket connections to the workers
- async-lapply – returns a list of futures, which can be used to ask for status and results
- progress monitoring on head node
- support for scalable pools of spot instances that can go away at any time
- grow and shrink pool according to size of queue
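The async-lapply idea can at least be prototyped on a single machine. The sketch below builds futures out of parallel's fork-based mcparallel and mccollect. Those functions exist, but they only work on Unix-alikes such as Linux and OS X, not on Windows, and only on the local machine. So this is a stand-in for the interface, not the distributed queue the list is asking for. async_lapply, future_done and future_value are hypothetical names.

```r
library(parallel)

## Hypothetical async_lapply sketch on a single Unix machine: each element
## runs in a forked child via mcparallel(), and the caller immediately
## gets back a list of futures (environments) to poll or wait on.
## Assumes FUN never returns NULL.
async_lapply <- function(X, FUN) {
  lapply(X, function(x) {
    fut <- new.env()
    fut$job <- mcparallel(FUN(x))
    fut$done <- FALSE
    fut
  })
}

## Non-blocking status check. The result is cached in the future because
## mccollect() hands back a child's result only once.
future_done <- function(fut) {
  if (!fut$done) {
    r <- mccollect(fut$job, wait = FALSE)
    if (!is.null(r)) {
      fut$value <- r[[1L]]
      fut$done <- TRUE
    }
  }
  fut$done
}

## Blocking wait for the result.
future_value <- function(fut) {
  if (!fut$done) {
    fut$value <- mccollect(fut$job)[[1L]]
    fut$done <- TRUE
  }
  fut$value
}

futs <- async_lapply(1:3, function(i) { Sys.sleep(0.5); i^2 })
vapply(futs, future_done, logical(1))   # status; typically FALSE at first
vapply(futs, future_value, numeric(1))  # waits, then 1 4 9
```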
The right tool for 10,000 jobs
There are many ways to parallelize R. The approach in this document uses the parallel package and RStudio on Amazon EC2. The parallel package is nice for interactive development. It also has the advantage of keeping R worker processes alive rather than starting a new process for each job. But this approach only works up to a point.
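Here is a small local sketch of the "workers stay alive" advantage. Two workers serve two separate parLapply calls. The process IDs show that no new R processes were started, and a counter set up once with clusterEvalQ keeps its value between calls. jobs_run and count_and_report are hypothetical names used only for illustration.

```r
library(parallel)

cl <- makePSOCKcluster(2)   # two local worker processes

## Per-worker setup runs once, and its state survives between calls
## because the workers are long-lived R processes.
invisible(clusterEvalQ(cl, jobs_run <- 0))

count_and_report <- function(i) {
  jobs_run <<- jobs_run + 1   # updates the counter in the worker's global env
  Sys.getpid()
}

pids1 <- unlist(parLapply(cl, 1:4, count_and_report))
pids2 <- unlist(parLapply(cl, 1:4, count_and_report))
worker_pids <- unlist(clusterCall(cl, Sys.getpid))
total_jobs <- sum(unlist(clusterEvalQ(cl, jobs_run)))
stopCluster(cl)

setequal(c(pids1, pids2), worker_pids)   # TRUE: no new processes started
total_jobs                               # 8
```

The flip side is that any state left on a worker also persists, which is convenient for loaded packages and data but worth keeping in mind when jobs are supposed to be independent.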
The approaches differ in style: interactive vs. batch, implicit vs. explicit parallelism, or reformulating the problem as a map/reduce job. For map/reduce style computations, look at Rhipe. R at 12,000 Cores describes the “Programming with Big Data in R” project (pbdR). For batch jobs, StarCluster may be a better choice.
StarCluster provides several of those features, albeit with the caveat of restarting R for each job. Having pushed R/parallel to its limits, I intend to try StarCluster a little more. So far I’ve only learned the term of art for when your StarCluster job goes horribly wrong – that’s called a starclusterfuck.