Skip to content

Commit

Permalink
Add support for custom fuse device plugin (nextflow-io#4612) [ci fast]
Browse files Browse the repository at this point in the history
This commit allows customising the FUSE plugin device required
by Fusion when using the Kubernetes executor. The FUSE plugin
name  can be specified by using the setting `k8s.fuseDevicePlugin`
for example:

```
k8s.fuseDevicePlugin = ['my-plugin/fuse': 1]
```

Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Signed-off-by: Niklas Schandry <niklas@bio.lmu.de>
  • Loading branch information
pditommaso authored and nschan committed Feb 6, 2024
1 parent 43f0fc1 commit 36fcf20
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -1006,6 +1006,11 @@ The following settings are available:
:::
: If you trace the hostname, activate this option (default: `false`).

`k8s.fuseDevicePlugin`
: :::{versionadded} 24.01.0-edge
:::
: The FUSE device plugin to be used when enabling Fusion in unprivileged mode (default: `['nextflow.io/fuse': 1]`).

`k8s.httpConnectTimeout`
: :::{versionadded} 22.10.0
:::
Expand Down
17 changes: 17 additions & 0 deletions docs/kubernetes.md
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,23 @@ Then the pipeline execution can be launched using the usual run command and spec
nextflow run <YOUR PIPELINE> -work-dir s3://<YOUR-BUCKET>/scratch
```

:::{note}
When using Fusion, pods will run as *privileged* by default.
:::

To use Fusion with without the need for escalating privileges, it is required to install in the Kubernetes cluster the
Nextflow [FUSE device plugin](https://github.com/nextflow-io/k8s-fuse-plugin) and add in your Nextflow configuration the following
setting:

```
fusion {
privileged = false
}
```

To use a custom FUSE device plugin, specify it via the setting `k8s.fuseDevicePlugin`. See
the {ref}`Kubernetes configuration section<config-k8s>` for details.

### Running in a pod

Nextflow can be executed directly from a pod running in a Kubernetes cluster. In these cases you will need to use the plain Nextflow `run` command and specify the `k8s` executor and the required persistent volume claim in the `nextflow.config` file as shown below:
Expand Down
11 changes: 11 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ import nextflow.util.Duration
@CompileStatic
class K8sConfig implements Map<String,Object> {

static final private Map<String,?> DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1)

@Delegate
private Map<String,Object> target

Expand Down Expand Up @@ -116,6 +118,15 @@ class K8sConfig implements Map<String,Object> {
target.storageSubPath
}

Map<String,?> fuseDevicePlugin() {
final result = target.fuseDevicePlugin
if( result instanceof Map && result.size()==1 )
return result as Map<String,?>
if( result )
log.warn1 "Setting 'fuseDevicePlugin' should be a map object providing exactly one entry - offending value: $result"
return DEFAULT_FUSE_PLUGIN
}

/**
* Whenever the pod should honour the entrypoint defined by the image (default: false)
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
if( fusionConfig().privileged() )
builder.withPrivileged(true)
else {
builder.withResourcesLimits(["nextflow.io/fuse": 1])
final device= k8sConfig.fuseDevicePlugin()
builder.withResourcesLimits(device)
}

final env = fusionLauncher().fusionEnv()
Expand Down
12 changes: 12 additions & 0 deletions modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,18 @@ class K8sConfigTest extends Specification {

}

def 'should set device plugin' () {
when:
def cfg = new K8sConfig([:])
then:
cfg.fuseDevicePlugin() == ['nextflow.io/fuse':1]

when:
cfg = new K8sConfig([fuseDevicePlugin:['foo/fuse':10]])
then:
cfg.fuseDevicePlugin() == ['foo/fuse':10]
}

def 'should create client config' () {

given:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -938,7 +938,9 @@ class K8sTaskHandlerTest extends Specification {
def client = Mock(K8sClient)
def builder = Mock(K8sWrapperBuilder)
def launcher = Mock(FusionScriptLauncher)
def handler = Spy(new K8sTaskHandler(builder:builder, client: client))
def k8sConfig = Spy(K8sConfig)
def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig }
def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec))
Map result

when:
Expand Down Expand Up @@ -973,6 +975,45 @@ class K8sTaskHandlerTest extends Specification {
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
result.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]]
!result.spec.containers[0].securityContext


/*
* use custom fuse device
*/
when:
result = handler.newSubmitRequest(task)
then:
launcher.fusionEnv() >> [FUSION_BUCKETS: 'this,that']
launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run')
launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
and:
k8sConfig.fuseDevicePlugin() >> ['custom/device/fuse': 1]
and:
handler.getTask() >> task
handler.fusionEnabled() >> true
handler.fusionLauncher() >> launcher
handler.fusionConfig() >> new FusionConfig(privileged: false)
and:
task.getContainer() >> 'debian:latest'
task.getWorkDir() >> WORK_DIR
task.getConfig() >> config
and:
1 * handler.fixOwnership() >> false
1 * handler.entrypointOverride() >> false
1 * handler.getPodOptions() >> new PodOptions()
1 * handler.getSyntheticPodName(task) >> 'nf-123'
1 * handler.getLabels(task) >> [:]
1 * handler.getAnnotations() >> [:]
1 * handler.getContainerMounts() >> []
and:
1 * config.getCpus() >> 0
1 * config.getMemory() >> null
1 * client.getConfig() >> new ClientConfig()
and:
result.spec.containers[0].args == ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run']
result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']]
result.spec.containers[0].resources == [limits:['custom/device/fuse':1]]
!result.spec.containers[0].securityContext
}

def 'get fusion submit command' () {
Expand Down

0 comments on commit 36fcf20

Please sign in to comment.