See also
- @subdivide in the Ruffus Manual
- Decorators for more decorators
@subdivide
@subdivide ( input, regex(matching_regex) | formatter(matching_formatter), [ inputs(input_pattern_or_glob) | add_inputs(input_pattern_or_glob) ], output, [extras,...] )
Purpose:
Subdivides each of a set of Inputs further into multiple Outputs.
Many to Even More operator
The number of files in each Output can be set at runtime by the use of globs
Output file names are generated by applying the formatter or regex indicator to each Input, where Input can be the output of specified tasks, a list of file names, or a glob matching pattern.
- Additional inputs or dependencies can be added dynamically to the task:
add_inputs nests the original input parameters in a list before adding additional dependencies.
inputs replaces the original input parameters wholesale.
Only out of date tasks (comparing input and output files) will be run.
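The difference between add_inputs and inputs above can be sketched in plain Python (no Ruffus required; the file names are made up for illustration):

```python
# Sketch of how the two indicators rewrite a job's input parameters.
# "sample.start" and "reference.fa" are illustrative names, not Ruffus API.
original_input = "sample.start"
extra_dependency = "reference.fa"

# add_inputs: the original input is kept, nested alongside the addition
after_add_inputs = (original_input, extra_dependency)

# inputs: the original input is replaced wholesale
after_inputs = extra_dependency

print(after_add_inputs)   # ('sample.start', 'reference.fa')
print(after_inputs)       # reference.fa
```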
Note
The use of split as a synonym for subdivide is deprecated.
Example:
from ruffus import *
from random import randint
import os

@originate(['0.start', '1.start', '2.start'])
def create_files(output_file):
    with open(output_file, "w"):
        pass

#
#   Subdivide each of the 3 start files further into [NNN1, NNN2, NNN3] number of files,
#   where NNN1, NNN2, NNN3 are determined at run time
#
@subdivide( create_files,
            formatter(),

            # Output parameter: Glob matches any number of output file names
            "{path[0]}/{basename[0]}.*.step1",

            # Extra parameter: Append to this for output file names
            "{path[0]}/{basename[0]}")
def subdivide_files(input_file, output_files, output_file_name_root):

    #
    #   IMPORTANT: cleanup rubbish from previous run first
    #
    for oo in output_files:
        os.unlink(oo)

    # The number of output files is decided at run time
    number_of_output_files = randint(2, 4)
    for ii in range(number_of_output_files):
        output_file_name = "{output_file_name_root}.{ii}.step1".format(**locals())
        with open(output_file_name, "w"):
            pass

#
#   Each output of subdivide_files results in a separate job for downstream tasks
#
@transform(subdivide_files, suffix(".step1"), ".step2")
def analyse_files(input_file, output_file_name):
    with open(output_file_name, "w"):
        pass

pipeline_run()

This produces:

>>> pipeline_run()
    Job  = [None -> 0.start] completed
    Job  = [None -> 1.start] completed
    Job  = [None -> 2.start] completed
Completed Task = create_files
    Job  = [0.start -> 0.*.step1, 0] completed
    Job  = [1.start -> 1.*.step1, 1] completed
    Job  = [2.start -> 2.*.step1, 2] completed
Completed Task = subdivide_files
    Job  = [0.0.step1 -> 0.0.step2] completed
    Job  = [0.1.step1 -> 0.1.step2] completed
    Job  = [0.2.step1 -> 0.2.step2] completed
    Job  = [1.0.step1 -> 1.0.step2] completed
    Job  = [1.1.step1 -> 1.1.step2] completed
    Job  = [1.2.step1 -> 1.2.step2] completed
    Job  = [1.3.step1 -> 1.3.step2] completed
    Job  = [2.0.step1 -> 2.0.step2] completed
    Job  = [2.1.step1 -> 2.1.step2] completed
    Job  = [2.2.step1 -> 2.2.step2] completed
    Job  = [2.3.step1 -> 2.3.step2] completed
Completed Task = analyse_files

Parameters:
- tasks_or_file_names
can be a:
- Task / list of tasks (as in the example above).
File names are taken from the output of the specified task(s)
- (Nested) list of file name strings.
- File names containing *[]? will be expanded as a glob.
E.g. "a.*" => "a.1", "a.2"
- matching_regex
is a python regular expression string, which must be wrapped in a regex indicator object. See the python regular expression (re) documentation for details of regular expression syntax.
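Under the hood, the regex substitution works like re.sub applied to each input file name. A minimal stdlib-only sketch (the pattern and file name are illustrative, matching the naming scheme of the example above):

```python
import re

# Ruffus applies the wrapped regular expression to each input file name and
# builds the output name by substitution; re.sub shows the same mechanics.
matching_regex = r"(\d+)\.start$"
input_file = "2.start"

output_file = re.sub(matching_regex, r"\1.step1", input_file)
print(output_file)   # 2.step1
```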
- matching_formatter
a formatter indicator object, optionally containing python regular expressions (re).
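The formatter substitutions such as {path[0]} and {basename[0]} used in the example above can be mimicked with os.path and str.format. This stdlib-only sketch (the input path is illustrative) shows how the output pattern is filled in:

```python
import os

# formatter() exposes path components of each input file name for
# substitution in the output pattern; here we rebuild two of them by hand.
input_file = "/data/run1/0.start"
path = [os.path.dirname(input_file)]                            # '/data/run1'
basename = [os.path.splitext(os.path.basename(input_file))[0]]  # '0'

output_pattern = "{path[0]}/{basename[0]}.*.step1"
output_glob = output_pattern.format(path=path, basename=basename)
print(output_glob)   # /data/run1/0.*.step1
```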
- output = output
Specifies the resulting output file name(s) after string substitution
Can include glob patterns.
- extras = extras
Any extra parameters are passed verbatim to the task function
If you are using named parameters, these can be passed as a list, i.e. extras = [...]
Any extra parameters are consumed by the task function and not forwarded further down the pipeline.
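Because the output parameter can include glob patterns, the number of Output files each job matches is only known at run time. A stdlib-only sketch of that matching step (the directory and file names are created by hand for illustration):

```python
import glob
import os
import tempfile

# Simulate a job that produced three output files, then show how a glob in
# the Output parameter matches however many files actually exist.
workdir = tempfile.mkdtemp()
for ii in range(3):
    open(os.path.join(workdir, "0.%d.step1" % ii), "w").close()

matched = sorted(glob.glob(os.path.join(workdir, "0.*.step1")))
print(len(matched))   # 3
```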