From ecda528fbff521a0fdfbf3c87340e9edf4facf8f Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Fri, 6 Dec 2024 11:49:05 -0600 Subject: [PATCH] Fix issues with dynamic publish path and index file Signed-off-by: Ben Sherman --- docs/workflow.md | 64 +++++---- .../nextflow/extension/PublishOp.groovy | 129 +++++++++++++----- .../groovy/nextflow/script/OutputDsl.groovy | 10 +- tests/output-dsl.nf | 88 +++++++----- 4 files changed, 185 insertions(+), 106 deletions(-) diff --git a/docs/workflow.md b/docs/workflow.md index bfc99c1f8d..17c5d19f4b 100644 --- a/docs/workflow.md +++ b/docs/workflow.md @@ -465,7 +465,7 @@ results/ Target names cannot begin or end with a slash (`/`). ::: -By default, all files emitted by a published channel will be published into the specified directory. If a channel emits list values, each file in the list (including nested lists) will be published. For example: +By default, all files emitted by a published channel will be published into the specified directory. Lists and maps are recursively scanned for nested files. 
For example: ```nextflow workflow { @@ -503,12 +503,12 @@ workflow { } output { - 'foo' { + foo { enabled params.save_foo path 'intermediates/foo' } - 'bar' { + bar { mode 'copy' } } @@ -533,15 +533,17 @@ The `path` directive in a target block can also be a closure which defines a cus ```nextflow workflow { main: - ch_fastq = Channel.of( [ [id: 'SAMP1'], file('1.fastq'), file('2.fastq') ] ) + ch_samples = Channel.of( + [id: 'SAMP1', fastq_1: file('1.fastq'), fastq_2: file('2.fastq')] + ) publish: - ch_fastq >> 'fastq' + ch_samples >> 'samples' } output { - 'fastq' { - path { meta, fastq_1, fastq_2 -> "fastq/${meta.id}" } + samples { + path { sample -> "fastq/${sample.id}" } } } ``` @@ -552,19 +554,15 @@ The closure can even define a different path for each individual file by returni ```nextflow output { - 'fastq' { - path { meta, fastq_1, fastq_2 -> - { file -> "fastq/${meta.id}/${file.baseName}" } + samples { + path { sample -> + { filename -> "fastq/${sample.id}/${filename}" } } } } ``` -The inner closure will be applied to each file in the channel value, in this case `fastq_1` and `fastq_2`. - -:::{tip} -A mapping closure should usually have only one parameter. However, if the incoming values are tuples, the closure can specify a parameter for each tuple element for more convenient access, also known as "destructuring" or "unpacking". -::: +The inner closure will be applied to each file in the channel value, in this case `sample.fastq_1` and `sample.fastq_2`. 
### Index files @@ -575,29 +573,29 @@ For example: ```nextflow workflow { main: - ch_fastq = Channel.of( - [ [id: 1, name: 'sample 1'], '1a.fastq', '1b.fastq' ], - [ [id: 2, name: 'sample 2'], '2a.fastq', '2b.fastq' ], - [ [id: 3, name: 'sample 3'], '3a.fastq', '3b.fastq' ] + ch_samples = Channel.of( + [id: 1, name: 'sample 1', fastq_1: '1a.fastq', fastq_2: '1b.fastq'], + [id: 2, name: 'sample 2', fastq_1: '2a.fastq', fastq_2: '2b.fastq'], + [id: 3, name: 'sample 3', fastq_1: '3a.fastq', fastq_2: '3b.fastq'] ) publish: - ch_fastq >> 'fastq' + ch_samples >> 'samples' } output { - 'fastq' { + samples { + path 'fastq' index { - path 'index.csv' + path 'samples.csv' } } } ``` -The above example will write the following CSV file to `results/fastq/index.csv`: +The above example will write the following CSV file to `results/samples.csv`: -```csv -"id","name","fastq_1","fastq_2" +``` "1","sample 1","results/fastq/1a.fastq","results/fastq/1b.fastq" "2","sample 2","results/fastq/2a.fastq","results/fastq/2b.fastq" "3","sample 3","results/fastq/3a.fastq","results/fastq/3b.fastq" @@ -607,14 +605,20 @@ You can customize the index file with additional directives, for example: ```nextflow index { - path 'index.csv' - header ['id', 'fastq_1', 'fastq_1'] - sep '\t' - mapper { meta, fq_1, fq_2 -> meta + [fastq_1: fq_1, fastq_2: fq_2] } + path 'samples.csv' + header true + sep '|' } ``` -This example will produce the same index file as above, but with the `name` column removed and with tabs instead of commas. +This example will produce the following index file. + +``` +"id"|"name"|"fastq_1"|"fastq_2" +"1"|"sample 1"|"results/fastq/1a.fastq"|"results/fastq/1b.fastq" +"2"|"sample 2"|"results/fastq/2a.fastq"|"results/fastq/2b.fastq" +"3"|"sample 3"|"results/fastq/3a.fastq"|"results/fastq/3b.fastq" +``` See [Reference](#reference) for the list of available index directives. 
diff --git a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy index 5c0a454858..4978b455c2 100644 --- a/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy @@ -18,7 +18,6 @@ package nextflow.extension import java.nio.file.Path -import groovy.json.JsonOutput import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import groovyx.gpars.dataflow.DataflowReadChannel @@ -27,8 +26,9 @@ import nextflow.Session import nextflow.processor.PublishDir import nextflow.util.CsvWriter /** - * Publish files from a source channel. + * Publish a workflow output. * + * @author Ben Sherman * @author Paolo Di Tommaso */ @Slf4j @@ -39,9 +39,9 @@ class PublishOp { private Map opts - private Path targetDir + private String path - private Closure pathAs + private Closure dynamicPath private IndexOpts indexOpts @@ -54,11 +54,11 @@ class PublishOp { PublishOp(DataflowReadChannel source, Map opts) { this.source = source this.opts = opts - this.targetDir = opts.path as Path - if( opts.pathAs instanceof Closure ) - this.pathAs = opts.pathAs as Closure + this.path = opts.path as String + if( opts.dynamicPath instanceof Closure ) + this.dynamicPath = opts.dynamicPath as Closure if( opts.index ) - this.indexOpts = new IndexOpts(targetDir, opts.index as Map) + this.indexOpts = new IndexOpts(session.outputDir, opts.index as Map) } boolean getComplete() { complete } @@ -71,23 +71,29 @@ class PublishOp { return this } + /** + * For each incoming value, perform the following: + * + * 1. Publish any files contained in the value + * 2. Append a record to the index file for the value (if enabled) + * + * @param value + */ protected void onNext(value) { log.trace "Publish operator received: $value" // evaluate dynamic path - final path = pathAs != null - ? 
targetDir.resolve(pathAs.call(value)) - : targetDir - if( path == null ) + final targetDirOrClosure = getTargetDir(value) + if( targetDirOrClosure == null ) return // emit workflow publish event session.notifyWorkflowPublish(value) // create publisher - final overrides = path instanceof Closure - ? [saveAs: path] - : [path: path] + final overrides = targetDirOrClosure instanceof Closure + ? [saveAs: targetDirOrClosure] + : [path: targetDirOrClosure] final publisher = PublishDir.create(opts + overrides) // publish files @@ -98,26 +104,65 @@ class PublishOp { publisher.apply(files, sourceDir) } - // create record for index file + // append record to index file if( indexOpts ) { final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value - final normalized = normalizePaths(record) + final normalized = normalizePaths(record, targetDirOrClosure) log.trace "Normalized record for index file: ${normalized}" indexRecords << normalized } } + /** + * Compute the target directory for a published value: + * + * - if the publish path is a string, resolve it against + * the base output directory + * + * - if the publish path is a closure that returns a string, + * invoke it on the published value and resolve the returned + * string against the base output directory + * + * - if the publish path is a closure that returns a closure, + * invoke it on the published value and wrap the returned + * closure in a closure that resolves the relative path against + * the base output directory + * + * @param value + * @return Path | Closure + */ + protected Object getTargetDir(value) { + final outputDir = session.outputDir + if( dynamicPath == null ) + return outputDir.resolve(path) + final relativePath = dynamicPath.call(value) + if( relativePath == null ) + return null + return relativePath instanceof Closure + ? 
{ file -> outputDir.resolve(relativePath.call(file) as String) } + : outputDir.resolve(relativePath as String) + } + + /** + * Once all values have been published, write the + * index file (if enabled). + */ protected void onComplete(nope) { if( indexOpts && indexRecords.size() > 0 ) { log.trace "Saving records to index file: ${indexRecords}" - final ext = indexOpts.path.getExtension() - if( ext == 'csv' ) - new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path) - else if( ext == 'json' ) - indexOpts.path.text = DumpHelper.prettyPrint(indexRecords) - else - log.warn "Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be 'csv' or 'json'" - session.notifyFilePublish(indexOpts.path) + final indexPath = indexOpts.path + final ext = indexPath.getExtension() + indexPath.parent.mkdirs() + if( ext == 'csv' ) { + new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath) + } + else if( ext == 'json' ) { + indexPath.text = DumpHelper.prettyPrint(indexRecords) + } + else { + log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be 'csv' or 'json'" + } + session.notifyFilePublish(indexPath) } log.trace "Publish operator complete" @@ -141,6 +186,10 @@ class PublishOp { for( final el : value ) collectFiles(result, el) } + else if( value instanceof Map ) { + for( final entry : value.entrySet() ) + collectFiles(result, entry.value) + } return result } @@ -149,37 +198,43 @@ class PublishOp { * work directory paths to publish paths. 
* * @param value + * @param targetDirOrClosure */ - protected Object normalizePaths(value) { + protected Object normalizePaths(value, targetDirOrClosure) { if( value instanceof Path ) { - return List.of(value.getBaseName(), normalizePath(value)) + return List.of(value.getBaseName(), normalizePath(value, targetDirOrClosure)) } if( value instanceof Collection ) { return value.collect { el -> if( el instanceof Path ) - return normalizePath(el) + return normalizePath(el, targetDirOrClosure) if( el instanceof Collection ) - return normalizePaths(el) + return normalizePaths(el, targetDirOrClosure) return el } } if( value instanceof Map ) { - return value.collectEntries { k, v -> - if( v instanceof Path ) - return List.of(k, normalizePath(v)) - if( v instanceof Collection ) - return List.of(k, normalizePaths(v)) - return List.of(k, v) - } + return value + .findAll { k, v -> v != null } + .collectEntries { k, v -> + if( v instanceof Path ) + return List.of(k, normalizePath(v, targetDirOrClosure)) + if( v instanceof Collection ) + return List.of(k, normalizePaths(v, targetDirOrClosure)) + return List.of(k, v) + } } throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} [${value.class.simpleName}]") } - private Path normalizePath(Path path) { + private Path normalizePath(Path path, targetDirOrClosure) { + if( targetDirOrClosure instanceof Closure ) + return (targetDirOrClosure.call(path.getName()) as Path).normalize() final sourceDir = getTaskDir(path) + final targetDir = targetDirOrClosure as Path return targetDir.resolve(sourceDir.relativize(path)).normalize() } diff --git a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy index 79ed13cde0..80943d8a30 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/OutputDsl.groovy @@ -73,7 +73,7 @@ class OutputDsl { // validate 
target configs for( final name : targetConfigs.keySet() ) { if( name !in publishSources ) - log.warn "Publish target '${name}' was defined in the output block but not used by the workflow" + log.warn "Workflow output '${name}' was declared in the output block but not assigned in the workflow" } // create publish op (and optional index op) for each target @@ -99,11 +99,11 @@ class OutputDsl { final path = opts.path as String ?: name if( path.startsWith('/') || path.endsWith('/') ) - throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should not contain a leading or trailing slash") - opts.path = session.outputDir.resolve(path) + throw new ScriptRuntimeException("Invalid path '${path}' for workflow output '${name}' -- it should not contain a leading or trailing slash") + opts.path = path if( opts.index && !(opts.index as Map).path ) - throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option") + throw new ScriptRuntimeException("Index file definition for workflow output '${name}' is missing `path` option") return opts } @@ -162,7 +162,7 @@ class OutputDsl { void path(Closure value) { setOption('path', '.') - setOption('pathAs', value) + setOption('dynamicPath', value) } void storageClass(String value) { diff --git a/tests/output-dsl.nf b/tests/output-dsl.nf index f3ed165185..be1f53a166 100644 --- a/tests/output-dsl.nf +++ b/tests/output-dsl.nf @@ -16,75 +16,95 @@ */ nextflow.preview.output = true -params.save_foo = true +params.save_bam_bai = false -process align { +process fastqc { input: - val(x) + val id output: - path("*.bam") - path("${x}.bai") + tuple val(id), path('*.fastqc.log') script: """ - echo ${x} > ${x}.bam - echo ${x} | rev > ${x}.bai + echo ${id} > ${id}.fastqc.log """ } -process my_combine { +process align { input: - path(bamfile) - path(baifile) + val id output: - path 'result.txt' + tuple val(id), path('*.bam') + tuple val(id), path('*.bai') script: """ - cat $bamfile > 
result.txt - cat $baifile >> result.txt + echo ${id} > ${id}.bam + echo ${id} | rev > ${id}.bai """ } -process foo { +process quant { + input: + val id + output: - path 'xxx' + tuple val(id), path('quant') script: ''' - mkdir xxx - touch xxx/A - touch xxx/B - touch xxx/C + mkdir quant + touch quant/cmd_info.json + touch quant/lib_format_counts.json + touch quant/quant.sf ''' } workflow { main: - input = Channel.of('alpha','beta','delta') - align(input) + ids = Channel.of('1', '2', '3') + ch_fastqc = fastqc(ids) + (ch_bam, ch_bai) = align(ids) + ch_quant = quant(ids) - bams = align.out[0].toSortedList { bam -> bam.name } - bais = align.out[1].toSortedList { bai -> bai.name } - my_combine( bams, bais ) - my_combine.out.view { it -> it.text } - - foo() + ch_samples = ch_fastqc + .join(ch_bam) + .join(ch_bai) + .join(ch_quant) + .map { id, fastqc, bam, bai, quant -> + [ + id: id, + fastqc: fastqc, + bam: params.save_bam_bai ? bam : null, + bai: params.save_bam_bai ? bai : null, + quant: quant + ] + } publish: - align.out >> 'data' - my_combine.out >> 'more/data' - foo.out >> (params.save_foo ? 'data' : null) + ch_samples >> 'samples' } output { - 'data' { - path { val -> { file -> file } } + samples { + path { sample -> + def dirs = [ + 'bam': 'align', + 'bai': 'align', + 'log': 'fastqc' + ] + return { filename -> + def ext = filename.tokenize('.').last() + def dir = dirs[ext] + dir != null + ? "${dir}/${filename}" + : "${filename}/${sample.id}" + } + } index { - path 'index.csv' - mapper { val -> [filename: val] } + path 'samples.csv' header true sep ',' }