Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add "topic" channel #4459

Merged
merged 22 commits into from
Nov 24, 2023
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 41 additions & 0 deletions docs/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,47 @@ Y

See also: [fromList](#fromlist) factory method.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from many sources implicitly based on a "topic" identifier.
bentsherman marked this conversation as resolved.
Show resolved Hide resolved

Any process can send items to a topic by using the `topic` option on an output:

```groovy
process foo {
output:
val('foo'), topic: 'my-topic'
bentsherman marked this conversation as resolved.
Show resolved Hide resolved
}

process bar {
output:
val('bar'), topic: 'my-topic'
}
```

Then, the `topic` method can be used to consume all items in the topic:

```groovy
Channel.topic('my-topic').view()
```

This approach is a convenient way to collect related items from many different sources without explicitly defining the channel logic (the `topic` method is essentially an implicit {ref}`operator-mix` operation). You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other.

:::{warning}
Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever.
:::

See also: {ref}`process-additional-options` for process outputs.

(channel-value)=

### value
Expand Down
9 changes: 9 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
: *Experimental: may change in a future release.*

: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.

`nextflow.preview.topic`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

: When `true`, enables {ref}`topic channels <channel-topic>`.
81 changes: 39 additions & 42 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1166,62 +1166,59 @@ process foo {
```
:::

### Optional outputs
(process-additional-options)=

In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
### Additional options

```groovy
output:
path("output.txt"), optional: true
```
The following options are available for all process outputs:

In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
`emit: <name>`

(process-multiple-outputs)=
: Defines the name of the output channel, which can be used to access the channel by name from the process output:

### Multiple outputs
```groovy
process FOO {
output:
path 'hello.txt', emit: hello
path 'bye.txt', emit: bye

"""
echo "hello" > hello.txt
echo "bye" > bye.txt
"""
}

When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
workflow {
FOO()
FOO.out.hello.view()
}
```

```groovy
process FOO {
output:
path 'bye_file.txt'
path 'hi_file.txt'
See {ref}`workflow-process-invocation` for more details.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
`optional: true | false`

workflow {
FOO()
FOO.out[1].view()
}
```
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.

You can also use the `emit` option to assign a name to each output and access them by name:
```groovy
output:
path("output.txt"), optional: true
```

```groovy
process FOO {
output:
path 'bye_file.txt', emit: bye_file
path 'hi_file.txt', emit: hi_file
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
: :::{note}
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
:::

workflow {
FOO()
FOO.out.hi_file.view()
}
```
`topic: <name>`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

See {ref}`workflow-process-invocation` for more details.
: Defines the {ref}`topic channel <channel-topic>` to which the output will be sent. Cannot be used with the `emit` option on the same output.

## When

Expand Down
4 changes: 2 additions & 2 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ workflow {
}
```

When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

Expand Down Expand Up @@ -144,7 +144,7 @@ workflow {
}
```

See {ref}`process-multiple-outputs` for more details.
See {ref}`process outputs <process-additional-options>` for more details.

### Process named stdout

Expand Down
5 changes: 5 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Channel.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,11 @@ class Channel {
return CH.queue()
}

static DataflowWriteChannel topic(String name) {
if( !NF.topicChannelEnabled ) throw new MissingMethodException('topic', Channel.class, InvokerHelper.EMPTY_ARGS)
return CH.topic(name)
}

/**
* Create a empty channel i.e. only emits a STOP signal
*
Expand Down
4 changes: 4 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NF.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,8 @@ class NF {
static boolean isRecurseEnabled() {
NextflowMeta.instance.preview.recursion
}

static boolean isTopicChannelEnabled() {
NextflowMeta.instance.preview.topic
}
}
7 changes: 7 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/NextflowMeta.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ class NextflowMeta {
volatile float dsl
boolean strict
boolean recursion
boolean topic

void setDsl( float num ) {
if( num == 1 )
Expand All @@ -59,6 +60,12 @@ class NextflowMeta {
log.warn "NEXTFLOW RECURSION IS A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.recursion = recurse
}

void setTopic(Boolean value) {
if( topic )
log.warn "TOPIC CHANNELS ARE A PREVIEW FEATURE - SYNTAX AND FUNCTIONALITY CAN CHANGE IN FUTURE RELEASES"
this.topic = value
}
}

static class Features implements Flags {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -915,15 +915,16 @@ class NextflowDSLImpl implements ASTTransformation {

/**
* Transform a map entry `emit: something` into `emit: 'something'
* and `topic: something` into `topic: 'something'
* (ie. as a constant) in a map expression passed as argument to
* a method call. This allow the syntax
*
* output:
* path 'foo', emit: bar
* path 'foo', emit: bar, topic: baz
*
* @param call
*/
protected void fixOutEmitOption(MethodCallExpression call) {
protected void fixOutEmitAndTopicOptions(MethodCallExpression call) {
List<Expression> args = isTupleX(call.arguments)?.expressions
if( !args ) return
if( args.size()<2 && (args.size()!=1 || call.methodAsString!='_out_stdout')) return
Expand All @@ -936,6 +937,9 @@ class NextflowDSLImpl implements ASTTransformation {
if( key?.text == 'emit' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
else if( key?.text == 'topic' && val ) {
map.mapEntryExpressions[i] = new MapEntryExpression(key, constX(val.text))
}
}
}

Expand All @@ -955,7 +959,7 @@ class NextflowDSLImpl implements ASTTransformation {
// prefix the method name with the string '_out_'
methodCall.setMethod( new ConstantExpression('_out_' + methodName) )
fixMethodCall(methodCall)
fixOutEmitOption(methodCall)
fixOutEmitAndTopicOptions(methodCall)
}

else if( methodName in ['into','mode'] ) {
Expand Down
65 changes: 61 additions & 4 deletions modules/nextflow/src/main/groovy/nextflow/extension/CH.groovy
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package nextflow.extension

import static nextflow.Channel.*

import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
Expand All @@ -15,8 +17,6 @@ import nextflow.Channel
import nextflow.Global
import nextflow.NF
import nextflow.Session
import static nextflow.Channel.STOP

/**
* Helper class to handle channel internal api ops
*
Expand All @@ -30,7 +30,14 @@ class CH {
return (Session) Global.session
}

static private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)
static class Topic {
List<DataflowWriteChannel> sources = new ArrayList<>(10)
DataflowBroadcast target = new DataflowBroadcast()
}

static final private Map<String, Topic> allTopics = new HashMap<>(10)

static final private Map<DataflowQueue, DataflowBroadcast> bridges = new HashMap<>(10)
bentsherman marked this conversation as resolved.
Show resolved Hide resolved

static DataflowReadChannel getReadChannel(channel) {
if (channel instanceof DataflowQueue)
Expand Down Expand Up @@ -66,14 +73,41 @@ class CH {
}

static void broadcast() {
// connect all dataflow queue variables to associated broadcast channel
// connect all broadcast topics, note this must be before the following
// "bridging" step because it can modify the final network topology
connectTopics()
// bridge together all broadcast channels
bridgeChannels()
}

static private void bridgeChannels() {
// connect all dataflow queue variables to associated broadcast channel
for( DataflowQueue queue : bridges.keySet() ) {
log.trace "Bridging dataflow queue=$queue"
def broadcast = bridges.get(queue)
queue.into(broadcast)
}
}

static private void connectTopics() {
for( Topic topic : allTopics.values() ) {
if( topic.sources ) {
// get the list of source channels for this topic
final ch = new ArrayList(topic.sources)
// the mix operator requires at least two sources, add an empty channel if needed
if( ch.size()==1 )
ch.add(empty())
// map write channels to read channels
final sources = ch.collect(it -> getReadChannel(it))
// mix all of them
new MixOp(sources).withTarget(topic.target).apply()
}
else {
topic.target.bind(STOP)
}
}
}

static void init() { bridges.clear() }

@PackageScope
Expand Down Expand Up @@ -102,6 +136,29 @@ class CH {
return new DataflowQueue()
}

static DataflowBroadcast topic(String name) {
synchronized (allTopics) {
def topic = allTopics[name]
if( topic!=null )
return topic.target
// create a new topic
topic = new Topic()
allTopics[name] = topic
return topic.target
}
}

static DataflowWriteChannel topicWriter(String name) {
synchronized (allTopics) {
if( name !in allTopics )
allTopics[name] = new Topic()
def topic = allTopics[name]
def result = CH.create()
topic.sources.add(result)
return result
}
}

static boolean isChannel(obj) {
obj instanceof DataflowReadChannel || obj instanceof DataflowWriteChannel
}
Expand Down
Loading