If you can for-each, you can doParallel! Parallelization in R

One of the common complaints I hear about R as a data analysis language is its relatively slow speed compared to the C at its foundation, or to other lower-level programming languages (the C family, Java, Fortran). For-loops in particular can be incredibly slow! They do get the job done eventually, but serially (one task at a time) instead of in parallel (multiple tasks at a time).

When a coworker came to me in May with a simple data task that needed to run the same way over 140 files, it seemed like the perfect excuse to investigate a formal parallelization method based firmly in R. I have parallelized tasks manually before, by organizing batches and running a copy of an R script with the appropriate batch number/information inside. That does work, but it is tedious and it is easy to make bookkeeping mistakes.

So, with no offense to the base for loop and its dedicated users, let me introduce doParallel, which readily parallelizes an existing for/foreach loop. I recommend reading the CRAN vignette on the two packages.

doParallel

doParallel connects the packages parallel and foreach to enable simultaneous, multi-core processing of an iterative task on one computer or cluster node. Ideally the tasks do not need results from each other, or any cross-talk, to complete. The tasks are sent out by a master process that communicates with system resources and monitors their completion. GNU Parallel is a favorite bash tool of mine that does similar bookkeeping for you on the command line. The first step in R is to register the number of cores that will be used (at most one fewer than the number of cores available, unless you want to crash your machine):

# My packages
library(VanillaICE)
library(data.table)
library(doParallel)
library(foreach)
library(plyr)

# Start parallel cluster
no_cores <- 10
cl <- makeCluster(no_cores)
registerDoParallel(cl)

So we expect to use 10 cores.
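If you would rather not hard-code that count, parallel::detectCores() reports how many cores the machine has. A minimal sketch (my variation, not part of the original script), leaving one core in reserve as noted above:

# Detect available cores and keep one free for the master process/OS
no_cores <- max(1, parallel::detectCores() - 1)
cl <- makeCluster(no_cores)
registerDoParallel(cl)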

Then, I adapted code my coworker had already written to work for one file (one comparison between two samples). As you would for a regular loop, I substituted variables for hard-coded references to files and IDs, and wrapped the code in a function. Inside, I set up an empty data.table with the columns I know will be produced by each task; such a table will hold the processed data from one of the 140 file comparisons, and I collect them all into a super-master table later. I'm a fan of pre-allocated tables when it comes to reading and combining data from similar sources.

Sometimes there was not enough data from one of the comparison files (the conditions of the first IF statement below were not met, leaving nrow(master) < 1). In that case, I set up a mostly empty table with the same column names, containing NAs except for the file IDs in question, so I could see later that no data was processed.

docomparison <- function(x){

 ...adapted code...

 if (is.data.table(nonov_dt) && (nrow(nonov_dt) >= 1)){
  master <- data.table("markers"=numeric(0), "seqnames"=numeric(0),
   "start"=numeric(0), "lrr_values.ipsc"=numeric(0),
   "baf_values.ipsc"=numeric(0), "lrr_values.donor"=numeric(0),
   "baf_values.donor"=numeric(0))

  master[, comparison := comparison.paste]

  ...more processing...
 }

 # Fallback: if the IF above was skipped, no local master table was filled,
 # so hand back an NA row tagged with the comparison IDs instead
 if (!exists("master", inherits = FALSE) || nrow(master) < 1){
  master <- data.table("markers"="NA", "seqnames"="NA",
   "start"="NA", "lrr_values.ipsc"="NA", "baf_values.ipsc"="NA",
   "lrr_values.donor"="NA", "baf_values.donor"="NA",
   "comparison" = comparison.paste)
 }

 return(master)
}

This function has its own internal loops needed to complete a single comparison and fill a table, but those are fast enough that they are not worth parallelizing. I also re-listed all the necessary library() calls inside the function, in case the master process (behind the scenes) did not correctly transfer the globally loaded packages to the workers.
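Concretely, that just means repeating the library() calls at the top of the function body; a sketch of the idea (not the full function):

docomparison <- function(x){
 # Re-attach packages inside the worker, in case the globally
 # loaded ones did not carry over to the task environment
 library(VanillaICE)
 library(data.table)
 library(plyr)
 ...adapted code...
}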

Now to start the parallel loop! I assign the output of the parallelized function to a summary table called super.master. This took a bit of finagling to write.

super.master <- foreach(i = 1:140, .combine = rbind,
 .packages = c('VanillaICE', 'data.table', 'plyr'),
 .export = c("master")) %dopar% docomparison(i)

This line starts the tasks running 10 at a time, since we registered 10 simultaneous cores. Each task executes the docomparison() function and populates a master table; when one finishes, doParallel and foreach rbind its output to super.master. The combination method here is row binding, but column binding is an option if each task should contribute new columns to a wide table. I read that the .packages= list helps guarantee that packages are loaded in each task's environment, which was a bit of a stumbling block in getting this to run to completion. Better safe than sorry and package-bereft.
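A handy related trick while debugging: swapping %dopar% for %do% runs the identical loop serially on a single core, no cluster needed, which makes errors much easier to trace.

# Serial version of the same loop, useful for debugging before going parallel
super.master <- foreach(i = 1:140, .combine = rbind) %do% docomparison(i)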

To finish up, I move to the appropriate output directory and write the super.master table to file, stop the parallel cluster to free up those resources, and quit without saving the environment to a needless .RData file.
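The write step itself might look something like this (the output path and the data.table fwrite() call are placeholders of mine, not necessarily what the original script used):

setwd("/path/to/output/dir")  # hypothetical output directory
fwrite(super.master, "super_master_comparisons.txt", sep = "\t")

And then the cleanup: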

stopCluster(cl)
q(save="no")

Interfacing with a compute cluster

Not many personal machines have ten cores. I used the JHPCE cluster to request 11 cores on a single node, one more than the number of simultaneous tasks: it is wise to allow an extra core for the independent process that monitors the tasks, starts new ones, and combines their data at the end.

qsub -cwd -pe local 11 -l mem_free=1G,h_vmem=2G qsub-R.sh

The -pe local 11 option requests the cores; without it, doParallel would not be able to use more than one. The file qsub-R.sh contains just the single line needed to run R in the cluster environment, R CMD BATCH R-script.R. How you request multiple cores will depend on your cluster's operating system and batch job scheduler.

Caveats & conclusion

The linchpin of my solution is the assumption that every task produces a table with a uniform structure. Because it was easy to write code that works for a single task (code that could just as well sit in a serial for loop), it was easy to drop into the doParallel/foreach syntax. If the tasks each created differently dimensioned or differently named tables, it might take multiple %dopar% passes before the "super-master" output table was ready. Thankfully, this problem was tidy in that respect.

doParallel halts execution if one of the tasks dies, rather than continuing, which is for the best but can lead to many resource-wasteful attempts. Its error messages are not that helpful, either. The code being parallelized has to be 100% watertight, because I found it did not always return the "local" R error from the task that died.
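One defensive pattern I would suggest (my addition, not part of the original script) is to wrap each task in tryCatch(), so a failed comparison hands back a same-shaped NA table instead of killing the whole loop:

# Wrapper that returns an NA-filled placeholder row when a task errors out,
# so the rbind in foreach can still proceed
docomparison_safe <- function(x){
 tryCatch(docomparison(x),
  error = function(e){
   data.table("markers"="NA", "seqnames"="NA", "start"="NA",
    "lrr_values.ipsc"="NA", "baf_values.ipsc"="NA",
    "lrr_values.donor"="NA", "baf_values.donor"="NA",
    "comparison"=paste("task", x, "failed:", conditionMessage(e)))
  })
}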

I rate doParallel as advanced R, because it requires the user to deeply understand what will be parallelized and how the finished pieces will be combined. It took me a few days to learn the new syntax. The user should already understand base for loops and the architecture of the machine they plan to request resources from.

Further reading

  • There is an O’Reilly book that covers parallel functionality in R (potentially outdated, dating to 2011)
  • This post on R Bloggers about the package
  • This post outlines other methods of .combine =, including a data.frame method and merge.by.time

If you made it this far, let me know what you think in the comments & your own preferred parallelization strategies in R. Thanks! Cheers – Claire

Featured image: shared under Creative Commons Attribution-Share Alike 2.0 Generic License. The Nanoscience High-Performance Computing Facility at Argonne National Laboratory, by Matt Howard, 2007.

 
