Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

In blog posts a few months ago, I proposed an alternative to MapReduce, e.g. to Hadoop, which I called “Snowdoop.” I pointed out that systems like Hadoop and Spark are very difficult to install and configure, are either too primitive (Hadoop)  or too abstract (Spark) to program, and above all, are SLOW. Spark is of course a great improvement on Hadoop, but still suffers from these problems to various extents.

The idea of Snowdoop is to

• retain the idea of Hadoop/Spark to work on top of distributed file systems (“move the computation to the data rather than vice versa”)
• work purely in R, using familiar constructs
• avoid using Java or any other external language for infrastructure
• sort data only if the application requires it

I originally proposed Snowdoop just as a concept, saying that I would slowly develop it into an actual package. I later put the beginnings of a Snowdoop package in a broader package, partools, which also contains other parallel computation utilities, such as a debugging aid for the cluster portion of R’s parallel package (which I like to call Snow, as it came from the old snow package).

I remain convinced that Snowdoop is a much more appropriate tool for many R users who are currently using Hadoop or Spark. The latter two, especially Spark, may be a much superior approach for those with very large clusters, thus with a need for built-in fault tolerance (Snowdoop provides none on its own), but even the Hadoop Wiki indicates that many MapReduce users actually work on clusters of very modest size.

So, in the last few weeks, I’ve added quite a bit to Snowdoop, and have run more timing tests. The latter are still very preliminary, but they continue to be very promising. In this blog post, I’ll give an extended example of usage of the latest version, which you can obtain from GitHub. (I do have partools on CRAN as well, but have not updated that yet.)

The data set in this example will be the household power usage data set from UCI. Most people would not consider this “Big Data,” with only about 2 million rows and 9 columns, but it’s certainly no toy data set, and it will make serve well for illustration purposes.

But first, an overview of partools:

• distributed approach, either persistent (distributed files) or quasi-persistent (distributed objects at the cluster nodes, in memory but re-accessed repeatedly)
• most Snowdoop-specific function names have the form file*
• most in-memory functions have names distrib*
• misc. functions, e.g. debugging aid and “Software Alchemy”

Note that partools, therefore, is more than just Snowdoop.  One need not use distributed files, and simply use the distrib* functions as handy ways to simplify one’s parallel code.

So, here is a session with the household power data.  I’m running on a 16-core machine, using 8 of the cores. For convenience, I changed the file name to hpc.txt. We first create the cluster, and initialize partools, e.g. assign an ID number to each cluster node:


> cls <- makeCluster(8)
> setclsinfo(cls) # partools call

Next we split the file into chunks, using the partools function filesplit() (done only once, not shown here). This creates files hpc.txt.1hpc.txt.2 and so on (in this case, all on the same disk). Now have each cluster node read in its chunk:

> system.time(clusterEvalQ(cls,hp <-    read.table(filechunkname("hpc.txt",1),
user system elapsed
9.468 0.270 13.815


(Make note of that time.) The partools function filechunkname() finds the proper file chunk name for the calling cluster node, based on the latter’s ID. We now have a distributed data frame, named hp at each cluster node.

The package includes a function distribagg(), a distributed analog of R’s aggregate() function.  Here is an example of use, first converting the character variables to numeric:

> clusterEvalQ(cls,for (i in 3:9) hp[,i] <- as.numeric(hp[,i]))
> system.time(hpoutdis <-distribagg(cls,"x=hp[,3:6],
by=list(hp[,7],hp[,8])","max",2))
user system elapsed
0.194 0.017 9.918


As you can see, the second and third arguments to distribagg() are those of aggregate(), in string form.

Now let’s compare to the serial version:

> system.time(hp <- read.table("hpc.txt",header=TRUE,sep=";",
stringsAsFactors=FALSE))
user system elapsed
22.343 0.175 22.529
> for (i in 3:9) hp[,i] <- as.numeric(hp[,i])
> system.time(hpout <- aggregate(x=hp[,3:6],by=list(hp[,7],hp[,8]),FUN=max))
user system elapsed
76.348 0.184 76.552



So, the computation using distribagg() was almost 6 times faster than serial, a good speed for 8 cluster nodes. Even the input from disk was more than twice as fast, in spite of the files being on the same disk, going through the same operating system.