Skip to content

Commit

Permalink
Add integration test, documentation
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <bentshermann@gmail.com>
  • Loading branch information
bentsherman committed Nov 9, 2023
1 parent 64c4eed commit ec156bc
Show file tree
Hide file tree
Showing 7 changed files with 145 additions and 44 deletions.
41 changes: 41 additions & 0 deletions docs/channel.md
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,47 @@ Y

See also: [fromList](#fromlist) factory method.

(channel-topic)=

### topic

:::{versionadded} 23.11.0-edge
:::

:::{note}
This feature requires the `nextflow.preview.topic` feature flag to be enabled.
:::

The `topic` method is used to create a "topic" channel, which is a queue channel that can receive items from multiple sources.

Any process can send items to a topic by using the `topic` option on an output:

```groovy
process foo {
output:
val('foo'), topic: 'my-topic'
}
process bar {
output:
val('bar'), topic: 'my-topic'
}
```

Then, the `topic` method can be used to consume all items in the topic:

```groovy
Channel.topic('my-topic').view()
```

This approach is a convenient way to collect outputs from many sources without having to write the necessary channel logic. You can name topics however you want, and you can use different names to collect items for different "topics", as long as your process outputs and channel logic are consistent with each other.

:::{warning}
Any process that consumes a topic channel should not send any outputs to that topic, or else the pipeline will hang forever.
:::

See also: {ref}`process-additional-options` for process outputs.

(channel-value)=

### value
Expand Down
9 changes: 9 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1806,3 +1806,12 @@ Some features can be enabled using the `nextflow.enable` and `nextflow.preview`
: *Experimental: may change in a future release.*

: When `true`, enables process and workflow recursion. See [this GitHub discussion](https://github.com/nextflow-io/nextflow/discussions/2521) for more information.

`nextflow.preview.topic`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

: When `true`, enables {ref}`topic channels <channel-topic>`.
81 changes: 39 additions & 42 deletions docs/process.md
Original file line number Diff line number Diff line change
Expand Up @@ -1166,62 +1166,59 @@ process foo {
```
:::

### Optional outputs
(process-additional-options)=

In most cases, a process is expected to produce an output for each output definition. However, there are situations where it is valid for a process to not generate output. In these cases, `optional: true` may be added to the output definition, which tells Nextflow not to fail the process if the declared output is not produced:
### Additional options

```groovy
output:
path("output.txt"), optional: true
```
The following options are available for all process outputs:

In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is legitimately missing, the process does not fail. The output channel will only contain values for those processes that produce `output.txt`.
`emit: <name>`

(process-multiple-outputs)=
: Defines the name of the output channel, which can be used to access the channel by name from the process output:

### Multiple outputs
```groovy
process FOO {
output:
path 'hello.txt', emit: hello
path 'bye.txt', emit: bye
"""
echo "hello" > hello.txt
echo "bye" > bye.txt
"""
}
When a process declares multiple outputs, each output can be accessed by index. The following example prints the second process output (indexes start at zero):
workflow {
FOO()
FOO.out.hello.view()
}
```

```groovy
process FOO {
output:
path 'bye_file.txt'
path 'hi_file.txt'
See {ref}`workflow-process-invocation` for more details.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
`optional: true | false`

workflow {
FOO()
FOO.out[1].view()
}
```
: Normally, if a specified output is not produced by the task, the task will fail. Setting `optional: true` will cause the task to not fail, and instead emit nothing to the given output channel.

You can also use the `emit` option to assign a name to each output and access them by name:
```groovy
output:
path("output.txt"), optional: true
```

```groovy
process FOO {
output:
path 'bye_file.txt', emit: bye_file
path 'hi_file.txt', emit: hi_file
In this example, the process is normally expected to produce an `output.txt` file, but in the cases where the file is missing, the task will not fail. The output channel will only contain values for those tasks that produced `output.txt`.

"""
echo "bye" > bye_file.txt
echo "hi" > hi_file.txt
"""
}
: :::{note}
While this option can be used with any process output, it cannot be applied to individual elements of a [tuple](#output-type-tuple) output. The entire tuple must be optional or not optional.
:::

workflow {
FOO()
FOO.out.hi_file.view()
}
```
`topic: <name>`

: :::{versionadded} 23.11.0-edge
:::

: *Experimental: may change in a future release.*

See {ref}`workflow-process-invocation` for more details.
: Defines the {ref}`topic channel <channel-topic>` to which the output will be sent. Cannot be used with the `emit` option on the same output.

## When

Expand Down
4 changes: 2 additions & 2 deletions docs/workflow.md
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ workflow {
}
```

When a process defines multiple output channels, each output can be accessed using the array element operator (`out[0]`, `out[1]`, etc.) or using *named outputs* (see below).
When a process defines multiple output channels, each output can be accessed by index (`out[0]`, `out[1]`, etc.) or by name (see below).

The process output(s) can also be accessed like the return value of a function:

Expand Down Expand Up @@ -144,7 +144,7 @@ workflow {
}
```

See {ref}`process-multiple-outputs` for more details.
See {ref}`process outputs <process-additional-options>` for more details.

### Process named stdout

Expand Down
15 changes: 15 additions & 0 deletions tests/checks/topic-channel.nf/.checks
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
#
# initial run
#
echo Initial run
$NXF_RUN

cmp versions.txt .expected || false

#
# Resumed run
#
echo Resumed run
$NXF_RUN -resume

cmp versions.txt .expected || false
2 changes: 2 additions & 0 deletions tests/checks/topic-channel.nf/.expected
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
bar: 0.9.0
foo: 0.1.0
37 changes: 37 additions & 0 deletions tests/topic-channel.nf
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

nextflow.preview.topic = true

process foo {
input:
val(index)

output:
stdout topic: versions

script:
"""
echo 'foo: 0.1.0'
"""
}

process bar {
input:
val(index)

output:
stdout topic: versions

script:
"""
echo 'bar: 0.9.0'
"""
}

workflow {
Channel.of( 1..3 ) | foo
Channel.of( 1..3 ) | bar

Channel.topic('versions')
| unique
| collectFile(name: 'versions.txt', sort: true, storeDir: '.')
}

0 comments on commit ec156bc

Please sign in to comment.