From 3e5ed92e293e439d4bc0b6874f951323ec0c0426 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Thu, 21 Dec 2023 22:04:06 +0100 Subject: [PATCH 1/3] Add support for custom fuse device plugin [ci fast] Signed-off-by: Paolo Di Tommaso --- .../main/groovy/nextflow/k8s/K8sConfig.groovy | 4 ++ .../groovy/nextflow/k8s/K8sTaskHandler.groovy | 3 +- .../nextflow/k8s/K8sTaskHandlerTest.groovy | 43 ++++++++++++++++++- 3 files changed, 48 insertions(+), 2 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy index 0be59bd423..fb3a2d15fb 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy @@ -116,6 +116,10 @@ class K8sConfig implements Map { target.storageSubPath } + String fuseDevicePlugin() { + target.fuseDevicePlugin ?: 'nextflow.io/fuse' + } + /** * Whenever the pod should honour the entrypoint defined by the image (default: false) * diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy index f782e423d6..bfcd6e8831 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy @@ -252,7 +252,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { if( fusionConfig().privileged() ) builder.withPrivileged(true) else { - builder.withResourcesLimits(["nextflow.io/fuse": 1]) + final device = Map.of(k8sConfig.fuseDevicePlugin(), 1) + builder.withResourcesLimits(device) } final env = fusionLauncher().fusionEnv() diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy index 8ac5093294..61bd57e235 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -938,7 +938,9 @@ class K8sTaskHandlerTest extends Specification { def client = Mock(K8sClient) def builder = Mock(K8sWrapperBuilder) def launcher = Mock(FusionScriptLauncher) - def handler = Spy(new K8sTaskHandler(builder:builder, client: client)) + def k8sConfig = Spy(K8sConfig) + def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig } + def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec)) Map result when: @@ -973,6 +975,45 @@ class K8sTaskHandlerTest extends Specification { result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']] result.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]] !result.spec.containers[0].securityContext + + + /* + * use custom fuse device + */ + when: + result = handler.newSubmitRequest(task) + then: + launcher.fusionEnv() >> [FUSION_BUCKETS: 'this,that'] + launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run') + launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run'] + and: + k8sConfig.fuseDevicePlugin() >> 'custom/device/fuse' + and: + handler.getTask() >> task + handler.fusionEnabled() >> true + handler.fusionLauncher() >> launcher + handler.fusionConfig() >> new FusionConfig(privileged: false) + and: + task.getContainer() >> 'debian:latest' + task.getWorkDir() >> WORK_DIR + task.getConfig() >> config + and: + 1 * handler.fixOwnership() >> false + 1 * handler.entrypointOverride() >> false + 1 * handler.getPodOptions() >> new PodOptions() + 1 * handler.getSyntheticPodName(task) >> 'nf-123' + 1 * handler.getLabels(task) >> [:] + 1 * handler.getAnnotations() >> [:] + 1 * handler.getContainerMounts() >> [] + and: + 1 * config.getCpus() >> 0 + 1 * config.getMemory() >> null + 1 * client.getConfig() >> new ClientConfig() + and: + result.spec.containers[0].args == ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run'] + result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']] + result.spec.containers[0].resources == [limits:['custom/device/fuse':1]] + !result.spec.containers[0].securityContext } def 'get fusion submit command' () { From 842cea424ec2e766daefe22c0a434c38f75c5470 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 20 Jan 2024 18:15:58 +0100 Subject: [PATCH 2/3] Improve + tests [ci fast] Signed-off-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/k8s/K8sConfig.groovy | 11 +++++++++-- .../main/groovy/nextflow/k8s/K8sTaskHandler.groovy | 2 +- .../test/groovy/nextflow/k8s/K8sConfigTest.groovy | 12 ++++++++++++ .../groovy/nextflow/k8s/K8sTaskHandlerTest.groovy | 2 +- 4 files changed, 23 insertions(+), 4 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy index fb3a2d15fb..aed5d977e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy @@ -42,6 +42,8 @@ import nextflow.util.Duration @CompileStatic class K8sConfig implements Map { + static final private Map DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1) + @Delegate private Map target @@ -116,8 +118,13 @@ class K8sConfig implements Map { target.storageSubPath } - String fuseDevicePlugin() { - target.fuseDevicePlugin ?: 'nextflow.io/fuse' + Map fuseDevicePlugin() { + final result = target.fuseDevicePlugin + if( result instanceof Map && result.size()==1 ) + return result as Map + if( result ) + log.warn1 "Setting 'fuseDevicePlugin' should be a map object providing exactly one entry - offending value: $result" + return DEFAULT_FUSE_PLUGIN } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy index bfcd6e8831..91d8dbc160 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy @@ -252,7 +252,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { if( fusionConfig().privileged() ) builder.withPrivileged(true) else { - final device = Map.of(k8sConfig.fuseDevicePlugin(), 1) + final device= k8sConfig.fuseDevicePlugin() builder.withResourcesLimits(device) } diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy index 4d63b2e82b..5e843bd2c9 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy @@ -129,6 +129,18 @@ class K8sConfigTest extends Specification { } + def 'should set device plugin' () { + when: + def cfg = new K8sConfig([:]) + then: + cfg.fuseDevicePlugin() == ['nextflow.io/fuse':1] + + when: + cfg = new K8sConfig([fuseDevicePlugin:['foo/fuse':10]]) + then: + cfg.fuseDevicePlugin() == ['foo/fuse':10] + } + def 'should create client config' () { given: diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy index 61bd57e235..c3c7f8b5a0 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -987,7 +987,7 @@ class K8sTaskHandlerTest extends Specification { launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run') launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run'] and: - k8sConfig.fuseDevicePlugin() >> 'custom/device/fuse' + k8sConfig.fuseDevicePlugin() >> ['custom/device/fuse': 1] and: handler.getTask() >> task handler.fusionEnabled() >> true From 1c9c91674bc5122cc25ca68c6588239545ac6828 Mon Sep 17 00:00:00 2001 From: Paolo Di Tommaso Date: Sat, 20 Jan 2024 19:05:39 +0100 Subject: [PATCH 3/3] Add docs Signed-off-by: Paolo Di Tommaso --- docs/config.md | 5 +++++ docs/kubernetes.md | 17 +++++++++++++++++ 2 files changed, 22 insertions(+) diff --git a/docs/config.md b/docs/config.md index a0f6e51a84..26db447620 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1006,6 +1006,11 @@ The following settings are available: ::: : If you trace the hostname, activate this option (default: `false`). +`k8s.fuseDevicePlugin` +: :::{versionadded} 24.01.0-edge + ::: +: The FUSE device plugin to be used when enabling Fusion in unprivileged mode (default: `['nextflow.io/fuse': 1]`). + `k8s.httpConnectTimeout` : :::{versionadded} 22.10.0 ::: diff --git a/docs/kubernetes.md b/docs/kubernetes.md index e6f38a9542..357d97f207 100644 --- a/docs/kubernetes.md +++ b/docs/kubernetes.md @@ -120,6 +120,23 @@ Then the pipeline execution can be launched using the usual run command and spec nextflow run -work-dir s3:///scratch ``` +:::{note} +When using Fusion, pods will run as *privileged* by default. +::: + +To use Fusion with without the need for escalating privileges, it is required to install in the Kubernetes cluster the +Nextflow [FUSE device plugin](https://github.com/nextflow-io/k8s-fuse-plugin) and add in your Nextflow configuration the following +setting: + +``` +fusion { + privileged = false +} +``` + +To use a custom FUSE device plugin, specify it via the setting `k8s.fuseDevicePlugin`. See +the {ref}`Kubernetes configuration section` for details. + ### Running in a pod Nextflow can be executed directly from a pod running in a Kubernetes cluster. In these cases you will need to use the plain Nextflow `run` command and specify the `k8s` executor and the required persistent volume claim in the `nextflow.config` file as shown below: