Skip to content

Commit

Permalink
FIx issues with dynamic publish path and index file
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <[email protected]>
  • Loading branch information
bentsherman committed Dec 6, 2024
1 parent 062939e commit ecda528
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 106 deletions.
64 changes: 34 additions & 30 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -465,7 +465,7 @@ results/
Target names cannot begin or end with a slash (`/`).
:::

By default, all files emitted by a published channel will be published into the specified directory. If a channel emits list values, each file in the list (including nested lists) will be published. For example:
By default, all files emitted by a published channel will be published into the specified directory. Lists and maps are recursively scanned for nested files. For example:

```nextflow
workflow {
Expand Down Expand Up @@ -503,12 +503,12 @@ workflow {
}
output {
'foo' {
foo {
enabled params.save_foo
path 'intermediates/foo'
}
'bar' {
bar {
mode 'copy'
}
}
Expand All @@ -533,15 +533,17 @@ The `path` directive in a target block can also be a closure which defines a cus
```nextflow
workflow {
main:
ch_fastq = Channel.of( [ [id: 'SAMP1'], file('1.fastq'), file('2.fastq') ] )
ch_samples = Channel.of(
[id: 'SAMP1', fastq_1: file('1.fastq'), fastq_1: file('2.fastq')]
)
publish:
ch_fastq >> 'fastq'
ch_samples >> 'samples'
}
output {
'fastq' {
path { meta, fastq_1, fastq_2 -> "fastq/${meta.id}" }
samples {
path { sample -> "fastq/${sample.id}" }
}
}
```
Expand All @@ -552,19 +554,15 @@ The closure can even define a different path for each individual file by returni

```nextflow
output {
'fastq' {
path { meta, fastq_1, fastq_2 ->
{ file -> "fastq/${meta.id}/${file.baseName}" }
samples {
path { sample ->
{ filename -> "fastq/${sample.id}/${filename}" }
}
}
}
```

The inner closure will be applied to each file in the channel value, in this case `fastq_1` and `fastq_2`.

:::{tip}
A mapping closure should usually have only one parameter. However, if the incoming values are tuples, the closure can specify a parameter for each tuple element for more convenient access, also known as "destructuring" or "unpacking".
:::
The inner closure will be applied to each file in the channel value, in this case `sample.fastq_1` and `sample.fastq_2`.

### Index files

Expand All @@ -575,29 +573,29 @@ For example:
```nextflow
workflow {
main:
ch_fastq = Channel.of(
[ [id: 1, name: 'sample 1'], '1a.fastq', '1b.fastq' ],
[ [id: 2, name: 'sample 2'], '2a.fastq', '2b.fastq' ],
[ [id: 3, name: 'sample 3'], '3a.fastq', '3b.fastq' ]
ch_samples = Channel.of(
[id: 1, name: 'sample 1', fastq_1: '1a.fastq', fastq_2: '1b.fastq'],
[id: 2, name: 'sample 2', fastq_1: '2a.fastq', fastq_2: '2b.fastq'],
[id: 3, name: 'sample 3', fastq_1: '3a.fastq', fastq_2: '3b.fastq']
)
publish:
ch_fastq >> 'fastq'
ch_samples >> 'samples'
}
output {
'fastq' {
samples {
path 'fastq'
index {
path 'index.csv'
path 'samples.csv'
}
}
}
```

The above example will write the following CSV file to `results/fastq/index.csv`:
The above example will write the following CSV file to `results/samples.csv`:

```csv
"id","name","fastq_1","fastq_2"
```
"1","sample 1","results/fastq/1a.fastq","results/fastq/1b.fastq"
"2","sample 2","results/fastq/2a.fastq","results/fastq/2b.fastq"
"3","sample 3","results/fastq/3a.fastq","results/fastq/3b.fastq"
Expand All @@ -607,14 +605,20 @@ You can customize the index file with additional directives, for example:

```nextflow
index {
path 'index.csv'
header ['id', 'fastq_1', 'fastq_1']
sep '\t'
mapper { meta, fq_1, fq_2 -> meta + [fastq_1: fq_1, fastq_2: fq_2] }
path 'samples.csv'
header true
sep '|'
}
```

This example will produce the same index file as above, but with the `name` column removed and with tabs instead of commas.
This example will produce the following index file.

```
"id"|"name"|"fastq_1"|"fastq_2"
"1"|"sample 1"|"results/fastq/1a.fastq"|"results/fastq/1b.fastq"
"2"|"sample 2"|"results/fastq/2a.fastq"|"results/fastq/2b.fastq"
"3"|"sample 3"|"results/fastq/3a.fastq"|"results/fastq/3b.fastq"
```

See [Reference](#reference) for the list of available index directives.

Expand Down
129 changes: 92 additions & 37 deletions modules/nextflow/src/main/groovy/nextflow/extension/PublishOp.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package nextflow.extension

import java.nio.file.Path

import groovy.json.JsonOutput
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import groovyx.gpars.dataflow.DataflowReadChannel
Expand All @@ -27,8 +26,9 @@ import nextflow.Session
import nextflow.processor.PublishDir
import nextflow.util.CsvWriter
/**
* Publish files from a source channel.
* Publish a workflow output.
*
* @author Ben Sherman <[email protected]>
* @author Paolo Di Tommaso <[email protected]>
*/
@Slf4j
Expand All @@ -39,9 +39,9 @@ class PublishOp {

private Map opts

private Path targetDir
private String path

private Closure pathAs
private Closure dynamicPath

private IndexOpts indexOpts

Expand All @@ -54,11 +54,11 @@ class PublishOp {
PublishOp(DataflowReadChannel source, Map opts) {
this.source = source
this.opts = opts
this.targetDir = opts.path as Path
if( opts.pathAs instanceof Closure )
this.pathAs = opts.pathAs as Closure
this.path = opts.path as String
if( opts.dynamicPath instanceof Closure )
this.dynamicPath = opts.dynamicPath as Closure
if( opts.index )
this.indexOpts = new IndexOpts(targetDir, opts.index as Map)
this.indexOpts = new IndexOpts(session.outputDir, opts.index as Map)
}

boolean getComplete() { complete }
Expand All @@ -71,23 +71,29 @@ class PublishOp {
return this
}

/**
* For each incoming value, perform the following:
*
* 1. Publish any files contained in the value
* 2. Append a record to the index file for the value (if enabled)
*
* @param value
*/
protected void onNext(value) {
log.trace "Publish operator received: $value"

// evaluate dynamic path
final path = pathAs != null
? targetDir.resolve(pathAs.call(value))
: targetDir
if( path == null )
final targetDirOrClosure = getTargetDir(value)
if( targetDirOrClosure == null )
return

// emit workflow publish event
session.notifyWorkflowPublish(value)

// create publisher
final overrides = path instanceof Closure
? [saveAs: path]
: [path: path]
final overrides = targetDirOrClosure instanceof Closure
? [saveAs: targetDirOrClosure]
: [path: targetDirOrClosure]
final publisher = PublishDir.create(opts + overrides)

// publish files
Expand All @@ -98,26 +104,65 @@ class PublishOp {
publisher.apply(files, sourceDir)
}

// create record for index file
// append record to index file
if( indexOpts ) {
final record = indexOpts.mapper != null ? indexOpts.mapper.call(value) : value
final normalized = normalizePaths(record)
final normalized = normalizePaths(record, targetDirOrClosure)
log.trace "Normalized record for index file: ${normalized}"
indexRecords << normalized
}
}

/**
* Compute the target directory for a published value:
*
* - if the publish path is a string, resolve it against
* the base output directory
*
* - if the publish path is a closure that returns a string,
* invoke it on the published value and resolve the returned
* string against the base output directory
*
* - if the publish path is a closure that returns a closure,
* invoke it on the published value and wrap the returned
* closure in a closure that resolves the relative path against
* the base output directory
*
* @param value
* @return Path | Closure<Path>
*/
protected Object getTargetDir(value) {
final outputDir = session.outputDir
if( dynamicPath == null )
return outputDir.resolve(path)
final relativePath = dynamicPath.call(value)
if( relativePath == null )
return null
return relativePath instanceof Closure
? { file -> outputDir.resolve(relativePath.call(file) as String) }
: outputDir.resolve(relativePath as String)
}

/**
* Once all values have been published, write the
* index file (if enabled).
*/
protected void onComplete(nope) {
if( indexOpts && indexRecords.size() > 0 ) {
log.trace "Saving records to index file: ${indexRecords}"
final ext = indexOpts.path.getExtension()
if( ext == 'csv' )
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexOpts.path)
else if( ext == 'json' )
indexOpts.path.text = DumpHelper.prettyPrint(indexRecords)
else
log.warn "Invalid extension '${ext}' for index file '${indexOpts.path}' -- should be 'csv' or 'json'"
session.notifyFilePublish(indexOpts.path)
final indexPath = indexOpts.path
final ext = indexPath.getExtension()
indexPath.parent.mkdirs()
if( ext == 'csv' ) {
new CsvWriter(header: indexOpts.header, sep: indexOpts.sep).apply(indexRecords, indexPath)
}
else if( ext == 'json' ) {
indexPath.text = DumpHelper.prettyPrint(indexRecords)
}
else {
log.warn "Invalid extension '${ext}' for index file '${indexPath}' -- should be 'csv' or 'json'"
}
session.notifyFilePublish(indexPath)
}

log.trace "Publish operator complete"
Expand All @@ -141,6 +186,10 @@ class PublishOp {
for( final el : value )
collectFiles(result, el)
}
else if( value instanceof Map ) {
for( final entry : value.entrySet() )
collectFiles(result, entry.value)
}
return result
}

Expand All @@ -149,37 +198,43 @@ class PublishOp {
* work directory paths to publish paths.
*
* @param value
* @param targetDirOrClosure
*/
protected Object normalizePaths(value) {
protected Object normalizePaths(value, targetDirOrClosure) {
if( value instanceof Path ) {
return List.of(value.getBaseName(), normalizePath(value))
return List.of(value.getBaseName(), normalizePath(value, targetDirOrClosure))
}

if( value instanceof Collection ) {
return value.collect { el ->
if( el instanceof Path )
return normalizePath(el)
return normalizePath(el, targetDirOrClosure)
if( el instanceof Collection<Path> )
return normalizePaths(el)
return normalizePaths(el, targetDirOrClosure)
return el
}
}

if( value instanceof Map ) {
return value.collectEntries { k, v ->
if( v instanceof Path )
return List.of(k, normalizePath(v))
if( v instanceof Collection<Path> )
return List.of(k, normalizePaths(v))
return List.of(k, v)
}
return value
.findAll { k, v -> v != null }
.collectEntries { k, v ->
if( v instanceof Path )
return List.of(k, normalizePath(v, targetDirOrClosure))
if( v instanceof Collection<Path> )
return List.of(k, normalizePaths(v, targetDirOrClosure))
return List.of(k, v)
}
}

throw new IllegalArgumentException("Index file record must be a list, map, or file: ${value} [${value.class.simpleName}]")
}

private Path normalizePath(Path path) {
private Path normalizePath(Path path, targetDirOrClosure) {
if( targetDirOrClosure instanceof Closure )
return (targetDirOrClosure.call(path.getName()) as Path).normalize()
final sourceDir = getTaskDir(path)
final targetDir = targetDirOrClosure as Path
return targetDir.resolve(sourceDir.relativize(path)).normalize()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ class OutputDsl {
// validate target configs
for( final name : targetConfigs.keySet() ) {
if( name !in publishSources )
log.warn "Publish target '${name}' was defined in the output block but not used by the workflow"
log.warn "Workflow output '${name}' was declared in the output block but not assigned in the workflow"
}

// create publish op (and optional index op) for each target
Expand All @@ -99,11 +99,11 @@ class OutputDsl {

final path = opts.path as String ?: name
if( path.startsWith('/') || path.endsWith('/') )
throw new ScriptRuntimeException("Invalid publish target path '${path}' -- it should not contain a leading or trailing slash")
opts.path = session.outputDir.resolve(path)
throw new ScriptRuntimeException("Invalid path '${path}' for workflow output '${name}' -- it should not contain a leading or trailing slash")
opts.path = path

if( opts.index && !(opts.index as Map).path )
throw new ScriptRuntimeException("Index file definition for publish target '${name}' is missing `path` option")
throw new ScriptRuntimeException("Index file definition for workflow output '${name}' is missing `path` option")

return opts
}
Expand Down Expand Up @@ -162,7 +162,7 @@ class OutputDsl {

void path(Closure value) {
setOption('path', '.')
setOption('pathAs', value)
setOption('dynamicPath', value)
}

void storageClass(String value) {
Expand Down
Loading

0 comments on commit ecda528

Please sign in to comment.