Chapter 10: Checkpointing: Interrupted Pipelines and Exceptions

Overview

../../_images/theoretical_pipeline_schematic.png

Computational pipelines transform your data in stages until the final result is produced.

By default, Ruffus uses file modification times for the input and output to determine whether each stage of a pipeline is up-to-date or not. But what happens when the task function is interrupted, whether from the command line or by error, half way through writing the output?

In this case, the half-formed, truncated and corrupt Output file will look newer than its Input and hence up-to-date.

Interrupting tasks

Let us try with an example:

from ruffus import *
import sys, time

#   create initial files
@originate(['job1.start'])
def create_initial_files(output_file):
    with open(output_file, "w") as oo: pass


#---------------------------------------------------------------
#
#   long task to interrupt
#
@transform(create_initial_files, suffix(".start"), ".output")
def long_task(input_files, output_file):
    with open(output_file, "w") as ff:
        ff.write("Unfinished...")
        # sleep for 2 seconds here so you can interrupt me
        sys.stderr.write("Job started. Press ^C to interrupt me now...\n")
        time.sleep(2)
        ff.write("\nFinished")
        sys.stderr.write("Job completed.\n")


#       Run
pipeline_run([long_task])

When this script runs, it pauses in the middle with this message:

Job started. Press ^C to interrupt me now...

If you interrupted the script by pressing Control-C at this point, you will see that job1.output contains only Unfinished.... However, if you should rerun the interrupted pipeline again, Ruffus ignores the corrupt, incomplete file:

>>> pipeline_run([long_task])
Job started. Press ^C to interrupt me now...
Job completed

And if you had run pipeline_printout:

>>> pipeline_printout(sys.stdout, [long_task], verbose=3)
________________________________________
Tasks which will be run:

Task = long_task
       Job  = [job1.start
             -> job1.output]
         # Job needs update: Previous incomplete run leftover: [job1.output]

We can see that Ruffus magically knows that the previous run was incomplete, and that job1.output is detritus that needs to be discarded.

Checkpointing: only log completed jobs

All is revealed if you were to look in the working directory. Ruffus has created a file called .ruffus_history.sqlite. In this SQLite database, Ruffus logs only those files which are the result of a completed job, all other files are suspect. This file checkpoint database is a fail-safe, not a substitute for checking file modification times. If the Input or Output files are modified, the pipeline will rerun.

By default, Ruffus saves only file timestamps to the SQLite database but you can also add a checksum of the pipeline task function body or parameters. This behaviour can be controlled by setting the checksum_level parameter in pipeline_run(). For example, if you do not want to save any timestamps or checksums:

pipeline_run(checksum_level = 0)

CHECKSUM_FILE_TIMESTAMPS      = 0     # only rerun when the file timestamps are out of date (classic mode)
CHECKSUM_HISTORY_TIMESTAMPS   = 1     # Default: also rerun when the history shows a job as being out of date
CHECKSUM_FUNCTIONS            = 2     # also rerun when function body has changed
CHECKSUM_FUNCTIONS_AND_PARAMS = 3     # also rerun when function parameters or function body change

Note

Checksums are calculated from the pickled string for the function code and parameters. If pickling fails, Ruffus will degrade gracefully to saving just the timestamp in the SQLite database.

Do not share the same checkpoint file across for multiple pipelines!

The name of the Ruffus python script is not saved in the checkpoint file along side timestamps and checksums. That means that you can rename your pipeline source code file without having to rerun the pipeline! The tradeoff is that if multiple pipelines are run from the same directory, and save their histories to the same SQlite database file, and if their file names overlap (all of these are bad ideas anyway!), this is bound to be a source of confusion.

Luckily, the name and path of the checkpoint file can be also changed for each pipeline

Setting checkpoint file names

Warning

Some file systems do not appear to support SQLite at all:

There are reports that SQLite databases have file locking problems on Lustre.

The best solution would be to keep the SQLite database on an alternate compatible file system away from the working directory if possible.

environment variable DEFAULT_RUFFUS_HISTORY_FILE

The name of the checkpoint file is the value of the environment variable DEFAULT_RUFFUS_HISTORY_FILE.

export DEFAULT_RUFFUS_HISTORY_FILE=/some/where/.ruffus_history.sqlite

This gives considerable flexibility, and allows a system-wide policy to be set so that all Ruffus checkpoint files are set logically to particular paths.

Note

It is your responsibility to make sure that the requisite destination directories for the checkpoint files exist beforehand!

Where this is missing, the checkpoint file defaults to .ruffus_history.sqlite in your working directory

Setting the checkpoint file name manually

This checkpoint file name can always be overridden as a parameter to Ruffus functions:

