Managing intermediate results when using R/sparklyr
Want to share your content on R-bloggers? click here if you have a blog, or here if you don't.
In our latest “R and big data” article we show how to manage intermediate results in non-trivial Apache Spark workflows using R, sparklyr, dplyr, and replyr.
Handle management
Many sparklyr tasks involve creating intermediate or temporary tables, for example through dplyr::copy_to() and dplyr::compute(). Each such table is held through a handle, and handles that are never cleaned up represent a reference leak and eat up resources.
To help control handle lifetime, the replyr package supplies record-retaining temporary name generators (and uses the same generators internally).
The actual function is pretty simple:
print(replyr::makeTempNameGenerator)
## function(prefix,
##                                   suffix= NULL) {
##   force(prefix)
##   if((length(prefix)!=1)||(!is.character(prefix))) {
##     stop("repyr::makeTempNameGenerator prefix must be a string")
##   }
##   if(is.null(suffix)) {
##     alphabet <- c(letters, toupper(letters), as.character(0:9))
##     suffix <- paste(base::sample(alphabet, size=20, replace= TRUE),
##                     collapse = '')
##   }
##   count <- 0
##   nameList <- c()
##   function(dumpList=FALSE) {
##     if(dumpList) {
##       v <- nameList
##       nameList <<- c()
##       return(v)
##     }
##     nm <- paste(prefix, suffix, sprintf('%010d',count), sep='_')
##     nameList <<- c(nameList, nm)
##     count <<- count + 1
##     nm
##   }
## }
## <bytecode: 0x7f8659110708>
## <environment: namespace:replyr>
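Because the printed generator uses only base R, its bookkeeping can be checked without Spark. The sketch below copies that logic into a local function; temp_name_gen is a hypothetical name, and the suffix is fixed rather than random purely so the output is predictable. Reading the code, dumpList = TRUE clears the record of issued names but does not reset the counter, so names issued after a cleanup cannot collide with ones already handed out.

```r
# Local copy of the logic printed above, under the hypothetical name
# temp_name_gen, so it runs without replyr or Spark. The suffix is fixed
# (instead of 20 random characters) only to make the output predictable.
temp_name_gen <- function(prefix, suffix = "DEMO") {
  count <- 0
  nameList <- c()
  function(dumpList = FALSE) {
    if (dumpList) {
      # hand back every name issued so far and forget them
      v <- nameList
      nameList <<- c()
      return(v)
    }
    # issue a new name and remember it
    nm <- paste(prefix, suffix, sprintf('%010d', count), sep = '_')
    nameList <<- c(nameList, nm)
    count <<- count + 1
    nm
  }
}

gen <- temp_name_gen("JOINTMP")
gen()                                # "JOINTMP_DEMO_0000000000"
gen()                                # "JOINTMP_DEMO_0000000001"
first_batch <- gen(dumpList = TRUE)  # both names; the record is now empty
gen(dumpList = TRUE)                 # NULL: nothing issued since the last dump
gen()                                # "JOINTMP_DEMO_0000000002": counter not reset
```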
For instance, when joining several tables it can be a good idea to call compute() after each join (otherwise the generated SQL can become large and unmanageable). Code for this looks like the following:
library(sparklyr)
library(dplyr)
# assumes sc is an open Spark connection, for example
# sc <- spark_connect(master = "local")

# create example data
names <- paste('table', 1:5, sep='_')
tables <- lapply(names, function(ni) {
  di <- data.frame(key= 1:3)
  di[[paste('val',ni,sep='_')]] <- runif(nrow(di))
  copy_to(sc, di, ni)
})

# build our temp name generator
tmpNamGen <- replyr::makeTempNameGenerator('JOINTMP')

# left join the tables in sequence
joined <- tables[[1]]
for(i in seq(2,length(tables))) {
  ti <- tables[[i]]
  if(i<length(tables)) {
    joined <- compute(left_join(joined, ti, by='key'),
                      name= tmpNamGen())
  } else {
    # use non-temp name.
    joined <- compute(left_join(joined, ti, by='key'),
                      name= 'joinres')
  }
}

# clean up temps
temps <- tmpNamGen(dumpList = TRUE)
print(temps)
## [1] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000000"
## [2] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000001"
## [3] "JOINTMP_9lWXvfnkhI2NPRsA1tEh_0000000002"
for(ti in temps) {
  db_drop_table(sc, ti)
}

# show result
print(joined)
## Source:   query [3 x 6]
## Database: spark connection master=local[4] app=sparklyr local=TRUE
## 
## # A tibble: 3 x 6
##     key val_table_1 val_table_2 val_table_3 val_table_4 val_table_5
##   <int>       <dbl>       <dbl>       <dbl>       <dbl>       <dbl>
## 1     1   0.7594355   0.8082776 0.696254059   0.3777300  0.30015615
## 2     2   0.4082232   0.8101691 0.005687125   0.9382002  0.04502867
## 3     3   0.5941884   0.7990701 0.874374779   0.7936563  0.19940400
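To see why a compute() after each join keeps the SQL manageable, consider a toy model. It assumes, as a simplification, that every lazy left join wraps the previous query as a subquery; the SQL that dplyr and sparklyr actually emit differs in detail. Without materialization a single statement accumulates all the nesting, while with materialization each statement only references the previous table by name. The helpers join_sql() and count_selects() and the temp names are hypothetical.

```r
# Toy model only (not the real dplyr/sparklyr SQL translation): a lazy
# left join wraps the previous query as a subquery.
join_sql <- function(left_sql, right_table) {
  sprintf("SELECT * FROM (%s) l LEFT JOIN %s r ON l.key = r.key",
          left_sql, right_table)
}

tables <- paste("table", 1:5, sep = "_")

# No intermediate compute(): one ever-growing nested statement.
lazy_sql <- paste("SELECT * FROM", tables[[1]])
for (tbl in tables[-1]) lazy_sql <- join_sql(lazy_sql, tbl)

# compute() after each join: each statement reads the previous
# materialized table by name (temp names are illustrative).
step_sql <- character(length(tables) - 1)
prev <- tables[[1]]
for (i in seq(2, length(tables))) {
  step_sql[[i - 1]] <- join_sql(paste("SELECT * FROM", prev), tables[[i]])
  prev <- sprintf("JOINTMP_%010d", i - 2)
}

# count how many SELECTs each statement contains
count_selects <- function(sql) lengths(regmatches(sql, gregexpr("SELECT", sql)))
count_selects(lazy_sql)  # 5 nested SELECTs in a single statement
count_selects(step_sql)  # 2 SELECTs in each of the 4 statements
```

In this model the nesting depth of the lazy statement grows by one with every join, while the per-step statements stay the same shape no matter how many tables are chained.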
Careful introduction and management of materialized intermediates can conserve resources (both time and space) and greatly improve outcomes. We feel it is good practice to set up an explicit temp name manager, pass it through all your sparklyr transforms, and then clear temps in batches once the results no longer depend on the intermediates.
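That practice can be sketched without Spark, under stated assumptions: an R environment named catalog stands in for the Spark table catalog, and the hypothetical helpers store_table(), drop_table() and join_step() play the roles of compute(), db_drop_table() and a user transform. The generator make_gen() is a minimal stand-in in the spirit of replyr::makeTempNameGenerator, not that function itself. Each transform receives the generator, and all temps are dropped in one batch once the final result has its own permanent name.

```r
# Hypothetical in-memory stand-in for the Spark table catalog (name -> data.frame).
catalog <- new.env()
store_table <- function(name, d) assign(name, d, envir = catalog)
drop_table <- function(name) rm(list = name, envir = catalog)

# Minimal record-retaining name generator (fixed suffix, for readability).
make_gen <- function(prefix) {
  count <- 0
  issued <- character(0)
  function(dumpList = FALSE) {
    if (dumpList) {
      v <- issued
      issued <<- character(0)
      return(v)
    }
    nm <- sprintf("%s_%010d", prefix, count)
    issued <<- c(issued, nm)
    count <<- count + 1
    nm
  }
}

# A transform that takes the generator as an argument and materializes
# its output under a freshly generated temp name.
join_step <- function(left, right, gen) {
  nm <- gen()
  store_table(nm, merge(left, right, by = "key", all.x = TRUE))
  get(nm, envir = catalog)
}

gen <- make_gen("JOINTMP")
inputs <- lapply(1:4, function(i) {
  d <- data.frame(key = 1:3)
  d[[paste0("val_", i)]] <- i * (1:3)
  d
})

joined <- inputs[[1]]
for (d in inputs[-1]) joined <- join_step(joined, d, gen)

# Give the final result a permanent name, then drop every temp in one batch.
store_table("joinres", joined)
for (nm in gen(dumpList = TRUE)) drop_table(nm)
ls(catalog)  # "joinres"
```

Passing the generator explicitly, rather than creating names ad hoc inside each step, is what makes the final batch cleanup complete: every temp any step created is on the generator's record.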