Chapter 15: Logging progress through a pipeline

Note

Remember to look at the example code

Overview

There are two parts to logging with Ruffus:

  • Logging progress through the pipeline

    This produces the sort of output displayed in this manual:

    >>> pipeline_run([parallel_io_task])
    Task = parallel_io_task
        Job = ["a.1" -> "a.2", "A file"] completed
        Job = ["b.1" -> "b.2", "B file"] unnecessary: already up to date
    Completed Task = parallel_io_task
    
  • Logging your own messages from within your pipelined functions.

    Because Ruffus may run each task function in a separate process on a separate CPU (multiprocessing), some attention has to be paid to how log messages are sent and synchronised across process boundaries.

We shall deal with these in turn.

Logging task/job completion

By default, Ruffus logs each task and each job to sys.stderr as it completes.

This is the equivalent of passing the built-in STDERR logger explicitly: pipeline_run(logger = stderr_logger).

If you want to turn off all tracking messages as the pipeline runs, apart from setting verbose = 0, you can also use the aptly named Ruffus black_hole_logger:

pipeline_run(logger = black_hole_logger)
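
A minimal sketch, assuming both logger objects are importable from the top-level ruffus package (the pipeline targets are omitted for brevity):

from ruffus import pipeline_run, stderr_logger, black_hole_logger

# the default: progress messages are written to sys.stderr
pipeline_run(logger = stderr_logger)

# suppress all progress messages
pipeline_run(logger = black_hole_logger)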

Controlling logging verbosity

pipeline_run() currently has five levels of verbosity, set by the optional verbose parameter, which defaults to 1:

verbose = 0: logs nothing
verbose = 1: logs completed jobs/tasks
verbose = 2: logs up-to-date jobs in incomplete tasks
verbose = 3: logs the reason each job is run
verbose = 4: logs messages useful only for debugging Ruffus pipeline code

Values of verbose greater than 5 are intended for debugging Ruffus by its developers, and the details are liable to change from release to release.
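
For example, with the parallel_io_task task from the overview above (a sketch):

# the default: log completed jobs and tasks only
pipeline_run([parallel_io_task], verbose = 1)

# additionally explain why each out-of-date job needs to be re-run
pipeline_run([parallel_io_task], verbose = 3)

# run silently
pipeline_run([parallel_io_task], verbose = 0)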

Use ruffus.cmdline

As always, it is easiest to use ruffus.cmdline.

Set your script to

  • write messages to STDERR with the --verbose option and
  • to a log file with the --log_file option.

 from ruffus import *

 #  parse the command-line options; cmdline.get_argparse() provides
 #  --verbose and --log_file, among others (parser setup added here so
 #  that "options" is defined)
 parser = cmdline.get_argparse(description = "Example pipeline")
 options = parser.parse_args()

 #  Python logger which can be synchronised across concurrent Ruffus tasks
 logger, logger_mutex = cmdline.setup_logging (__name__, options.log_file, options.verbose)

 @transform(["job1.input"], suffix(".input"), ".output1")
 def first_task(input_file, output_file):
     pass

 pipeline_run(logger = logger)

Customising logging

You can also specify exactly how logging works by providing a logging object to pipeline_run(). This log object should have debug() and info() methods.

Instead of writing your own, it is usually more convenient to use the standard Python logging module, which provides logging classes with rich functionality.

The example code sets up a logger that writes to a rotating set of files.
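
A minimal sketch of such a logger, using the standard library's RotatingFileHandler (the file name and rotation settings here are illustrative, not taken from the example code):

import logging
import logging.handlers
from ruffus import pipeline_run

LOG_FILENAME = "pipeline.log"

# any object with debug() and info() methods will do;
# a standard Python logger fits the bill
custom_logger = logging.getLogger("my_pipeline")
custom_logger.setLevel(logging.DEBUG)

# rotate through five log files of up to 2000 bytes each
handler = logging.handlers.RotatingFileHandler(LOG_FILENAME,
                                               maxBytes = 2000,
                                               backupCount = 5)
custom_logger.addHandler(handler)

pipeline_run(logger = custom_logger)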

Log your own messages

You need to take a little care when logging your custom messages within your pipeline.

  • If your Ruffus pipeline may run in parallel, make sure that logging is synchronised.
  • If your Ruffus pipeline may run across separate processes, send your logging object across process boundaries.

Python logging objects cannot be pickled and shared naively across processes. Instead, we need to create proxies which forward logging calls to a single shared log.

The ruffus.proxy_logger module provides an easy way to share logging objects among jobs. This requires just two simple steps:

1. Set up logging

Things are easiest if you are using ruffus.cmdline:

#  standard python logger which can be synchronised across concurrent Ruffus tasks
logger, logger_mutex = cmdline.setup_logging (__name__, options.log_file, options.verbose)

Otherwise, manually:

from ruffus.proxy_logger import *
(logger,
 logging_mutex) = make_shared_logger_and_proxy (setup_std_shared_logger,
                                                "my_logger",
                                                {"file_name": "/my/lg.log"})

2. Share the proxy

Now, pass:

  • logger (which forwards logging calls across jobs) and
  • logging_mutex (which prevents simultaneous log messages from different jobs from being jumbled together)

to each job:

@transform( initial_file,
            suffix(".input"),
            ".output1",
            logger, logging_mutex)          # pass log and synchronisation as parameters
def first_task(input_file, output_file,
               logger, logging_mutex):      # pass log and synchronisation as parameters

    # synchronise logging
    with logging_mutex:
        logger.info("Here we go logging...")