pipeline_run(history_file = "XXX")
pipeline_printout(history_file = "XXX")
pipeline_printout_graph(history_file = "XXX")

There is also built in support in Ruffus.cmdline. So if you use this module, you can simply add to your command line:

# use a custom checkpoint file
myscript --checksum_file_name .myscript.ruffus_history.sqlite

This takes precedence over everything else.

Useful checkpoint file name policies DEFAULT_RUFFUS_HISTORY_FILE

If the pipeline script is called test/bin/scripts/run.me.py, then these are the resulting checkpoint files locations:

Example 1: same directory, different name

If the environment variable is:

export DEFAULT_RUFFUS_HISTORY_FILE=.{basename}.ruffus_history.sqlite

Then the job checkpoint database for run.me.py will be .run.me.ruffus_history.sqlite

/test/bin/scripts/run.me.py
/common/path/for/job_history/scripts/.run.me.ruffus_history.sqlite

Example 2: Different directory, same name

export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/.{basename}.ruffus_history.sqlite
/common/path/for/job_history/.run.me.ruffus_history.sqlite

Example 2: Different directory, same name but keep one level of subdirectory to disambiguate

export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/{subdir[0]}/.{basename}.ruffus_history.sqlite
/common/path/for/job_history/scripts/.run.me.ruffus_history.sqlite

Example 2: nested in common directory

export DEFAULT_RUFFUS_HISTORY_FILE=/common/path/for/job_history/{path}/.{basename}.ruffus_history.sqlite
/common/path/for/job_history/test/bin/scripts/.run.me.ruffus_history.sqlite

Regenerating the checkpoint file

Occasionally you may need to re-generate the checkpoint file.

This could be necessary:

  • because you are upgrading from a previous version of Ruffus without checkpoint file support
  • on the rare occasions when the SQLite file becomes corrupted and has to deleted
  • if you wish to circumvent the file checking of Ruffus after making some manual changes!

To do this, it is only necessary to call pipeline_run appropriately:

CHECKSUM_REGENERATE = 2
pipeline(touch_files_only = CHECKSUM_REGENERATE)

Similarly, if you are using Ruffus.cmdline, you can call:

myscript --recreate_database

Note that this regenerates the checkpoint file to reflect the existing Input, Output files on disk. In other words, the onus is on you to make sure there are no half-formed, corrupt files. On the other hand, the pipeline does not need to have been previously run successfully for this to work. Essentially, Ruffus, pretends to run the pipeline, while logging all the files with consistent file modication times, stopping at the first tasks which appear out of date or incomplete.

Rules for determining if files are up to date

The following simple rules are used by Ruffus.

  1. The pipeline stage will be rerun if:

    • If any of the Input files are new (newer than the Output files)
    • If any of the Output files are missing
  2. In addition, it is possible to run jobs which create files from scratch.

    • If no Input file names are supplied, the job will only run if any output file is missing.
  3. Finally, if no Output file names are supplied, the job will always run.

Missing files generate exceptions

If the inputs files for a job are missing, the task function will have no way to produce its output. In this case, a MissingInputFileError exception will be raised automatically. For example,

task.MissingInputFileError: No way to run job: Input file ['a.1'] does not exist
for Job = ["a.1" -> "a.2", "A file"]

Caveats: Coarse Timestamp resolution

Note that modification times have precision to the nearest second under some older file systems (ext2/ext3?). This may be also be true for networked file systems.

Ruffus supplements the file system time resolution by independently recording the timestamp at full OS resolution (usually to at least the millisecond) at job completion, when presumably the Output files will have been created.

However, Ruffus only does this if the discrepancy between file time and system time is less than a second (due to poor file system timestamp resolution). If there are large mismatches between the two, due for example to network time slippage, misconfiguration etc, Ruffus reverts to using the file system time and adds a one second delay between jobs (via time.sleep()) to make sure input and output file stamps are different.

If you know that your filesystem has coarse-grained timestamp resolution, you can always revert to this very conservative behaviour, at the prices of some annoying 1s pauses, by setting pipeline_run(one_second_per_job = True)

Flag files: Checkpointing for the paranoid

One other way of checkpointing your pipelines is to create an extra “flag” file as an additional Output file name. The flag file is only created or updated when everything else in the job has completed successifully and written to disk. A missing or out of date flag file then would be a sign for Ruffus that the task never completed properly in the first place.

This used to be much the best way of performing checkpointing in Ruffus and is still the most bulletproof way of proceeding. For example, even the loss or corruption of the checkpoint file, would not affect things greatly.

Nevertheless flag files are largely superfluous in modern Ruffus.