Reading List Faster With parallel, doParallel, and pbapply

December 11, 2018
(This article was first published on Daniel Marcelino's Blog, and kindly contributed to R-bloggers)

I have several tables that I would like to load as a single data frame. Functions derived from read.table() have a lot of convenient features, but there seem to be many steps in their implementation that slow things down. Reading 29 CSV files (about 2.2 GB) with and without parallelization paints a more nuanced picture than one might expect. While parallelization does improve the ‘user time’, i.e. the CPU time charged for executing the process, the ‘elapsed time’, i.e. the wall-clock time since the process was started, barely changes. Let’s go through it.
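The difference between the two timings can be seen in a toy example that is not part of the original benchmark. This is only a sketch: Sys.sleep() stands in for a process that waits (for instance on disk access) rather than computes.

```r
# Sys.sleep() waits without doing any computation, so wall-clock time
# passes while almost no CPU time is charged to the process.
timing <- system.time(Sys.sleep(0.5))

# Named components behind the printed "user", "system" and "elapsed" columns
timing[["user.self"]]  # CPU time spent executing R code in this process
timing[["sys.self"]]   # CPU time spent by the operating system on its behalf
timing[["elapsed"]]    # wall-clock time, roughly half a second here
```

A large gap between user and elapsed time, as in this sketch, suggests the process spent most of its time waiting rather than computing.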

Data

# pattern is a regular expression, not a shell wildcard
list_of_datasets <- list.files(pattern = "\\.csv$")

list_of_datasets
 [1] "despesas_contratadas_candidatos_2018_AC.csv"    
 [2] "despesas_contratadas_candidatos_2018_AL.csv"    
 [3] "despesas_contratadas_candidatos_2018_AM.csv"    
 [4] "despesas_contratadas_candidatos_2018_AP.csv"    
 [5] "despesas_contratadas_candidatos_2018_BA.csv"    
 [6] "despesas_contratadas_candidatos_2018_BR.csv"    
 [7] "despesas_contratadas_candidatos_2018_BRASIL.csv"
 [8] "despesas_contratadas_candidatos_2018_CE.csv"    
 [9] "despesas_contratadas_candidatos_2018_DF.csv"    
[10] "despesas_contratadas_candidatos_2018_ES.csv"    
[11] "despesas_contratadas_candidatos_2018_GO.csv"    
[12] "despesas_contratadas_candidatos_2018_MA.csv"    
[13] "despesas_contratadas_candidatos_2018_MG.csv"    
[14] "despesas_contratadas_candidatos_2018_MS.csv"    
[15] "despesas_contratadas_candidatos_2018_MT.csv"    
[16] "despesas_contratadas_candidatos_2018_PA.csv"    
[17] "despesas_contratadas_candidatos_2018_PB.csv"    
[18] "despesas_contratadas_candidatos_2018_PE.csv"    
[19] "despesas_contratadas_candidatos_2018_PI.csv"    
[20] "despesas_contratadas_candidatos_2018_PR.csv"    
[21] "despesas_contratadas_candidatos_2018_RJ.csv"    
[22] "despesas_contratadas_candidatos_2018_RN.csv"    
[23] "despesas_contratadas_candidatos_2018_RO.csv"    
[24] "despesas_contratadas_candidatos_2018_RR.csv"    
[25] "despesas_contratadas_candidatos_2018_RS.csv"    
[26] "despesas_contratadas_candidatos_2018_SC.csv"    
[27] "despesas_contratadas_candidatos_2018_SE.csv"    
[28] "despesas_contratadas_candidatos_2018_SP.csv"    
[29] "despesas_contratadas_candidatos_2018_TO.csv"    
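A side note on the file listing: the pattern argument of list.files() is interpreted as a regular expression rather than a shell-style wildcard. The sketch below uses made-up file names in a temporary directory (they are not the election files above) to show two ways of matching only names that end in ".csv".

```r
# Hypothetical file names in a temporary directory, for illustration only
tmp <- file.path(tempdir(), "pattern_demo")
dir.create(tmp, showWarnings = FALSE)
file.create(file.path(tmp, c("a.csv", "b.csv", "a.csv.bak", "notes_csv.txt")))

# Regular expression anchored at the end: only names ending in ".csv"
csv_only <- list.files(tmp, pattern = "\\.csv$")

# glob2rx() converts a shell-style wildcard into the equivalent regex
csv_glob <- list.files(tmp, pattern = glob2rx("*.csv"))

csv_only
csv_glob
```

Both calls return only "a.csv" and "b.csv"; the backup file and the text file are left out.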

Function

library(pbapply)
library(parallel)
library(doParallel)
library(dplyr)


#' Reads a list of datasets
#' @param x A character vector of dataset file names or URLs
#' @param func A function, the read function to use to read the data
#' @param ... Further arguments passed to func
#' @param parallelize Logical; read the files in parallel on all detected cores?
#' @param rbind Logical; bind the resulting data frames into a single one?

readListFaster <- function(x,  func, ..., parallelize = FALSE, rbind = FALSE){

  stopifnot(length(x) > 0)

  # Read a single file; arguments in ... are forwarded to the read function
  read_and_assign <- function(dataset, func, ...){
    func(dataset, ...)
  }

  if (parallelize) {
    message("Reading in data in parallel")
    clusters <- parallel::detectCores() %>%
      parallel::makeCluster()
    # Stop the workers on exit, even if one of the reads fails
    on.exit(parallel::stopCluster(clusters), add = TRUE)

    doParallel::registerDoParallel(clusters)

    output <- pbapply::pblapply(x,
                                read_and_assign,
                                func = func,
                                ...,
                                cl = clusters)

  } else {
    output <- pbapply::pblapply(x,
                                read_and_assign,
                                func = func,
                                ...)
  }

  # Remove what's after the "." at the end of the data set names and what's before any / for url files.
  x <- stringr::str_replace_all(x,".*/|\\..*", "")

  names(output) <- x

  if (rbind) {
    dplyr::bind_rows(output)
  } else {
    output
  }
}
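The name-cleaning step near the end of the function can be checked in isolation. The sketch below applies the same regular expression with base R's gsub() to three hypothetical paths (assumed for illustration, not taken from the data above), and compares it with an alternative based on basename() and tools::file_path_sans_ext().

```r
# Hypothetical paths, used only to illustrate the pattern
paths <- c("despesas_2018_AC.csv",
           "data/despesas_2018_AL.csv",
           "https://example.com/files/despesas.2018.csv")

# Same regular expression as in the function, applied with base R's gsub():
# drop everything up to the last "/" and everything from the first "." on
stripped <- gsub(".*/|\\..*", "", paths)

# Alternative that removes only the final extension
sans_ext <- tools::file_path_sans_ext(basename(paths))

stripped  # "despesas_2018_AC" "despesas_2018_AL" "despesas"
sans_ext  # "despesas_2018_AC" "despesas_2018_AL" "despesas.2018"
```

The two approaches agree for file names like the ones in this post, and differ only when a name contains more than one dot.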

Results

Without parallelization

system.time(loaded_datasets1 <- readListFaster(list_of_datasets, 
                                 func = read.csv2, 
                                 parallelize=FALSE, 
                                 rbind = TRUE))
   user  system elapsed 
229.852  15.221 264.272 

With parallelization

system.time(loaded_datasets2 <- readListFaster(list_of_datasets, 
                                 func = read.csv2, 
                                 parallelize=TRUE, 
                                 rbind = TRUE))
   user  system elapsed 
 74.463  24.082 261.827 
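The serial-versus-parallel comparison can be tried on a small scale without the original files. The following is a minimal sketch using only base R and the parallel package: the three tiny files, their names, and the choice of two workers are assumptions made for illustration, and files this small will not show any meaningful timing difference.

```r
library(parallel)

# Write three small semicolon-separated files to a temporary directory
tmp <- file.path(tempdir(), "parallel_demo")
dir.create(tmp, showWarnings = FALSE)
files <- file.path(tmp, sprintf("part_%d.csv", 1:3))
for (i in seq_along(files)) {
  write.csv2(data.frame(id = 1:3, part = i), files[i], row.names = FALSE)
}

# Serial read, one file after another
serial <- do.call(rbind, lapply(files, read.csv2))

# Parallel read on two worker processes (an arbitrary choice for this sketch)
cl <- makeCluster(2)
parallel_read <- do.call(rbind, parLapply(cl, files, read.csv2))
stopCluster(cl)

# Both approaches should produce the same data frame
identical(serial, parallel_read)
```

Note that each worker has to send its data frame back to the main R process, and the final binding step runs serially in both cases.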

It’s important to realise that every bit of optimisation matters. But it would not help us if the resulting data frames were different, don’t you agree? Luckily, all 174 million values compared do match.

table(loaded_datasets1 == loaded_datasets2)
     TRUE 
174053476 
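One caveat about this kind of check: an element-wise comparison returns NA wherever either value is missing, and table() leaves those cells out by default. The sketch below uses two tiny made-up data frames (not the election data) to show the effect and a whole-object alternative.

```r
# Two identical toy data frames containing missing values
a <- data.frame(x = c(1, NA, 3), y = c("u", "v", NA))
b <- a

# Element-wise comparison yields NA wherever either side is NA,
# and table() leaves those cells out of the count by default
cmp <- table(a == b)

# useNA = "ifany" makes the skipped cells visible
cmp_na <- table(a == b, useNA = "ifany")

# identical() compares the whole objects, missing values included
same <- identical(a, b)

cmp
cmp_na
same
```

Here only four of the six cells are counted as TRUE by the default table() call, while identical() confirms that the two objects are the same.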
