Playing Map() and Reduce() in R – By-Group Calculation

[This article was first published on S+/R – Yet Another Blog in Statistical Computing, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Clojure is such an interesting programming language that it can not only enhance our skill set but also change the way how we should write the program. After learning Clojure, I can’t help thinking about how to employ the functional programming and MapReduce paradigm to improve our experience with other programming languages, e.g. R in my case.

When calculating the statistical summary in R, we would go straight to aggregate() or sqldf() function without a second thought. Such by-group calculations seem so simple that we often might not bother to think about the problem itself schematically. Let’s take a look at how to approach this problem in Clojure by using the code below that I copied from https://statcompute.wordpress.com/2018/03/18/do-we-really-need-dataframe-in-clojure.

(def country_sum
  (map (fn [[billingcountry total]]
    {:billiingcountry billingcountry :total (reduce + (map :total total))})
    (group-by :billingcountry inv)))

Although the code looks a little awkward with lots of parenthesis, the idea is very clear and makes sense. We first partition the data into multiple pieces based on groups that we’d like to summarize and then define an anonymous function to sum up the invoice amount, by using a reduce() function, that we used a map() function extracting from the original data, e.g. a list of maps in this case. The whole calculation logic is a loyal reflection of MapReduce.

Now let’s come back to R and think about how to re-frame the solution for the by-group calculation. Using data(iris) as an example, we should first partition the data.frame by “species” with split() so as to convert the data.frame into a list of data.frames by groups. If I apply the class() function to each item in the list “lst1”, we should be able to see three data.frames.

data(iris)
lst1 <- split(iris, iris$Species)
Map(class, lst1)
#$setosa
#[1] "data.frame"
#$versicolor
#[1] "data.frame"
#$virginica
#[1] "data.frame"

After the data partition, we can proceed to calculate the by-group summary with each data.frame in the list. Luckily enough, because the data.frame is generically constructed as a collection of columns instead of rows, we don’t need to use the map operation to extract values from corresponding rows. Instead, we can directly calculate the column summary based on each partitioned data.frame. As shown below, the code is straightforward yet flexible given the use of an anonymous function, which can be customized to accommodate any arbitrary calculation.

Map(function(x) data.frame(grp = unique(x$Species), sl_avg = mean(x$Sepal.Length), sw_avg = mean(x$Sepal.Width)), lst1)
#$setosa
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#$versicolor
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#$virginica
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

Up to now, the problem has been successfully solved. However, if we have a closer look at the solution, it doesn’t take long for us to notice that the calculation in one group is completely orthogonal to the calculation in another group and therefore the by-group calculation doesn’t have to be in a sequential order. In addition, the partitioned data consumes significantly more memory than the original one, which is not an issue for small data sets but could be a potential concern for big data sets. After all, there is no need to have the data always stored in the memory, as long as it is available for us when needed.

To address the first observation, we would bring in the parallel computation by using the parallel::mcMap() function and kicking off multiple CPUs simultaneously. For the second concern, we can introduce the concept of Future, which is the abstraction for a data.frame instead of the data.frame physically stored in the memory. The future, once created with future::future() function, would remain unresolved until we want it to be resolved in the computation by using the future::value() function, at the computing cost for evaluating the future.

With everything put together, below is the final code with the parallel map and the future abstraction.

pkg <- list("parallel", "future")
mapply(function(x) require(x, character.only = T), pkg)
ft <- future({split(iris, iris$Species)})
mcMap(function(i) with(value(ft)[[i]], data.frame(grp = unique(Species), sl_avg = mean(Sepal.Length), sw_avg = mean(Sepal.Width))), 1:length(unique(iris$Species)), mc.cores = detectCores())
#[[1]]
#     grp sl_avg sw_avg
#1 setosa  5.006  3.428
#[[2]]
#         grp sl_avg sw_avg
#1 versicolor  5.936   2.77
#[[3]]
#        grp sl_avg sw_avg
#1 virginica  6.588  2.974

If we would like the output prettier, we could wrap the list into a nice-looking data.frame with a reduce operation by either Reduce(rbind, …) or do.call(rbind, …), where … is the final list from Map() or mcMap() shown above.

To leave a comment for the author, please follow the link and comment on their blog: S+/R – Yet Another Blog in Statistical Computing.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)