Chapter 18: Turning parts of the pipeline on and off at runtime with @active_if


It is sometimes useful to be able to switch on and off parts of a pipeline. For example, a pipeline might have two difference code paths depending on the type of data it is being asked to analyse.

One surprisingly easy way to do this is to use a python if statement around particular task functions:

 from ruffus import *

 run_task1 = True

 @originate(['', ''])
 def create_files(output_file):
     open(output_file, "w")

 if run_task1:
     # might not run
     @transform(create_files, suffix(".foo"), ".bar")
     def foobar(input_file, output_file):
         open(output_file, "w")

 @transform(foobar, suffix(".bar"), ".result")
 def wrap_up(input_file, output_file):
         open(output_file, "w")

This simple solution has a number of drawbacks:
  1. The on/off decision is a one off event that happens when the script is loaded. Ideally, we would like some flexibility, and postpone the decision until pipeline_run() is invoked.
  2. When if is false, the entire task function becomes invisible, and if there are any downstream tasks, as in the above example, Ruffus will complain loudly about missing dependencies.

@active_if controls the state of tasks

  • Switches tasks on and off at run time depending on its parameters
  • Evaluated each time pipeline_run, pipeline_printout or pipeline_printout_graph is called.
  • Dormant tasks behave as if they are up to date and have no output.

The Design and initial implementation were contributed by Jacob Biesinger

The following example shows its flexibility and syntax:

from ruffus import *
run_if_true_1 = True
run_if_true_2 = False
run_if_true_3 = True

#    task1
@originate(['', ''])
def create_files(outfile):
    open(outfile, "w").write(outfile + "\n")

#   Only runs if all three run_if_true conditions are met
#   @active_if determines if task is active
@active_if(run_if_true_1, lambda: run_if_true_2)
@transform(create_files, suffix(".foo"), ".bar")
def this_task_might_be_inactive(infile, outfile):
    open(outfile, "w").write("%s -> %s\n" % (infile, outfile))

# @active_if switches off task because run_if_true_2 == False
pipeline_run(verbose = 3)

# @active_if switches on task because all run_if_true conditions are met
run_if_true_2 = True
pipeline_run(verbose = 3)

The task starts off inactive:

>>> # @active_if switches off task "this_task_might_be_inactive" because run_if_true_2 == False
>>> pipeline_run(verbose = 3)

Task enters queue = create_files
    Job  = [None ->] Missing file []
    Job  = [None ->] Missing file []
    Job  = [None ->] completed
    Job  = [None ->] completed
Completed Task = create_files
Inactive Task = this_task_might_be_inactive

Now turn on the task:

>>> # @active_if switches on task "this_task_might_be_inactive" because all run_if_true conditions are met
>>> run_if_true_2 = True
>>> pipeline_run(verbose = 3)

Task enters queue = this_task_might_be_inactive

    Job  = [ ->] Missing file []
    Job  = [ ->] Missing file []
    Job  = [ ->] completed
    Job  = [ ->] completed
Completed Task = this_task_might_be_inactive