Inputs are declared with a qualifier and a name. The qualifier can be thought of as the variable's type.
- val: Access the input value by name in the process script.
- path: Handle the input value as a path, staging the file properly in the execution context.
- env: Use the input value to set an environment variable in the process script.
- stdin: Forward the input value to the process stdin special file.
- tuple: Handle a group of input values having any of the above qualifiers.
- each: Execute the process for each element in the input collection.
val
process basicExample {
    input:
    val x

    "echo process job $x"
}

workflow {
    def num = Channel.of(1,2,3)
    basicExample(num)
}
or
process basicExample {
    input:
    val x

    "echo process job $x"
}

workflow {
    Channel.of(1,2,3) | basicExample
}
file
It is similar to path; see path instead (path is the preferred qualifier in DSL2).
path
A path input can receive a (Groovy) list of files or directories, for example all the fasta files (*.fa) in a directory.
process blastThemAll {
    input:
    path query_file // the value passed to the process is staged under this name, similar to a function parameter

    "blastp -query ${query_file} -db nr"
}
workflow {
    def proteins = Channel.fromPath( '/some/path/*.fa' )
    blastThemAll(proteins)
}
You may also stage the input under a specific file name. Here, every fasta file in the directory is presented to blastp as my_query.fa.
process blastThemAll {
    input:
    path query_file, stageAs: 'my_query.fa' // the input file is staged as my_query.fa
    // or
    // path 'my_query.fa'

    "blastp -query my_query.fa -db nr"
}
workflow {
    def proteins = Channel.fromPath( '/some/path/*.fa' )
    blastThemAll(proteins)
}
Sometimes a process expects a specific number of files, and the task should fail if the number of staged files does not match. To enforce this you can use the arity option.
input:
path('one.txt', arity: '1') // exactly one file is expected
path('two_*.txt', arity: '2') // exactly two files are expected
path('several_*.txt', arity: '1..*') // one or more files are expected
You can also control the name of the file in the task directory with the stageAs option.
input:
path('one.txt', stageAs: 'my_file.txt')
// or
path x, stageAs: 'my_file.txt'

"my_command --in my_file.txt"
You can also stage multiple files into a subdirectory and use a variable ($x in this case) to refer to the staged file names.
input:
path x, stageAs: 'my_dir/*.txt'
"my_command --in $x"
env
env is used to set an environment variable in the process script, whereas val passes a value into the script as a Groovy variable.
process envExample {
    input:
    env my_var

    // single quotes: $my_var is expanded by the shell, not by Groovy
    'echo process job $my_var'
}
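A minimal usage sketch, assuming the process above: each value emitted by the channel is exported to the task as the environment variable my_var.
workflow {
    Channel.of('hello', 'world') | envExample
}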
stdin
process printAll {
    input:
    stdin

    """
    cat -
    """
}

workflow {
    Channel.of('hello', 'hola', 'bonjour', 'ciao')
        | map { it + '\n' } // it refers to each element emitted by the channel
        | printAll
}
tuple
tuple is used to pass multiple values together to a process as a single input.
process tupleExample {
    input:
    tuple val(x), path('input.txt')

    """
    echo "Processing $x"
    cat input.txt > copy
    """
}

workflow {
    Channel.of( [1, 'alpha.txt'], [2, 'beta.txt'], [3, 'delta.txt'] ) | tupleExample
}
For each task the executed script is: echo "Processing 1"; cat input.txt > copy (and likewise for 2 and 3), where input.txt is the staged copy of alpha.txt, beta.txt, and delta.txt respectively.
each
Repeating a process for each element in a collection.
process alignSequences {
    input:
    path in1        // sequences
    each param1     // methods
    each path(dirs) // libraries

    """
    tool -in $in1 -mode $param1 -lib $dirs > result
    """
}

workflow {
    in1 = Channel.fromPath('*.txt')
    param1 = ['R', 'S']
    dirs = [ file('dir1'), file('dir2'), file('dir3') ]
    alignSequences(in1, param1, dirs)
}
The output block defines the outputs of a process. Like inputs, outputs have qualifiers.
val
process myProcess {
    input:
    path input_file

    output:
    val x
    val 'log'
    val "${input_file.baseName}.out"

    script:
    x = input_file.name
    """
    echo $x > log
    cat $input_file > ${input_file.baseName}.out
    """
}

workflow {
    ch_dummy = Channel.fromPath('*.csv').first()
    (var, log, basename) = myProcess(ch_dummy)
    var.view { "ch_var: $it" }
    log.view { "ch_str: $it" }
    basename.view { "ch_exp: $it" }
}
// or
workflow {
    ch_dummy = Channel.fromPath('*.csv').first()
    ch_dummy | myProcess
    // unpack the three output channels
    def (var, log, basename) = myProcess.out
    var.view { "ch_var: $it" }
    log.view { "ch_str: $it" }
    basename.view { "ch_exp: $it" }
}
path
process myProcess {
    input:
    path input_file

    output:
    path "${input_file.baseName}.out"

    script:
    """
    echo "Processing ${input_file}" > ${input_file.baseName}.out
    """
}
workflow {
    input_files = channel.fromPath('*.txt')
    results = input_files | myProcess
    // or
    // results = myProcess(input_files)
    results.view { result ->
        println "output file: $result"
    }
    // or
    // results.view { "output file: $it" }
}
Note that a process cannot be invoked inside an operator closure such as map; use the pipe or a direct call as shown above.
arity
Similar to inputs, you can use arity to control the number of files a process is expected to produce.
output:
path('one.txt', arity: '1') // exactly one file is expected
path('two_*.txt', arity: '2') // exactly two files are expected
path('several_*.txt', arity: '1..*') // one or more files are expected
process myProcess {
    input:
    path input_file

    output:
    path "${input_file.baseName}.out"
    path "${input_file.baseName}.log"

    script:
    """
    echo "Processing ${input_file}" > ${input_file.baseName}.out
    echo "Log file for ${input_file}" > ${input_file.baseName}.log
    """
}
workflow {
    input_files = channel.fromPath('*.txt')
    def (out_files, log_files) = myProcess(input_files)
    out_files.view { "output file: $it" }
    log_files.view { "log file: $it" }
}
To name output files based on an input value:
process mapping {
    input:
    val sample
    path seq

    output:
    path "${sample}.sam"

    """
    tool -in $seq > ${sample}.sam
    """
}
stdout
Emits the standard output of the task as an output channel.
process myProcess {
    input:
    path input_file

    output:
    stdout

    script:
    """
    echo "Processing ${input_file}"
    """
}
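A usage sketch, assuming some *.txt files exist in the launch directory; the captured stdout of each task is printed by view:
workflow {
    Channel.fromPath('*.txt') | myProcess | view
}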
eval
eval is used to capture the standard output of a given command, which is executed in the task environment after the main script completes.
process myProcess {
    input:
    path input_file

    output:
    eval('date')

    script:
    """
    echo "Processing ${input_file}" > result.txt
    """
}
tuple
By using tuple in the output block you can emit multiple values together on a single channel. For example, you can link the output file to metadata such as the sample name.
process myProcess {
    input:
    path input_file

    output:
    tuple val(input_file), path("${input_file.baseName}.out")

    script:
    """
    echo "Processing ${input_file}" > ${input_file.baseName}.out
    """
}
workflow {
    input_files = channel.fromPath('*.txt')
    results = input_files | myProcess
    results.view { result ->
        def (input_file, output_file) = result
        println "input file: $input_file"
        println "output file: $output_file"
    }
}
emit
By using emit you can name a process output and access it by that name in the workflow.
process myProcess {
    output:
    path "test.out", emit: output_file

    script:
    """
    echo "this is a test" > test.out
    """
}
workflow {
    myProcess()
    myProcess.out.output_file.view()
}
optional: true | false
Marks an output as optional: the process will not fail if the output file is missing.
process myProcess {
    output:
    path "test.out", optional: true

    script:
    """
    echo "this is a test" > test.out
    """
}
when
The when: block controls whether a process is executed, depending on the given condition.
process myProcess {
    input:
    path input_file

    output:
    path "${input_file.baseName}.out"

    when:
    input_file.name =~ /^Sample_.*\.txt$/ // only run on files named like Sample_*.txt

    script:
    """
    echo "Processing ${input_file}" > ${input_file.baseName}.out
    """
}
Directives are optional settings for a process.
accelerator
To request hardware accelerators (e.g. GPUs) for the task.
process myProcess {
    accelerator 4, type: 'nvidia-h100-mega-80gb'

    input:
    path input_file

    output:
    path "${input_file.baseName}.out", emit: output_file

    script:
    """
    tool --in $input_file --out ${input_file.baseName}.out
    """
}
beforeScript or afterScript
These directives run a custom snippet before or after the main process script.
process myProcess {
    beforeScript 'source /path/to/my/script.sh'
    afterScript 'echo "Process finished"'

    input:
    path input_file

    output:
    path "${input_file.baseName}.out", emit: output_file

    script:
    """
    echo "Processing ${input_file}" > ${input_file.baseName}.out
    """
}
cache
To cache the output of a process.
process myProcess {
    cache true // or false
    // cache 'lenient' // index input files by name and size only
    // cache 'deep'    // index input files by their content

    script:
    """
    command ...
    """
}
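The cache only takes effect when the run is relaunched with the -resume option:
nextflow run main.nf -resume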
executor
To specify the executor.
- slurm
- local
- k8s
- awsbatch
- sge
- pbspro
- lsf
- …
process myProcess {
    executor 'slurm'
    //...
}
You can also create a profile where you specify the cluster and project settings.
profiles {
    uppmax {
        executor {
            name = 'slurm'
        }
        process {
            scratch = '$SNIC_TMP'
            clusterOptions = ' -A snic2022-22-606 --mail-type=all --mail-user=nimarafati@gmail.com '
        }
    }
}
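The profile is then selected on the command line when launching the pipeline:
nextflow run main.nf -profile uppmax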
clusterOptions
To pass custom (native) options to the cluster submit command.
process myProcess {
    executor 'slurm'
    clusterOptions '-A snic2022-22-606 --mail-type=all'
    //...
}
queue
It allows you to set the queue when submitting jobs with a grid-based executor, e.g.:
- short for short jobs.
- long for long jobs.
process myProcess {
    queue 'short'
    executor 'sge'
    //...
}
resourceLabels
Attaches custom name/value labels to the computing resources used by the task (AWS Batch, Google Cloud, Kubernetes, …). How the labels are applied depends on the cloud system/executor.
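A minimal sketch (the label keys and values here are arbitrary examples):
process myProcess {
    resourceLabels project: 'my_project', user: 'my_username'
    //...
}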
scratch
To specify the scratch directory, where task outputs are temporarily stored.
process myProcess {
    scratch '/path/to/scratch'
    //...
}
shell
To specify the shell used to run the process script. It can also be set in the configuration file.
process myProcess {
    shell '/bin/bash', '-euo', 'pipefail'
    //...
}
// or in the configuration file
process.shell = ['/bin/bash', '-euo', 'pipefail']
conda
To specify a conda environment.
process myProcess {
    conda 'tool=0.1.1'
    //...
}
container
To specify a container.
process myProcess {
    container 'ubuntu:20.04'
    //...
}
cpus & memory
To specify the number of CPUs and the amount of memory.
process myProcess {
    cpus 4
    memory '8 GB'
    executor 'slurm'

    script:
    """
    command ... --cpu ${task.cpus}
    """
}
disk
To specify the disk space.
process myProcess {
    disk '100 GB'
    executor 'slurm'
    //...
}
time
To specify the time limit for a process.
process myProcess {
    time '8 h'
    //...
}
resourceLimits
To set upper limits for the resources requested by a process.
process myProcess {
    resourceLimits cpus: 16, memory: 8.GB, time: 8.h, disk: 2.GB
    //...
}
You can also specify the resource limits in the configuration file.
process {
    resourceLimits = [ cpus: 16, memory: 8.GB, time: 8.h ]
}
debug
Self-explanatory: it prints the process stdout to the terminal for debugging.
process myProcess {
    debug true
    //...
}
errorStrategy
Defines how the workflow reacts to an error in a process.
Name | Description
---|---
terminate | Terminates the execution as soon as an error condition is reported. Pending jobs are killed (default).
finish | Initiates an orderly pipeline shutdown when an error condition is raised, waiting for the completion of any submitted jobs.
ignore | Ignores process execution errors.
retry | Re-submits for execution a process returning an error condition.
You can use maxErrors (e.g. maxErrors 5) and maxRetries (e.g. maxRetries 3) together with retry.
process retryIfFail {
    errorStrategy 'retry'
    maxErrors 5

    script:
    <your command string here>
}
ext
To specify custom user settings, available in the script as task.ext.
process myProcess {
    ext version: '1.0', arg: '--param1 value1 --param2 value2'
    //...
}
Or expose similar settings through params in the configuration:
params.star_version = '2.7.9.a'
params.star_args = '--runThreadN 8 '
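In practice ext is often populated from the configuration file; a common pattern in the style used by nf-core (the process name STAR_ALIGN is a hypothetical example):
process {
    withName: 'STAR_ALIGN' {
        ext.args = '--runThreadN 8'
    }
}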
label
To specify a label for a process.
process myProcess {
    label 'my_label'
    //...
}
You can use the same label for multiple processes. Also, you can use multiple labels for a process.
maxForks
Defines how many instances of the process can run in parallel.
process doNotParallelizeIt {
    maxForks 1

    '''
    <your script here>
    '''
}
module
To load environment modules (e.g. on an HPC cluster).
process myProcess {
    module 'BioPerl:blast'
    //...
}
penv
To specify the parallel environment used when submitting parallel tasks to the SGE resource manager.
process myProcess {
    cpus 4
    penv 'openmpi'
    executor 'sge'
    //...
}
publishDir
To specify the directory where output files are published. Other options to set:
- enabled to enable or disable the publishing.
- failOnError to stop the workflow if the publishing fails.
- mode to specify the mode of publishing:
  - copy to copy the files to the publish directory.
  - link to create a hard link to the files in the publish directory.
  - move to move the files to the publish directory (only safe for outputs not consumed by downstream processes).
  - rellink to create a relative symbolic link to the files in the publish directory.
  - symlink to create an absolute symbolic link to the files in the publish directory (default).
- path to specify the path to the publish directory. publishDir path: '/path/results/'
- pattern to specify the pattern of the files to publish. publishDir pattern: '*.txt'
- saveAs to rename the published file; it takes a closure, e.g. publishDir saveAs: { filename -> "my_$filename" }
process myProcess {
    publishDir 'results', mode: 'copy', overwrite: true
    //...
}
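A sketch combining several of the options above (directory and pattern are hypothetical):
process myProcess {
    publishDir path: 'results/logs', mode: 'copy', pattern: '*.log', saveAs: { filename -> "myProcess_$filename" }
    //...
}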
tag
To specify a tag for every task execution of a process, which is shown in the log. The difference with label is that label is used to group processes, whereas tag distinguishes the individual tasks of a process.
process myProcess {
    tag "$sample"

    input:
    val sample

    script:
    """
    echo "Processing $sample"
    """
}

workflow {
    Channel.of('sampleA', 'sampleB', 'sampleC') | myProcess
}
This will show up in the execution log along the lines of (task hashes are illustrative):
[a1/b2c3d4] process > myProcess (sampleA)
[e5/f6a7b8] process > myProcess (sampleB)
[c9/d0e1f2] process > myProcess (sampleC)
You can instruct the process to change its settings dynamically depending on the input. In the following example the queue is set to short if the input file is smaller than 1 MB.
process myProcess {
    queue { input_file.size() < 1.MB ? 'short' : 'long' }

    input:
    path input_file

    script:
    """
    echo "Processing ${input_file} on queue ${task.queue}"
    """
}
Or allocate computing resources depending on the process labels:
process FASTQC {
    tag "$meta.id"
    label 'process_medium'
    //...
}

process TRIMMOMATIC {
    tag "$meta.id"
    label 'process_low'
    //...
}
process {
    withLabel: 'process_medium' {
        cpus = { check_max( 2 * task.attempt, 'cpus' ) }
        memory = { check_max( 12.GB * task.attempt, 'memory' ) }
        time = { check_max( 4.h * task.attempt, 'time' ) }
    }
    withLabel: 'process_low' {
        cpus = { check_max( 1 * task.attempt, 'cpus' ) }
        memory = { check_max( 6.GB * task.attempt, 'memory' ) }
        time = { check_max( 2.h * task.attempt, 'time' ) }
    }
}
You can also set the error strategy dynamically, e.g. retrying when the exit status suggests a transient resource problem (network congestion, out-of-memory kills, …).
process {
    errorStrategy = { task.exitStatus in [143,137,104,134,139] ? 'retry' : 'finish' }
    maxRetries = 1
    maxErrors = '-1'
}
There are two channel types:
- Queue channel: a channel that emits a sequence of values. It can be created by:
  - factory methods: of, fromPath
  - operators: map, flatMap
  - processes: see above
- Value channel: a channel that emits a single value. It can be created by:
  - factory methods: value
  - operators: first, collect, reduce
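A minimal sketch contrasting the two types:
ch_queue = Channel.of(1, 2, 3)     // queue channel: emits 1, 2, 3, then closes
ch_value = Channel.value('GRCh38') // value channel: its single value can be read any number of times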
channel.empty
Creating an empty channel without emitting any value.
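A sketch of a common pattern: start with an empty channel and fill it later with mix (SOME_PROCESS is a hypothetical process name):
ch_versions = Channel.empty()
// later: ch_versions = ch_versions.mix(SOME_PROCESS.out.versions)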
channel.from
Creating a channel from a list of values. Note that from is deprecated; use of or fromList instead.
ch = Channel.from( 'alpha', 'beta', 'delta' )
ch.subscribe { println "Got: $it" }
subscribe is an operator that invokes the given closure for each value emitted by the channel (here printing them).
channel.fromList
Creating a channel from a list of values.
ch = Channel.fromList( ['alpha', 'beta', 'delta'] )
ch.view()
// or
Channel
    .fromList( ['alpha', 'beta', 'delta'] )
    .view { "Got: $it" }
Output:
Got: alpha
Got: beta
Got: delta
channel.fromPath
Emitting file(s) from a path. NOTE: does not check whether the file exists.
myFileChannel = Channel.fromPath( '/data/file.txt' )
pairFiles = Channel.fromPath( 'data/file_{1,2}.fq' )
expl3 = Channel.fromPath( '/path/*', hidden: true )         // also match hidden files
myFileChannel = Channel.fromPath( '/path/*b', type: 'dir' ) // list DIRECTORIES ending with b
myFileChannel = Channel.fromPath( '/path/a*', type: 'any' ) // list files and directories starting with a
Channel.fromPath( ['/some/path/*.txt', '/other/path/*.txt'] ) // you can pass multiple glob patterns
The ** wildcard also descends into subdirectories when matching files.
Parameters:
- maxDepth: maximum number of directory levels to visit (default: no limit)
- followLinks: when true, symbolic links are followed during directory tree traversal, otherwise they are managed as files (default: true)
- relative: when true, returned paths are relative to the top-most common directory (default: false)
- checkIfExists: when true, throws an exception if the specified path does not exist in the file system (default: false)
channel.fromFilePairs
Emitting pair files (read1/read2).
Channel
    .fromFilePairs('/domus/h1/nimar/nextflow_tutorial/fastq/*{1,2}.fq')
    .view()
// [HG00100, [/domus/h1/nimar/nextflow_tutorial/fastq/HG00100_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00100_2.fq]]
// [HG00097, [/domus/h1/nimar/nextflow_tutorial/fastq/HG00097_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00097_2.fq]]
// [HG00101, [/domus/h1/nimar/nextflow_tutorial/fastq/HG00101_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00101_2.fq]]

Channel
    .fromFilePairs('/some/data/*', size: -1) { file -> file.extension }
    .view { ext, files -> "Files with the extension $ext are $files" }
// Files with the extension fq are [/domus/h1/nimar/nextflow_tutorial/fastq/HG00097_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00097_2.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00100_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00100_2.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00101_1.fq, /domus/h1/nimar/nextflow_tutorial/fastq/HG00101_2.fq]
size: -1 means any number of files. The -> notation in Groovy defines a closure ({ }), which is essentially an anonymous function (see below for further information). { file -> file.extension } is a closure that takes each file matched by the pattern and returns its extension; the file parameter represents each file, and file.extension gets its extension. The files are grouped by this value.
channel.fromSRA
Emitting files from SRA.
ids = ['ERR908507', 'ERR908506', 'ERR908505']
Channel
    .fromSRA(ids)
    .view()
Output:
[SRR1448794, /vol1/fastq/SRR144/004/SRR1448794/SRR1448794.fastq.gz]
[SRR1448795, /vol1/fastq/SRR144/005/SRR1448795/SRR1448795.fastq.gz]
[SRR1448796, /vol1/fastq/SRR144/006/SRR1448796/SRR1448796.fastq.gz]
channel.of
Creating a channel from a list of values.
Channel
    .of( 'alpha', 'beta', 'delta' )
    .subscribe { println "Got: $it" }
Output:
Got: alpha
Got: beta
Got: delta
or ranges:
Channel
    .of( 1..22, 'X', 'Y' )
    .view()
Output:
1
2
:
22
X
Y
channel.topic
Topic channels are named channels through which multiple processes can send values to a single consumer.
process proc1 {
    input:
    val x

    output:
    val(x), topic: 'my_topic' // send this output to the topic

    script:
    """
    echo "Processing $x"
    """
}

process proc2 {
    input:
    val x

    output:
    val(x), topic: 'my_topic'

    script:
    """
    echo "Processing $x"
    """
}

workflow {
    proc1(Channel.of(1))
    proc2(Channel.of(2))
    channel.topic('my_topic').view()
}
channel.value
Creating a value channel, i.e. a channel bound to a single value that can be read repeatedly.
workflow_summary = WorkflowSpatialxe.paramsSummaryMultiqc(workflow, summary_params)
ch_workflow_summary = Channel.value(workflow_summary)
channel.watchPath
Watches for files matching the given pattern and emits them when the specified event(s) occur (create, modify, delete).
Channel
    .watchPath( '/path/*.fa', 'create,modify' )
    .subscribe { println "Fasta file: $it" }
The watchPath factory waits endlessly for files that match the specified pattern and event(s), which means that it will cause your pipeline to run forever. Consider using the until operator to close the channel when a certain condition is met (e.g. receiving a file named DONE).
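A sketch of that suggestion, closing the channel once a sentinel file named DONE appears:
Channel
    .watchPath( '/path/*' )
    .until { it.name == 'DONE' }
    .view()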
mix
Emits the values produced by multiple source channels as a single channel.
c1 = Channel.from( 'alpha', 'beta', 'delta' )
c2 = Channel.from( 'one', 'two', 'three' )
c1.mix( c2 ).view()
take
take: declares the inputs of a named (sub)workflow; when take: is used, the workflow body must be placed under main:.
workflow myFlow {
    take:
    myChannel

    main:
    myProcess(myChannel)
}
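The named subworkflow is then invoked with its inputs, for example:
workflow {
    data = Channel.of(1, 2, 3)
    myFlow(data)
}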
->
The notation -> in Groovy is used to define a closure ({ }), which is essentially an anonymous function (see below for further information).
Basic usage:
{param1, param2 ->
    // code here
}

- { } is the body of the closure
- param1, param2 are the parameters of the closure
- -> separates the parameters from the body of the closure.

Example:
def myClosure = { param1, param2 ->
    println "param1: $param1"
    println "param2: $param2"
}
myClosure('hello', 'world')
Output:
param1: hello
param2: world
You can also declare the closure parameter with an explicit type, e.g. { String str -> println "${str}; ${str.size()}" }:
Channel
    .from( 'alpha', 'beta', 'lambda' )
    .subscribe { String str ->
        println "Got: ${str}; len: ${str.size()}"
    }
LinkedHashMap
A LinkedHashMap is a collection that maintains the order of its elements. When you iterate over a LinkedHashMap, the elements are returned in the order they were inserted.
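A minimal sketch: Groovy map literals are LinkedHashMaps by default, so iteration follows insertion order (the keys here are arbitrary):
def meta = [id: 'sample1', single_end: false]
meta.each { k, v -> println "$k = $v" } // prints id first, then single_end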
def create_fastq_channel(LinkedHashMap row) {
    // create meta map
    def meta = [:]
    meta.id = row.sample
    meta.single_end = row.single_end.toBoolean()

    // add path(s) of the fastq file(s) to the meta map
    def fastq_meta = []
    if (!file(row.fastq_1).exists()) {
        exit 1, "ERROR: Please check input samplesheet -> Read 1 FastQ file does not exist!\n${row.fastq_1}"
    }
    if (meta.single_end) {
        fastq_meta = [ meta, [ file(row.fastq_1) ] ]
    } else {
        if (!file(row.fastq_2).exists()) {
            exit 1, "ERROR: Please check input samplesheet -> Read 2 FastQ file does not exist!\n${row.fastq_2}"
        }
        fastq_meta = [ meta, [ file(row.fastq_1), file(row.fastq_2) ] ]
    }
    return fastq_meta
}
In general a Nextflow pipeline is orchestrated by the main.nf file, which is the entry point of the pipeline, together with the nextflow.config file, which contains the configuration of the pipeline.
Suggested order in which to read the files:
1. main.nf: Understand the primary workflow structure.
2. nextflow.config: Learn about configurations and profiles.
3. modules: Explore the individual modules for different tasks.
4. subworkflows: Check for any reusable sub-workflows used in the main workflow.
params.input = 'data/*'
params.outdir = 'results'

workflow.onComplete {
    println "Workflow completed"
}
- params.input: specifies the input data files.
- params.outdir: defines the output directory.
- workflow.onComplete: a block executed when the workflow finishes.

process process_data {
    input:
    path input_file

    output:
    path "${input_file.baseName}.processed"

    script:
    """
    process_tool -i $input_file -o ${input_file.baseName}.processed
    """
}
- process_data: a process definition that takes an input file, processes it, and outputs the result.
- input_file: the input declaration.
- output: defines the output path.
- script: the shell command or script to run.

workflow {
    files = Channel.fromPath(params.input)
    processed_files = files | process_data
    processed_files.view { "Processed file: $it" }
}
- workflow: defines the workflow structure.
- files: a channel created from the input files.
- processed_files: the result of processing the files through process_data.
- processed_files.view: outputs the processed files for logging.

main.nf may refer to functions that are defined in a Groovy class.
In Groovy, a class is a blueprint for creating objects, encapsulating data for the object, and methods to manipulate that data. Groovy classes are similar to Java classes but with some syntactic sugar that makes them easier to work with.
class Person {
    String name
    int age

    void sayHello() {
        println "Hello, my name is $name and I am $age years old."
    }
}

// Creating an instance of the class
def person = new Person(name: 'John', age: 30)
person.sayHello()
Key points:
- Properties: String name and int age are properties of the Person class.
- Methods: sayHello is a method that prints a greeting message.
- Instance creation: new Person(name: 'John', age: 30) creates an instance of the Person class with the specified properties.
- Method call: person.sayHello() calls the sayHello method on the person object.
public
Access Modifier: Specifies that the member (field, method, or class) is accessible from any other class.
public class Person {
    public String name
    public int age

    public void sayHello() {
        println "Hello, my name is $name and I am $age years old."
    }
}
void
Return Type: Indicates that the method does not return any value.
void sayHello() {
    println "Hello"
}
static
Modifier: Means that the member belongs to the class, rather than instances of the class. Static members can be accessed without creating an instance of the class.
static String greet() {
    return "Hello"
}