[This article was first published on Markov Chain Monte Carlo in Item Response Models, and kindly contributed to R-bloggers]. (You can report issue about the content on this page here)
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.

MCMC is by its very nature a serial algorithm — each iteration depends on the results of the last iteration. It is, therefore, rather difficult to parallelize MCMC code so that a single chain will run more quickly by splitting the work along multiple processors.

A more straightforward way to parallelize MCMC code is to run multiple chains on multiple processors. Since the chains are independent of each other, it is mainly an issue of bookkeeping to distribute the work among several processing units.

In the next post we will see why running multiple chains can be very useful, for now we simply focus on the how. Specifically, this post explains how to exploit the multicore nature of modern computers to run several chains in roughly the same about of time as it takes to run one chain.

# Running code in parallel

An easy way to run R code in parallel on a multicore system is with the mclapply() function. Unfortunately, mclapply() does not work on Windows machines because mclapply() relies on forking and Windows does not support forking.

For the purposes of this online supplement, I have re-implemented mclapply() so that it runs on Windows. My personal blog gives the details of how I implmented the fix as it is a bit out of scope for the supplement. Hopefully, by modifying mclapply() so that it works across Windows, Mac, and Linux, we are able to focus on the statistical ideas instead of getting lost in computational details.

## A cross-platform mclapply() example

The following example illustrates how mclapply() is used. Consider the following function:

wait.then.square <- function(xx){
# Wait for ten seconds
Sys.sleep(10);
# Square the argument
xx^2 }


Applying it over the integers from 1 to 4 with lapply() will take about 40 seconds:
## Run in serial
system.time( serial.output <- lapply( 1:4, wait.then.square ) )
##  user  system elapsed
## 0.000   0.000   40.023


If we run it in parallel with mclapply() on Mac or Linux, it will take about 10 seconds:
## The following script does two things:
##   (i)  it loads the parallel R package, and
##   (ii) if the current R session is running on a Windows
##        machine, it implements a custom version of
##        parallel:mclapply().
source("http://mcmcinirt.stat.cmu.edu/setup/post-10-mclapply-hack.R")

## Note two changes to the wait.then.square call:
##   (i)   lapply becomes mclapply, and
##   (ii)  mc.cores (the number of processors to use in parallel)
##         is set to 4.
system.time( par.output <- mclapply( 1:4, wait.then.square,
mc.cores=4             ) )
##  user  system elapsed
## 0.008   0.000   10.020


If we run it in parallel with mclapply() on Windows, it will (i) take about 12 seconds, and (ii) produce a lot more console output:
source("http://mcmcinirt.stat.cmu.edu/setup/post-10-mclapply-hack.R")
##
##  *** Microsoft Windows detected ***
##
##  For technical reasons, the MS Windows version of mclapply()
##  is implemented as a serial function instead of a parallel
##  function.
##
##  As a quick hack, we replace this serial version of mclapply()
##  with a wrapper to parLapply() for this R session. Please see
##
##    http://www.stat.cmu.edu/~nmv/2014/07/14/implementing-mclapply-on-windows
##
##  for details.

system.time( par.output <- mclapply( 1:4, wait.then.square,
mc.cores=4             ) )
## starting worker pid=5512 on localhost:11616 at 12:30:30.691
## ... snip ...
## ... snip ...
##
## Attaching package: 'parallel'
##
## The following object is masked _by_ '.GlobalEnv':
##
##     mclapply
##
## ... snip ...
##
##  user  system elapsed
##   0.05    0.02   11.39


where the ... snip ... lines represent the three additional times that the message is printed to the console by the other cores. The "following object is masked" warning can be safely ignored. In this case, it is warning you that mclapply() has been replaced with something else, i.e. the Windows specific implementation of mclapply()!

# Implementing a parallel sampler

Conceptually, running chains in parallel is as simple as

    chain.list <- mclapply(1:num.chains, run.chain.2pl, ... )


where num.chains is the total number of chains that we wish to run in parallel and ... represents the additional arguments to run.chain.2pl.

The practice, however, is more complicated because each chain will need its own set of initial values, tuning parameters, and other arguments passed to run.chain.2pl. Otherwise, we would simply be duplicating work by running the exact same chain on separate cores.

Our full implementation is as follows:

## run.chain.2pl.list
##
## A wrapper to call run.chain.2pl with mclapply() in order to
## run MCMC chains in parallel. Returns a coda::mcmc.list.
##
## Inputs:
##   seq.of.random.seeds   A sequence of random seeds, one for
##                         each chain.
##   init.list             mcmc.list of initial values
##   MH.th.list            tuning parameters for theta.abl
##   MH.a.list             tuning parameters for a.disc
##   MH.b.list             tuning parameters for b.diff
##   ...                   the remaining arguments for run.chain.2pl
run.chain.2pl.list <- function(seq.of.random.seeds, init.list,
MH.th.list, MH.a.list, MH.b.list,
... ) {

## N.B. the length of seq.of.random.seeds is the number of
##      chains to be run
##
chain.list <- mclapply(1:length(seq.of.random.seeds),
function( ii ) {
## Set the seed for this core
this.seed <- seq.of.random.seeds[[ii]]
print(paste("Setting the seed to", this.seed))
set.seed(this.seed)

## Run the chain on this core
## N.B. la.get.all() is a helper function to extract
##      the relevant initial values for the init.list
##      data structure.
this.chain <- run.chain.2pl(
th.init = la.get.all(init.list, 'theta.abl')[[ii]],
a.init  = la.get.all(init.list, 'a.disc')[[ii]],
b.init  = la.get.all(init.list, 'b.diff')[[ii]],
s2.init = la.get.all(init.list, 'sig2.theta')[[ii]],
## Set the tuning from the inits list
MH.th = MH.th.list[[ii]],
MH.a  = MH.a.list[[ii]],
MH.b  = MH.b.list[[ii]],
... ## Set everything else
)

## Return the chain from this core
return( this.chain ) },
mc.cores=min(length( seq.of.random.seeds), detectCores() )
)

## Save the initial random seed as the name of the chain
names(chain.list) <- unlist(seq.of.random.seeds)

## Convert to a coda::mcmc.list object
converted.chain <- mcmc.list(
lapply( chain.list, function(xx) {
mcmc(  t(xx),
start=(list(...)$M.burnin + list(...)$M.thin),
thin=list(...)\$M.thin)
}))

return(converted.chain)
}

