Running conditional tasks based on input size
Thanks to conditionals, WDL supports running or skipping tasks based on a number of different factors, including the size of a file.
There are several reasons you might want to run a different task (or the same task with a different set of parameters; see Task aliasing) when your data is larger or smaller than a certain size, but they generally come down to the efficiency of your task and workflow. Maybe you expect a lot of variability in the input file size for a task, so you want to run the task with parameters that are optimized for each size range. Or maybe you are using a tool that can only handle inputs up to a certain size, so you want to split your inputs across different virtual machines, but only if the inputs exceed the tool’s size limit.
Whatever the reason, conditionals allow you to programmatically decide when you want to run one task versus another. In this document, you’ll see examples of code that run different tasks based on the size of the input files.
Problem
You want to run one task if an input file is below a certain size and a different task if the input file is above a certain size.
Solution
To run a different task or set of tasks based on the size of an input file, one or more conditional if() statements can be used.
Let’s say we have a workflow that calls two tasks, task_A and task_B, as shown in the example WDL script below:
version 1.0

workflow myWorkflowName {
  input {
    File my_ref
    File my_input
    String name
  }

  call task_A {
    input:
      ref = my_ref,
      in = my_input,
      id = name
  }

  call task_B {
    input:
      ref = my_ref,
      in = my_input,
      id = name
  }
}

task task_A {...}
task task_B {...}
We don’t want the workflow to run both task_A and task_B, which is what would happen if we ran it as it’s written above. Instead, we want the workflow to run only one task, and which task it runs should depend on the size of one of the input files, like the file called my_input shown above.
To do that, we first need to tell the workflow what the cutoff size should be. We can do that by adding another input variable to the workflow called input_size_cutoff_mb. You can hardcode a cutoff size into the workflow or allow the cutoff to be defined at execution.
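As a minimal sketch of those two options, the cutoff can be declared as a workflow input with a default value. The default of 500.0 below is an arbitrary placeholder chosen for illustration, not a recommended value. An input with a default can still be overridden in the inputs JSON at execution, whereas in WDL 1.0 a declaration placed outside the input block is fixed in the script.

```wdl
version 1.0

workflow myWorkflowName {
  input {
    File my_input
    # Assumed placeholder default; override it in the inputs JSON at execution
    Float input_size_cutoff_mb = 500.0
  }

  # Alternative (hardcoded): declare the cutoff outside the `input` block
  # instead, so that it cannot be set at execution
  # Float input_size_cutoff_mb = 500.0
}
```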
We still need to tell our workflow under what conditions we want it to run each task, and we’ll do that using if() statements along with the size() function. Specifically, we’ll compare the size of our input to the cutoff size and, depending on whether the input is larger or smaller than the cutoff, we’ll run either task_A or task_B. Let’s look at the example code below:
version 1.0

workflow myWorkflowName {
  input {
    File my_ref
    File my_input
    String name
    # Specifies the cutoff size in megabytes for the following `if()` statements
    Float input_size_cutoff_mb
  }

  # Only run `task_A` if the size of `my_input` in megabytes is larger than the size
  # of the cutoff
  if (size(my_input, "MB") > input_size_cutoff_mb) {
    call task_A {
      input:
        ref = my_ref,
        in = my_input,
        id = name
    }
  }

  # Only run `task_B` if the size of `my_input` in megabytes is smaller than or equal
  # to the size of the cutoff
  if (size(my_input, "MB") <= input_size_cutoff_mb) {
    call task_B {
      input:
        ref = my_ref,
        in = my_input,
        id = name
    }
  }
}

task task_A {...}
task task_B {...}
Here, we are using size(my_input, "MB") to get the size in megabytes of our input file, then comparing it to the value of input_size_cutoff_mb. Check out the WDL 1.0 spec to read more about using size() in WDL.
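For orientation, here is a small sketch of a few ways size() can be called. The workflow and variable names are hypothetical, and the unit strings are assumed to follow the list in the WDL 1.0 spec, so check the spec and your execution engine before relying on a particular unit.

```wdl
version 1.0

# Hypothetical workflow used only to illustrate size() calls
workflow size_examples {
  input {
    File my_input
    Array[File] my_inputs
  }

  # With no unit argument, size() returns the size in bytes
  Float size_bytes = size(my_input)

  # Decimal unit (megabytes) versus binary unit (mebibytes)
  Float size_mb = size(my_input, "MB")
  Float size_mib = size(my_input, "MiB")

  # For an array of files, size() returns the combined size of all files
  Float total_gib = size(my_inputs, "GiB")

  output {
    Float input_mb = size_mb
    # ceil() rounds a Float up to the nearest Int
    Int total_gib_rounded_up = ceil(total_gib)
  }
}
```

Because size() returns a Float, the cutoff it is compared against is easiest to declare as a Float as well, as in the example above.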
The if() statements specify that the workflow should run task_A only if the size of my_input is larger than the size of the cutoff and task_B only if the size of my_input is smaller than or equal to the size of the cutoff.
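One consequence of this pattern is that the outputs of a call inside an if() block become optional when referenced outside that block, because the call may not have run. The sketch below shows one way the two branches could be merged back into a single value with select_first(). The stand-in task bodies, the input name input_file, and the output name out are all hypothetical, since the real task definitions are elided in this document.

```wdl
version 1.0

workflow myWorkflowName {
  input {
    File my_input
    Float input_size_cutoff_mb
  }

  if (size(my_input, "MB") > input_size_cutoff_mb) {
    call task_A { input: input_file = my_input }
  }

  if (size(my_input, "MB") <= input_size_cutoff_mb) {
    call task_B { input: input_file = my_input }
  }

  output {
    # Outside the if() blocks, task_A.out and task_B.out have type File?
    # select_first() returns the first defined value in the array
    File result = select_first([task_A.out, task_B.out])
  }
}

# Hypothetical stand-in task for large inputs
task task_A {
  input {
    File input_file
  }
  command <<<
    cp ~{input_file} large_result.txt
  >>>
  output {
    File out = "large_result.txt"
  }
}

# Hypothetical stand-in task for small inputs
task task_B {
  input {
    File input_file
  }
  command <<<
    cp ~{input_file} small_result.txt
  >>>
  output {
    File out = "small_result.txt"
  }
}
```

Since the two conditions are mutually exclusive and cover every case, exactly one of the two outputs is defined at runtime, so select_first() always has a value to return.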
Example
Now, let’s look at a real-world example of a WDL workflow that checks the size of an input file and then runs one or more different tasks based on how that size compares to a cutoff. The code block below contains relevant snippets of the Optimus pipeline in the WARP GitHub repository. Note that this version of the pipeline is not currently available on the main branch of the repository and the code block below does not contain the full script.
The Optimus pipeline is an open-source, cloud-optimized pipeline that supports the processing of any 3' single-cell and single-nucleus count data generated with the 10x Genomics v2 or v3 assay. The pipeline corrects cell barcodes and UMIs, aligns reads, marks duplicates, calculates summary metrics for genes and cells, detects empty droplets, returns read outputs in BAM format, and returns cell gene counts in NumPy matrix and Loom file formats. For more information, see the Optimus Overview.
version 1.0

workflow Optimus {
  input {
    # Mode for counting either "sc_rna" or "sn_rna"
    String counting_mode = "sc_rna"

    # Sequencing data inputs
    Array[File] r1_fastq
    Array[File] r2_fastq
    Array[File]? i1_fastq
    String input_id
    String output_bam_basename = input_id

    # organism reference parameters
    File tar_star_reference

    # 10x parameters
    File whitelist
    # tenX_v2, tenX_v3
    String chemistry = "tenX_v2"

    # Set to true to count reads aligned to exonic regions in sn_rna mode
    Boolean count_exons = false
  }

  # Get the size of the input fastq files with `size()` function and round up to the nearest integer with the `ceil()` function
  Int fastq_input_size = ceil(size(r1_fastq, "Gi") ) + ceil(size(r2_fastq, "Gi"))

  # Check whether the combined size of the input fastq files is larger than 30 Gi
  Boolean split_fastqs = if ( fastq_input_size > 30 ) then true else false

  # Run the following three tasks only if `split_fastqs` is true
  if ( split_fastqs ) {
    # Split the input fastq into multiple fastq files
    call FastqProcessing.FastqProcessing as SplitFastq {
      input:
        i1_fastq = i1_fastq,
        r1_fastq = r1_fastq,
        r2_fastq = r2_fastq,
        whitelist = whitelist,
        chemistry = chemistry,
        sample_id = input_id
    }

    # Scatter the fastq files and call STARsolo in parallel
    scatter(idx in range(length(SplitFastq.fastq_R1_output_array))) {
      call StarAlign.STARsoloFastq as STARsoloFastq {
        input:
          r1_fastq = [SplitFastq.fastq_R1_output_array[idx]],
          r2_fastq = [SplitFastq.fastq_R2_output_array[idx]],
          white_list = whitelist,
          tar_star_reference = tar_star_reference,
          chemistry = chemistry,
          counting_mode = counting_mode,
          count_exons = count_exons,
          output_bam_basename = output_bam_basename + "_" + idx
      }
    }

    # Merge the bam files from the previous task
    call Merge.MergeSortBamFiles as MergeBam {
      input:
        bam_inputs = STARsoloFastq.bam_output,
        output_bam_filename = output_bam_basename + ".bam",
        sort_order = "coordinate"
    }
  }

  # Run the following task only if `split_fastqs` is false
  if ( !split_fastqs ) {
    # Call STARsolo to analyze a single fastq
    call StarAlign.STARsoloFastq as STARsoloFastqSingle {
      input:
        r1_fastq = r1_fastq,
        r2_fastq = r2_fastq,
        white_list = whitelist,
        tar_star_reference = tar_star_reference,
        chemistry = chemistry,
        counting_mode = counting_mode,
        count_exons = count_exons,
        output_bam_basename = output_bam_basename
    }
  }
}
The example above shows a slightly different way of using conditionals to run different tasks based on the size of your inputs. Here, both the size of the input files and whether that size is larger than the cutoff (30 GiB) are determined outside of the if() statements, and the result of the comparison is saved as a boolean variable, split_fastqs. This way, you only need to evaluate and compare the input sizes a single time in the workflow, and can then use the value of the resulting boolean variable as many times as you need.
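Applied to the simpler two-task workflow from the Solution section, the same pattern might look like the sketch below. The variable name is_large_input, the stand-in task bodies, and their input and output names are hypothetical. A comparison already evaluates to a Boolean, so the sketch assigns it directly rather than wrapping it in an if-then-else expression.

```wdl
version 1.0

workflow myWorkflowName {
  input {
    File my_input
    Float input_size_cutoff_mb
  }

  # Evaluate the size comparison once and store the result
  Boolean is_large_input = size(my_input, "MB") > input_size_cutoff_mb

  # Reuse the stored Boolean in each conditional
  if (is_large_input) {
    call task_A { input: input_file = my_input }
  }

  if (!is_large_input) {
    call task_B { input: input_file = my_input }
  }
}

# Hypothetical stand-in task for large inputs: counts bytes in the file
task task_A {
  input {
    File input_file
  }
  command <<<
    wc -c < ~{input_file}
  >>>
  output {
    String byte_count = read_string(stdout())
  }
}

# Hypothetical stand-in task for small inputs: counts lines in the file
task task_B {
  input {
    File input_file
  }
  command <<<
    wc -l < ~{input_file}
  >>>
  output {
    String line_count = read_string(stdout())
  }
}
```

Negating a single Boolean also guarantees that the two conditions are exact complements, which is easier to get wrong when the comparison is written out twice with different operators.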
Resources
- Conditionals WDL doc
- Task aliasing WDL doc
- WDL 1.0 spec
- Optimus pipeline code and documentation