Parallelizing by scattering on an array

Imagine you have a list of files of the same type that all need to be processed identically. You could pass each file to your workflow one by one and process them individually, but that would be repetitive and inefficient. Instead, you can parallelize your workflow. By processing the files simultaneously instead of sequentially, you can finish the same amount of work in less time. In WDL 1.0, you can accomplish this using arrays and scatter-gather parallelism.

In this document, you’ll see examples of code that scatter input arrays to parallelize workflows.

Problem

You have several files that all need to be processed in the same way and you want to run them in parallel.

Solution

When every file goes through the same steps and tools, and no file depends on the result of another, you can scatter the files in your WDL workflow so that they are processed in parallel.

Let’s say we have a workflow that takes in an array of files, my_input, and calls two tasks, task_A and task_B, as shown in the example WDL script below:

version 1.0
workflow myWorkflowName {
  input {
    File my_ref
    Array[File] my_input
    String name
  }
  call task_A {
    input: 
      ref = my_ref,
      in = my_input,
      id = name     
  }
  call task_B {
    input: 
      ref = my_ref,
      in = task_A.out,
      id = name
  }
}

task task_A {...}
task task_B {...}

We want the workflow to scatter the input array before running task_A, but that isn’t written into the script above yet. To scatter the input array and parallelize our workflow, we can wrap the call to task_A in a scatter block. Inside the block, task_A receives a single element of the array, file, instead of the whole array, my_input. Let’s take a look at the updated workflow after we’ve added a scatter block:

version 1.0
workflow myWorkflowName {
  input {
    File my_ref
    Array[File] my_input
    String name
  }
  scatter (file in my_input) {
    call task_A {
      input: 
        ref = my_ref,
        in = file,
        id = name     
    }
  }
  call task_B {
    input: 
      ref = my_ref,
      in = task_A.out,
      id = name
  }
}

task task_A {...}
task task_B {...}

By adding the scatter block above, we’ve told our workflow to call task_A once for each file in the array, my_input. Because the calls do not depend on each other, the execution engine can run them simultaneously.

You’ll notice that we didn’t change anything about our call to task_B in the workflow when we added the scatter block around task_A. Each individual call to task_A produces a single output, but outside the scatter block those outputs are gathered into an array with one element per scattered call. We don’t need to explicitly tell our workflow that the input to task_B, task_A.out, is now an array. In WDL 1.0, simply referring to task_A.out outside the scatter block indicates that you are referring to the array of those outputs.
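
To make the gather step concrete, here is a minimal, self-contained sketch of the same pattern with the task bodies filled in. The task bodies, the input names in_file and in_files, and the output file names are hypothetical and chosen only for illustration; they are not the tasks elided in the script above. The point to notice is that task_A declares a single File output, while task_B declares an Array[File] input.

```wdl
version 1.0

workflow scatter_gather_sketch {
  input {
    Array[File] my_input
  }

  # Each iteration of the scatter block calls task_A on one file.
  scatter (one_file in my_input) {
    call task_A {
      input:
        in_file = one_file
    }
  }

  # Outside the scatter block, task_A.out has type Array[File],
  # with one element per scattered call.
  call task_B {
    input:
      in_files = task_A.out
  }

  output {
    File merged = task_B.out
  }
}

# Hypothetical task: counts the lines in a single file.
task task_A {
  input {
    File in_file
  }
  command <<<
    wc -l < ~{in_file} > count.txt
  >>>
  output {
    File out = "count.txt"
  }
}

# Hypothetical task: concatenates the per-file results into one file.
task task_B {
  input {
    Array[File] in_files
  }
  command <<<
    cat ~{sep=" " in_files} > merged.txt
  >>>
  output {
    File out = "merged.txt"
  }
}
```

If task_B instead declared a plain File input, the workflow would fail validation, because the gathered value task_A.out is an Array[File] rather than a single File.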

Example

Now, let’s look at a real-world example of a WDL workflow that scatters an input array and gathers the results. The code block below contains relevant snippets of the Imputation pipeline in the WARP GitHub repository.

The Imputation pipeline is an open-source, cloud-optimized pipeline that is based on the Michigan Imputation Server pipeline and imputes missing genotypes from either a multi-sample VCF file or an array of single-sample VCF files. The pipeline filters, phases, and performs imputation on a multi-sample VCF, and returns the imputed VCF file along with several metrics. For more information, see the Imputation Overview.

The code below shows an example of scattering and gathering an input array in WDL 1.0. In this pipeline, contigs are taken in as input in the form of an array of strings, and then that array is scattered. Inside the scatter block, each contig is used to create a reference_filename, values are assigned to the struct ReferencePanelContig, and tasks.CountVariantsInChunks is called.

Later on in the workflow, another task, tasks.StoreChunksInfo, is called. Two of its inputs, vars_in_array and vars_in_panel, are built from the outputs of tasks.CountVariantsInChunks. Just as in the Solution above, these outputs are not explicitly declared as arrays, but because they come from a call inside a scatter block, the workflow treats them as arrays.

version 1.0
workflow Imputation {
  input {
    File ref_dict # for reheadering / adding contig lengths in the header of the output VCF, and calculating contig lengths
    Array[String] contigs
    String reference_panel_path # path to the bucket where the reference panel files are stored for all contigs

    # file extensions used to find reference panel files
    String vcf_suffix = ".vcf.gz"
    String vcf_index_suffix = ".vcf.gz.tbi"
    String bcf_suffix = ".bcf"
    String bcf_index_suffix = ".bcf.csi"
    String m3vcf_suffix = ".cleaned.m3vcf.gz"
  }

  scatter (contig in contigs) {

    String reference_filename = reference_panel_path + "ALL.chr" + contig + ".phase3_integrated.20130502.genotypes.cleaned"

    ReferencePanelContig referencePanelContig = {
      "vcf": reference_filename + vcf_suffix,
      "vcf_index": reference_filename + vcf_index_suffix,
      "bcf": reference_filename + bcf_suffix,
      "bcf_index": reference_filename + bcf_index_suffix,
      "m3vcf": reference_filename + m3vcf_suffix,
      "contig": contig
    }

    call tasks.CountVariantsInChunks {
      input:
        vcf = select_first([OptionalQCSites.output_vcf, GenerateChunk.output_vcf]),
        vcf_index = select_first([OptionalQCSites.output_vcf_index, GenerateChunk.output_vcf_index]),
        panel_vcf = referencePanelContig.vcf,
        panel_vcf_index = referencePanelContig.vcf_index
    }
  }

  call tasks.StoreChunksInfo {
    input:
      chroms = flatten(chunk_contig),
      starts = flatten(start),
      ends = flatten(end),
      vars_in_array = flatten(CountVariantsInChunks.var_in_original),
      vars_in_panel = flatten(CountVariantsInChunks.var_in_reference),
      valids = flatten(CheckChunks.valid),
      basename = output_callset_name
  }
}
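
The snippet above wraps the gathered outputs in flatten(), a WDL standard library function that takes an array of arrays and concatenates the inner arrays into one. Gathering produces an array of arrays when a call sits inside two nested scatter blocks, which the flatten() calls suggest is the case in the full pipeline; that part is not shown in the snippet, so treat it as an assumption. The sketch below illustrates the idea with a hypothetical task, describe_chunk, and hypothetical inputs; it is not code from the Imputation pipeline.

```wdl
version 1.0

workflow nested_scatter_sketch {
  input {
    Array[String] contigs = ["20", "21"]
    Int chunks_per_contig = 3
  }

  # Outer scatter: one iteration per contig.
  scatter (contig in contigs) {
    # Inner scatter: one iteration per chunk index (0, 1, 2).
    scatter (i in range(chunks_per_contig)) {
      call describe_chunk {
        input:
          contig = contig,
          chunk_index = i
      }
    }
  }

  output {
    # Two nested scatters gather into an array of arrays:
    # the outer array has one entry per contig.
    Array[Array[String]] labels_by_contig = describe_chunk.label
    # flatten() concatenates the inner arrays into a single array.
    Array[String] all_labels = flatten(describe_chunk.label)
  }
}

# Hypothetical task: builds a label for one chunk of one contig.
task describe_chunk {
  input {
    String contig
    Int chunk_index
  }
  command <<<
    echo "chr~{contig}_chunk~{chunk_index}"
  >>>
  output {
    String label = read_string(stdout())
  }
}
```

With the default inputs, labels_by_contig holds two inner arrays of three labels each, and all_labels holds the same six labels in a single array, which is the shape a task with plain Array inputs expects.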

Resources