The la.get.all() helper function extracts the relevant parameters by name from a coda mcmc.list. This allows us to use the output from a previous chain to initialize a new chain, in effect, continuing where the old chain left off. The function is implemented in two steps: first, a get.all function that operates on a single chain, and second, the definition of la.get.all that operates on an mcmc.list:

## get.all
##
## Helper function to extract values by parameter name
get.all <- function( named.list , which.param ) {
trimmed.names <- substr( names(named.list),
1, nchar(which.param) )
return( named.list[
which( trimmed.names %in% which.param )] )
}

## la.get.all
##
## A shortcut for the lapply get.all pattern
la.get.all <- function( list.of.named.lists, which.param) {
return( lapply(
list.of.named.lists, get.all, which.param ) )
}

To initialize a chain without the values from a prior chain we must re-create the data-structure for an mcmc.list. The first step is to define a function init.chain() that create the initialization structure for a single chain.

init.chain <- function( seed.value, P.persons, I.items,
theta.mean, theta.sd,
a.scale, b.mean, b.sd,
s2.init ) {
## Reset the seed each time to remove dependence on
## previous draws
set.seed(seed.value)
theta.init <- rnorm(P.persons, theta.mean, theta.sd)

set.seed(seed.value)
a.init     <- exp(rnorm(I.items, a.scale,  sqrt(a.scale)  ))

set.seed(seed.value)
b.init     <- rnorm(I.items, b.mean, b.sd)

## Create the initial values as named list
vals <- c( theta.init, a.init, b.init, s2.init )
names(vals) <- get.2pl.params(1:P.persons,
1:I.items, 1:I.items, 1)

return(vals)
}


The second step is to manually create a list of these initializations, as we do in the example below.

# An example

The additional complexity to run a set of chains in parallel is mainly in specifying initial values and tuning parameters for the separate chains.

## Load the necessary code from this and previous posts
source("http://mcmcinirt.stat.cmu.edu/setup/post-10.R")

## Define the hyper parameters to values from Post 1
hyperpars <- list( mu.a         = 1.185,
s2.a         = 1.185,
s2.b         = 100,
alpha.th     = 1,
beta.th      = 1 )

## Make a list of initial values the init.chain function
chain.inits <- list(
##
## A chain with small values
chain1 = init.chain( 1, P.persons, I.items,
theta.mean = -10    , theta.sd = 1,
a.scale    =   0.001,
b.mean     =  -10    , b.sd     = 1,
s2.init    =   0.75 ),
##
## A chain near the true values
chain2 = init.chain( 1, P.persons, I.items,
theta.mean = 0  , theta.sd = 1,
a.scale    = 0.1,
b.mean     = 0  , b.sd     = 1,
s2.init    = 1 ),
##
## A chain with large values
chain3 = init.chain( 1, P.persons, I.items,
theta.mean =  10 , theta.sd = 1,
a.scale    =   1 ,
b.mean     =  10 , b.sd    = 1,
s2.init    =   3 )
)

## Define the random seeds
seq.of.random.seeds <- c(0, 1, 2) + 314159

run.par <- run.chain.2pl.list(
seq.of.random.seeds, chain.inits,
M.burnin=0, M.keep=100, M.thin=1,
U.data=U, hyperpars=hyperpars,
MH.th.list = list( 1,     1, 1   ),
MH.a.list  = list( 1/5, 1/5, 1/5 ),
MH.b.list  = list( 1/2, 1/2, 1/2 ), verbose=TRUE)
## [1] "Setting the seed to 314159"
## [1] "Setting the seed to 314160"
## [1] "Setting the seed to 314161"
## [1] 100
## Average acceptance rates:
##  theta.abl: 0.419
##  a.disc:    0.328
##  b.diff:    0.184
## [1] 100
## Average acceptance rates:
##  theta.abl: 0.66
##  a.disc:    0.213
##  b.diff:    0.334
## [1] 100
## Average acceptance rates:
##  theta.abl: 0.222
##  a.disc:    0.488
##  b.diff:    0.113 

The resulting set of three chains can be visualized with the same coda functions used for the single chains. The output, however, is slightly different, the trace plots of the three chains are plotted together.

 plot( run.par[,get.2pl.params(1,1,1,1)], density=FALSE )

As can be seen from the plot, the three chains have not converged to the same region of the parameter space in only 100 iterations. In the next post we examine how to formally detect when the chains have, in fact, converged to the same region of the parameter space.