Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "topic" channel #4459

Merged
merged 22 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
118 changes: 113 additions & 5 deletions docs/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,21 @@ In Nextflow there are two kinds of channels: *queue channels* and *value channel

### Queue channel

A *queue channel* is a non-blocking unidirectional FIFO queue which connects two processes, channel factories, or operators.
A *queue channel* is a non-blocking unidirectional FIFO queue connecting a *producer* process (i.e. outputting a value)
to a consumer process, or an operators.

A queue channel can be created by factory methods ([of](#of), [fromPath](#frompath), etc), operators ({ref}`operator-map`, {ref}`operator-flatmap`, etc), and processes (see {ref}`Process outputs <process-output>`).

(channel-type-value)=

### Value channel

A *value channel* contains a single value and can be consumed any number of times by a process or operator.
A *value channel* can be bound (i.e. assigned) with one and only one value, and can be consumed any number of times by
a process or an operator.

A value channel can be created with the [value](#value) factory method or by any operator that produces a single value ({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.
A value channel can be created with the [value](#value) factory method or by any operator that produces a single value
({ref}`operator-first`, {ref}`operator-collect`, {ref}`operator-reduce`, etc). Additionally, a process will emit value
channels if it is invoked with all value channels, including simple values which are implicitly wrapped in a value channel.

For example:

Expand All @@ -52,7 +56,8 @@ workflow {
}
```

In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly wrapped in a value channel, and the output is also emitted as a value channel.
In the above example, since the `foo` process is invoked with a simple value instead of a channel, the input is implicitly
wrapped in a value channel, and the output is also emitted as a value channel.

See also: {ref}`process-multiple-input-channels`.

Expand All @@ -63,7 +68,8 @@ See also: {ref}`process-multiple-input-channels`.
Channels may be created explicitly using the following channel factory methods.

:::{versionadded} 20.07.0
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or `Channel.of()`, and so on.
`channel` was introduced as an alias of `Channel`, allowing factory methods to be specified as `channel.of()` or
`Channel.of()`, and so on.
:::

(channel-empty)=
Expand Down Expand Up @@ -429,6 +435,108 @@ Y

See also: [channel.fromList](#fromlist) factory method.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and
{ref}`channel-type-queue`.

A *topic channel*, similarly to a *queue channel*, is non-blocking unidirectional FIFO queue, however it connects
multiple *producer* processes with multiple *consumer* processes or operators.

:::{tip}
You can think about it as a channel that is shared across many different process using the same *topic name*.
:::

A process output can be assigned to a topic using the `topic` option on an output, for example:

```groovy
process foo {
output:
val('foo'), topic: my_topic
}

process bar {
output:
val('bar'), topic: my_topic
}
```

The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process
input or operator composition as any other Nextflow channel:

```groovy
channel.topic('my-topic').view()
```

This approach is a convenient way to collect related items from many different sources without explicitly defining
the logic connecting many different queue channels altogether, commonly using the `mix` operator.

:::{warning}
Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever.
:::

See also: {ref}`process-additional-options` for process outputs.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

A *topic* is a channel type introduced as of Nextflow 23.11.0-edge along with {ref}`channel-type-value` and
{ref}`channel-type-queue`.

A *topic channel*, similarly to a *queue channel*, is non-blocking unidirectional FIFO queue, however it connects
multiple *producer* processes with multiple *consumer* processes or operators.

:::{tip}
You can think about it as a channel that is shared across many different process using the same *topic name*.
:::

A process output can be assigned to a topic using the `topic` option on an output, for example:

```groovy
process foo {
output:
val('foo'), topic: my_topic
}

process bar {
output:
val('bar'), topic: my_topic
}
```

The `channel.topic` method allows referencing the topic channel with the specified name, which can be used as a process
input or operator composition as any other Nextflow channel:

```groovy
Channel.topic('my-topic').view()
```

This approach is a convenient way to collect related items from many different sources without explicitly defining
the logic connecting many different queue channels altogether, commonly using the `mix` operator.

:::{warning}
Any process that consumes a channel topic should not send any outputs to that topic, or else the pipeline will hang forever.
pditommaso marked this conversation as resolved.
Show resolved Hide resolved
:::

See also: {ref}`process-additional-options` for process outputs.

(channel-value)=

### value
Expand Down
9 changes: 9 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1856,3 +1856,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
: *Experimental: may change in a future release.*

: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.

`nextflow.preview.topic`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

: When `true`, enables {ref}`topic channels <channel-topic>` feature.
81 changes: 39 additions & 42 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1166,62 +1166,59 @@ process foo {
```
:::

### Optional outputs
(process-additional-options)=

In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
### Additional options

```groovy
output:
path("output.txt"), optional: true
```
The following options are available for all process outputs:

In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
`emit: <name>`

(process-multiple-outputs)=
: Defines the name of the output channel, which can be used to access the channel by name from the process output:

### Multiple outputs
```groovy
process FOO {
output:
path 'hello.txt', emit: hello
path 'bye.txt', emit: bye

"""
echo "hello" > hello.txt
echo "bye" > bye.txt
"""
}

When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
workflow {
FOO()
FOO.out.hello.view()
}
```

```groovy
process FOO {
output:
path 'bye_file.txt'
path 'hi_file.txt'
See {ref}`workflow-process-invocation` for more details.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
`optional: true | false`

workflow {
FOO()
FOO.out[1].view()
}
```
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.

You can also use the `emit` option to assign a name to each output and access them by name:
```groovy
output:
path("output.txt"), optional: true
```

```groovy
process FOO {
output:
path 'bye_file.txt', emit: bye_file
path 'hi_file.txt', emit: hi_file
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
: :::{note}
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
:::

workflow {
FOO()
FOO.out.hi_file.view()
}
```
`topic: <name>`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

See {ref}`workflow-process-invocation` for more details.
: Defines the {ref}`channel topic <channel-topic>` to which the output will be sent.

## When

Expand Down
4 changes: 2 additions & 2 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ workflow {
}
```

When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

Expand Down Expand Up @@ -144,7 +144,7 @@ workflow {
}
```

See {ref}`process-multiple-outputs` for more details.
See {ref}`process outputs <process-additional-options>` for more details.

### Process named stdout

Expand Down
5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Channel {
return CH.queue()
}

static DataflowWriteChannel topic(String name) {
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
return CH.topic(name)
}

/**
* Create a empty channel i.e. only emits a STOP signal
*
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ class NF {
static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}

static boolean isTopicChannelEnabled() {
NextflowMeta.instance.preview.topic
}
}
7 changes: 7 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class NextflowMeta {
volatile float dsl
boolean strict
boolean recursion
boolean topic

void setDsl( float num ) {
if( num == 1 )
Expand All @@ -59,6 +60,12 @@ class NextflowMeta {
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.recursion = recurse
}

void setTopic(Boolean value) {
if( topic )
log.warn "CHANNEL TOPICS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.topic = value
}
}

static class Features implements Flags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation {

/**
* Transform a map entry `emit: something` into `emit: 'something'
* and `topic: something` into `topic: 'something'
* (ie. as a constant) in a map expression passed as argument to
* a method call. This allow the syntax
*
* output:
* path 'foo', emit: bar
* path 'foo', emit: bar, topic: baz
*
* @param call
*/
protected void fixOutEmitOption(MethodCallExpression call) {
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
List<Expression> args = isTupleX(call.arguments)?.expressions
if( !args ) return
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
Expand All @@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation {
if( key?.text == 'emit' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
else if( key?.text == 'topic' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
}
}

Expand All @@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation {
// prefix the method name with the string '_out_'
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
fixMethodCall(methodCall)
fixOutEmitOption(methodCall)
fixOutEmitAndTopicOptions(methodCall)
}

else if( methodName in ['into','mode'] ) {
Expand Down
Loading