Chapter 14: Multiprocessing, drmaa and Computation Clusters

Overview

Multi Processing

Ruffus uses the Python multiprocessing module to run each job in a separate process.

This means that jobs do not necessarily complete in the order of their defined parameters. Task hierarchies are, of course, inviolate: upstream tasks always run before their downstream, dependent tasks.

Tasks that are independent (i.e. do not precede each other) may be run in parallel as well.

The number of concurrent jobs can be set in pipeline_run:

pipeline_run([parallel_task], multiprocess = 5)

If multiprocess is set to 1, jobs will be run sequentially in a single process.
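
A minimal sketch (the task, function and file names here are purely illustrative) showing two tasks that are independent of each other, so their jobs may be run side by side:

    from ruffus import *

    @originate(["a.start", "b.start", "c.start"])
    def make_starting_files(output_file):
        open(output_file, "w").close()

    # these two tasks depend on make_starting_files but not on each other,
    # so their jobs can be interleaved and run in parallel
    @transform(make_starting_files, suffix(".start"), ".left")
    def left_branch(input_file, output_file):
        open(output_file, "w").close()

    @transform(make_starting_files, suffix(".start"), ".right")
    def right_branch(input_file, output_file):
        open(output_file, "w").close()

    if __name__ == '__main__':
        # run up to 5 jobs at a time, each in its own process
        pipeline_run([left_branch, right_branch], multiprocess = 5)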

Data sharing

Running jobs in separate processes allows Ruffus to make full use of the multiple processors in modern computers. However, some multiprocessing guidelines should be borne in mind when writing Ruffus pipelines. In particular:

  • Try not to pass large amounts of data between jobs, or at least be aware that the data has to be marshalled across process boundaries.
  • Only data which can be pickled can be passed as parameters to Ruffus task functions. Happily, that applies to almost any native Python data type. The rare unpicklable object (such as an open file handle or a lambda) will cause Python to fail loudly when the pipeline is run; see the illustration below.
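
As a quick illustration (using the standard pickle module directly, outside of Ruffus), the difference is easy to demonstrate:

    import pickle

    # built-in types (strings, numbers, lists, dicts, tuples) pickle cleanly
    # and are therefore safe to pass as Ruffus job parameters
    pickle.dumps(["sample1.fastq", {"threads": 4}, (1, 2.5)])

    # lambdas, open file handles, database connections etc. do not pickle,
    # so passing them between jobs fails loudly
    try:
        pickle.dumps(lambda x: x + 1)
    except Exception as err:
        print("cannot cross a process boundary:", err)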

Restricting parallelism with @jobs_limit

Calling pipeline_run(multiprocess = NNN) allows multiple jobs (from multiple independent tasks) to be run in parallel. However, there are some operations that consume so many resources that we might want them to run with less or no concurrency.

For example, we might want to download some files via FTP, but the server limits the number of simultaneous connections from each IP address. Even if the rest of the pipeline is running 100 jobs in parallel, the FTP downloading must be restricted to 2 files at a time. We would really like to keep the pipeline running as is, but let this one operation run either serially or with little concurrency.

The @jobs_limit(N) decorator restricts the decorated task to at most N concurrent jobs, whatever the pipeline-wide multiprocess setting. The optional name (e.g. @jobs_limit(3, "ftp_download_limit")) allows the same limit to be shared across multiple tasks. To be pedantic: a limit of 3 jobs at a time would be applied across all tasks which have a @jobs_limit named "ftp_download_limit".

The example below uses up to 10 processes across the pipeline, but runs the stage1_big and stage1_small tasks 3 at a time (shared across both tasks); stage2 jobs run 5 at a time.
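
A sketch of what that example might look like (the file names and the trivial function bodies here are placeholders):

    from ruffus import *

    # at most 3 concurrent jobs shared across both stage1 tasks via the common limit name
    @jobs_limit(3, "stage1")
    @originate(["big_%d.stage1" % i for i in range(5)])
    def stage1_big(output_file):
        open(output_file, "w").close()

    @jobs_limit(3, "stage1")
    @originate(["small_%d.stage1" % i for i in range(5)])
    def stage1_small(output_file):
        open(output_file, "w").close()

    # an independent limit: at most 5 stage2 jobs at a time
    @jobs_limit(5)
    @transform([stage1_big, stage1_small], suffix(".stage1"), ".stage2")
    def stage2(input_file, output_file):
        open(output_file, "w").close()

    # the rest of the pipeline may still use up to 10 processes
    pipeline_run([stage2], multiprocess = 10)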

Using drmaa to dispatch work to Computational Clusters or Grid engines from Ruffus jobs

Ruffus has been widely used to manage work on computational clusters or grid engines. Though Ruffus task functions cannot (yet!) run natively and transparently on remote cluster nodes, it is trivial to dispatch work across the cluster.

From version 2.4 onwards, Ruffus includes an optional helper module which interacts with the Python bindings for the widely used drmaa Open Grid Forum API specification. This allows Ruffus jobs to dispatch work to a computational cluster and wait until the work completes.

Here are the necessary steps:

1) Use a shared drmaa session:

Before your pipeline runs:

#
#   start shared drmaa session for all jobs / tasks in pipeline
#
import drmaa
drmaa_session = drmaa.Session()
drmaa_session.initialize()

Cleanup after your pipeline completes:

#
#   pipeline functions go here
#
if __name__ == '__main__':
    try:
        #   run the pipeline (see step 4: use multithread, not multiprocess)
        pipeline_run (..., multithread = NNN, ...)
    finally:
        drmaa_session.exit()

2) import ruffus.drmaa_wrapper

  • The optional ruffus.drmaa_wrapper module needs to be imported explicitly:
# import the optional ruffus.drmaa_wrapper module explicitly
from ruffus.drmaa_wrapper import run_job, error_drmaa_job

3) call drmaa_wrapper.run_job()

drmaa_wrapper.run_job() dispatches the work to a cluster node from within a normal Ruffus job and waits for it to complete.

This is the equivalent of os.system or subprocess.check_output, except that the command runs remotely as specified:

    # ruffus.drmaa_wrapper.run_job
    stdout_res, stderr_res  = run_job(cmd_str           = "touch " + output_file,
                                      job_name          = job_name,
                                      logger            = logger,
                                      drmaa_session     = drmaa_session,
                                      run_locally       = options.local_run,
                                      job_other_options = job_other_options)

The complete example code is available in the Ruffus documentation.
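
For orientation, here is a minimal, self-contained sketch of how run_job() sits inside a Ruffus task (the task, job and file names are illustrative, not the documented complete example):

    from ruffus import *
    from ruffus.drmaa_wrapper import run_job, error_drmaa_job
    import drmaa

    drmaa_session = drmaa.Session()
    drmaa_session.initialize()

    @originate(["cluster_job1.flag", "cluster_job2.flag"])
    def run_on_cluster(output_file):
        try:
            # dispatch a trivial command to the cluster and wait for it to finish
            stdout_res, stderr_res = run_job(cmd_str       = "touch " + output_file,
                                             job_name      = "demo_" + output_file.replace(".", "_"),
                                             drmaa_session = drmaa_session)
        except error_drmaa_job as err:
            raise Exception("Cluster job failed: %s" % err)

    if __name__ == '__main__':
        try:
            # see step 4: run_job() requires multithread, not multiprocess
            pipeline_run([run_on_cluster], multithread = 2)
        finally:
            drmaa_session.exit()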

4) Use multithread: pipeline_run(multithread = NNN)

Warning

drmaa_wrapper.run_job() requires pipeline_run (multithread = NNN) and will not work with pipeline_run (multiprocess = NNN).

Using multithreading rather than multiprocessing

  • allows the drmaa session to be shared
  • prevents “processing storms” which lock up the queue submission node when hundreds or thousands of grid engine / cluster commands complete at the same time.

pipeline_run (..., multithread = NNN, ...)

or if you are using ruffus.cmdline:

cmdline.run (options, multithread = options.jobs)

Normally, multithreading reduces the amount of parallelism in Python because of the Python Global Interpreter Lock (GIL). However, as the workload runs almost entirely on another computer (i.e. a cluster / grid engine node) with a separate Python interpreter, any cost-benefit calculations of this sort are moot.

5) Develop locally

drmaa_wrapper.run_job() provides two convenience parameters for developing grid engine pipelines:

  • commands can run locally, i.e. on the local machine rather than on cluster nodes:

    run_job(cmd_str, run_locally = True)
    
  • Output files can be touched, i.e. given the appearance of the work having been done without actually running the commands (a combined sketch follows below):

    run_job(cmd_str, touch_only = True)
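
Both parameters can be driven from your own command-line options so that the same task code works during development and in production. A sketch, in which local_run and touch_only are hypothetical flags parsed by your own script (they are not provided by drmaa_wrapper itself):

    # inside a task function: switch between local development and cluster runs
    stdout_res, stderr_res = run_job(cmd_str       = "touch " + output_file,
                                     drmaa_session = drmaa_session,
                                     run_locally   = options.local_run,
                                     touch_only    = options.touch_only)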
    

Forcing a pipeline to appear up to date

Sometimes we know that a pipeline has run to completion and that everything is up-to-date. However, on the basis of file modification times, Ruffus may still insist that parts of it need to be rerun.

For example, sometimes a trivial accounting modification needs to be made to a data file. Even though you know that this changes nothing in practice, Ruffus will detect the modification and ask to rerun everything from that point forwards.

One way to convince Ruffus that everything is fine is to manually touch all subsequent data files one by one in sequence so that the file timestamps follow the appropriate progression.

You can also ask Ruffus to do this automatically for you by running the pipeline in touch mode:

pipeline_run( touch_files_only = True)

pipeline_run will run your pipeline script normally: working backwards from any specified final target (or else from the last task in the pipeline), it works out where it should begin running, i.e. with the first out-of-date data files. From that point on, instead of calling your pipeline task functions, Ruffus touch-es each missing or out-of-date file in turn so that the file modification dates follow on successively.

This turns out to be a useful way of checking that your pipeline runs correctly, by creating a series of dummy (empty) files. However, Ruffus cannot read your mind to know which files your @split or @subdivide tasks would have created.

Using ruffus.cmdline from version 2.4, you can just specify:

your script --touch_files_only [--other_options_of_your_own_etc]
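
This flag comes for free with the standard ruffus.cmdline boilerplate, sketched here (the description text is just a placeholder):

    from ruffus import *
    from ruffus import cmdline

    parser = cmdline.get_argparse(description = "my pipeline")
    options = parser.parse_args()

    #   pipeline tasks go here

    #   cmdline.run() honours --touch_files_only (as well as --jobs, --just_print, etc.)
    cmdline.run(options)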