Chapter 10: Python Code for Checkpointing: Interrupted Pipelines and Exceptions

Code for the “Interrupting tasks” example

from ruffus import *

from ruffus import *
import sys, time

#   create initial files
@originate(['job1.start'])
def create_initial_files(output_file):
    """Create ``output_file`` as an empty file, truncating it if it exists.

    The file serves as the starting point for the downstream task.
    """
    # Opening for writing and closing straight away leaves a zero-byte file.
    handle = open(output_file, "w")
    handle.close()


#---------------------------------------------------------------
#
#   long task to interrupt
#
@transform(create_initial_files, suffix(".start"), ".output")
def long_task(input_files, output_file):
    """Write ``output_file`` in two stages separated by a 2-second pause.

    The output first receives "Unfinished..." and only gets its closing
    "Finished" line after the pause, so a ^C pressed during the pause
    leaves an incomplete output file behind. ``input_files`` is accepted
    but never read.
    """
    notify = sys.stderr.write
    pause_seconds = 2

    with open(output_file, "w") as out_handle:
        # Stage 1: mark the output as incomplete.
        out_handle.write("Unfinished...")

        # Pause long enough for the user to hit ^C.
        notify("Job started. Press ^C to interrupt me now...\n")
        time.sleep(pause_seconds)

        # Stage 2: only reached when the pause was not interrupted.
        out_handle.write("\nFinished")
        notify("Job completed.\n")


#   Run the pipeline, passing long_task as the only target task
pipeline_run([long_task])