Using RcppParallel to aggregate to a vector

[This article was first published on Rcpp Gallery, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

This article demonstrates using the
RcppParallel package to aggregate
to an output vector. It extends directly from previous demonstrations of
single-valued
aggregation
, through
providing necessary details to enable aggregation to a vector, or by
extension, to any arbitrary form.

The General Problem

Many tasks require aggregation to a vector result, and many such tasks can be
made more efficient by performing such aggregation in parallel. The general
problem is that the vector in which results are to be aggregated has to be
shared among the parallel threads. This is a parallelReduce task – we need
to split the singular task into effectively independent, parallel tasks,
perform our aggregation operation on each of those tasks, yielding as many
instances of our aggregate result vector as there are parallel tasks, and
then finally join all of those resultant vectors from the parallel tasks into
our desired singular result vector. The general structure of the code
demonstrated here extends from the previous Gallery article on parallel
vector sums
, through
extending to summation to a vector result, along with the passing of
additional variables to the parallel worker. The following code demonstrates
aggregation to a vector result that holds the row sums of a matrix, noting at
the output that is not intended to represent efficient code, rather it is
written to explicitly emphasise the principles of using RcppParallel to
aggregate over a vector result.

The parallelReduce Worker

The following code defines our parallel worker, in which the input is
presumed for demonstration purposes to be a matrix stored as a single vector,
and so has of total length nrow * ncol. The demonstration includes a few
notable features:

  1. The main input simply provides an integer index into the rows of the
    matrix, with the parallel job splitting the task among elements of that
    index. This explicit specification of an index vector is not necessary, but
    serves here to clarify what the worker is actually doing. An alternative
    would be for input to be the_matrix, and subsequently call the parallel
    worker only over [0 ... nrow] of that vector which has a total length of
    nrow * ncol.

  2. We are passing two additional variables specifying nrow and ncol.
    Although one of these could be inferred at run time, we pass them simply to
    demonstrate how this is done. Note in particular the form in the second
    constructor, called for each Split job, which accepts as input the
    variables as defined by the main constructor, and so all variable definitions
    are of the form, nrow(oneJob.nrow). The initial constructor also has input
    variables explicitly defined with _in suffices, to clarify exactly how such
    variable passing works.

  3. No initial values for the output are passed to the constructors. Rather,
    output must be resized to the desired size by each of those constructors,
    and so each repeats the line output.resize(nrow, 0.0), which also
    initialises the values. (This is more readily done using a std::vector
    than an Rcpp vector, with final conversion to an Rcpp vector result
    achieved through a simple Rcpp::wrap call.)

#include 
// [[Rcpp::depends(RcppParallel)]]
#include 
using namespace Rcpp;
using namespace RcppParallel;

struct OneJob : public Worker
{
    RVector<int> input;

    const NumericVector the_matrix;
    const size_t nrow;
    const size_t ncol;

    std::vector<double> output;

    // Constructor 1: The main constructor
    OneJob (
            const IntegerVector input_in,
            const NumericVector the_matrix_in,
            const size_t nrow_in,
            const size_t ncol_in) :
        input(input_in), the_matrix(the_matrix_in),
        nrow(nrow_in), ncol(ncol_in), output()
    {
        output.resize(nrow, 0.0);
    }

    // Constructor 2: Called for each split job
    OneJob (
            const OneJob &oneJob,
            Split) :
        input(oneJob.input), the_matrix(oneJob.the_matrix),
        nrow(oneJob.nrow), ncol(oneJob.ncol), output()
    {
        output.resize(nrow, 0.0);
    }

    // Parallel function operator
    void operator() (std::size_t begin, std::size_t end)
    {
        for (size_t i = begin; i < end; i++)
        {
            // Very inefficient yet explicit way to calculate row sums:
            for (size_t j = 0; j < ncol; j++) {
                // static_cast becuase (i,j,nrow) are size_t, aka unsigned long,
                // but Rcpp vectors require `R_xlen_t`, aka long.
                output[i] += the_matrix[static_cast<R_xlen_t>(i + j * nrow)];
            }
        }
    } // end parallel function operator

    void join (const OneJob &rhs)
    {
        for (size_t i = 0; i < nrow; i++) {
            output[i] += rhs.output[i];
        }
    }
};

The worker can then be called via parallelReduce with the following code,
in which static_casts are necessary because .size() applied to Rcpp
objects returns an R_xlen_t or long value, but we need to pass unsigned
long
or size_t values to the worker to use as indices into standard C++
vectors. The output of oneJob is a std::vector, which is
converted to an Rcpp::NumericVector through a simple call to Rcpp::wrap.

// [[Rcpp::export]]
NumericVector vector_aggregator (IntegerVector index, NumericVector x)
{
    const size_t nrow = static_cast <size_t> (index.size ());
    const size_t ncol = static_cast <size_t> (x.size ()) / nrow;
    OneJob oneJob (index, x, nrow, ncol);
    parallelReduce (0, nrow, oneJob);
    return wrap (oneJob.output);
}

Demonstration

Finally, the following code demonstrates that this parallel worker correctly
returns the row sums of the input matrix.

# allocate a vector
nrow <- 1e5
ncol <- 10
x <- runif (nrow * ncol) # input matrix
res <- vector_aggregator (seq(nrow), x)

# confirm that this equals rowsums of the matrix:
xmat <- matrix(x, ncol = ncol)
identical(res, rowSums(xmat))
[1] TRUE

You can learn more about using RcppParallel at
https://rcppcore.github.com/RcppParallel.

To leave a comment for the author, please follow the link and comment on their blog: Rcpp Gallery.

R-bloggers.com offers daily e-mail updates about R news and tutorials about learning R and many other topics. Click here if you're looking to post or find an R/data-science job.
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

Never miss an update!
Subscribe to R-bloggers to receive
e-mails with the latest R posts.
(You will not see this message again.)

Click here to close (This popup will not appear again)