Chapter 10: Python Code for Checkpointing: Interrupted Pipelines and Exceptions¶
See also
Code for the “Interrupting tasks” example¶
from ruffus import * from ruffus import * import sys, time # create initial files @originate(['job1.start']) def create_initial_files(output_file): with open(output_file, "w") as oo: pass #--------------------------------------------------------------- # # long task to interrupt # @transform(create_initial_files, suffix(".start"), ".output") def long_task(input_files, output_file): with open(output_file, "w") as ff: ff.write("Unfinished...") # sleep for 2 seconds here so you can interrupt me sys.stderr.write("Job started. Press ^C to interrupt me now...\n") time.sleep(2) ff.write("\nFinished") sys.stderr.write("Job completed.\n") # Run pipeline_run([long_task])