diff --git a/docs/config.md b/docs/config.md index a0f6e51a84..26db447620 100644 --- a/docs/config.md +++ b/docs/config.md @@ -1006,6 +1006,11 @@ The following settings are available: ::: : If you trace the hostname, activate this option (default: `false`). +`k8s.fuseDevicePlugin` +: :::{versionadded} 24.01.0-edge + ::: +: The FUSE device plugin to be used when enabling Fusion in unprivileged mode (default: `['nextflow.io/fuse': 1]`). + `k8s.httpConnectTimeout` : :::{versionadded} 22.10.0 ::: diff --git a/docs/kubernetes.md b/docs/kubernetes.md index e6f38a9542..357d97f207 100644 --- a/docs/kubernetes.md +++ b/docs/kubernetes.md @@ -120,6 +120,23 @@ Then the pipeline execution can be launched using the usual run command and spec nextflow run -work-dir s3:///scratch ``` +:::{note} +When using Fusion, pods will run as *privileged* by default. +::: + +To use Fusion with without the need for escalating privileges, it is required to install in the Kubernetes cluster the +Nextflow [FUSE device plugin](https://github.com/nextflow-io/k8s-fuse-plugin) and add in your Nextflow configuration the following +setting: + +``` +fusion { + privileged = false +} +``` + +To use a custom FUSE device plugin, specify it via the setting `k8s.fuseDevicePlugin`. See +the {ref}`Kubernetes configuration section` for details. + ### Running in a pod Nextflow can be executed directly from a pod running in a Kubernetes cluster. In these cases you will need to use the plain Nextflow `run` command and specify the `k8s` executor and the required persistent volume claim in the `nextflow.config` file as shown below: diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy index 0be59bd423..aed5d977e3 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sConfig.groovy @@ -42,6 +42,8 @@ import nextflow.util.Duration @CompileStatic class K8sConfig implements Map { + static final private Map DEFAULT_FUSE_PLUGIN = Map.of('nextflow.io/fuse', 1) + @Delegate private Map target @@ -116,6 +118,15 @@ class K8sConfig implements Map { target.storageSubPath } + Map fuseDevicePlugin() { + final result = target.fuseDevicePlugin + if( result instanceof Map && result.size()==1 ) + return result as Map + if( result ) + log.warn1 "Setting 'fuseDevicePlugin' should be a map object providing exactly one entry - offending value: $result" + return DEFAULT_FUSE_PLUGIN + } + /** * Whenever the pod should honour the entrypoint defined by the image (default: false) * diff --git a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy index f782e423d6..91d8dbc160 100644 --- a/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/k8s/K8sTaskHandler.groovy @@ -252,7 +252,8 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask { if( fusionConfig().privileged() ) builder.withPrivileged(true) else { - builder.withResourcesLimits(["nextflow.io/fuse": 1]) + final device= k8sConfig.fuseDevicePlugin() + builder.withResourcesLimits(device) } final env = fusionLauncher().fusionEnv() diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy index 4d63b2e82b..5e843bd2c9 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sConfigTest.groovy @@ -129,6 +129,18 @@ class K8sConfigTest extends Specification { } + def 'should set device plugin' () { + when: + def cfg = new K8sConfig([:]) + then: + cfg.fuseDevicePlugin() == ['nextflow.io/fuse':1] + + when: + cfg = new K8sConfig([fuseDevicePlugin:['foo/fuse':10]]) + then: + cfg.fuseDevicePlugin() == ['foo/fuse':10] + } + def 'should create client config' () { given: diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy index 8ac5093294..c3c7f8b5a0 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sTaskHandlerTest.groovy @@ -938,7 +938,9 @@ class K8sTaskHandlerTest extends Specification { def client = Mock(K8sClient) def builder = Mock(K8sWrapperBuilder) def launcher = Mock(FusionScriptLauncher) - def handler = Spy(new K8sTaskHandler(builder:builder, client: client)) + def k8sConfig = Spy(K8sConfig) + def exec = Mock(K8sExecutor) { getK8sConfig()>>k8sConfig } + def handler = Spy(new K8sTaskHandler(builder:builder, client: client, executor: exec)) Map result when: @@ -973,6 +975,45 @@ class K8sTaskHandlerTest extends Specification { result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']] result.spec.containers[0].resources == [limits:['nextflow.io/fuse':1]] !result.spec.containers[0].securityContext + + + /* + * use custom fuse device + */ + when: + result = handler.newSubmitRequest(task) + then: + launcher.fusionEnv() >> [FUSION_BUCKETS: 'this,that'] + launcher.toContainerMount(WORK_DIR.resolve('.command.run')) >> Path.of('/fusion/http/work/dir/.command.run') + launcher.fusionSubmitCli(task) >> ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run'] + and: + k8sConfig.fuseDevicePlugin() >> ['custom/device/fuse': 1] + and: + handler.getTask() >> task + handler.fusionEnabled() >> true + handler.fusionLauncher() >> launcher + handler.fusionConfig() >> new FusionConfig(privileged: false) + and: + task.getContainer() >> 'debian:latest' + task.getWorkDir() >> WORK_DIR + task.getConfig() >> config + and: + 1 * handler.fixOwnership() >> false + 1 * handler.entrypointOverride() >> false + 1 * handler.getPodOptions() >> new PodOptions() + 1 * handler.getSyntheticPodName(task) >> 'nf-123' + 1 * handler.getLabels(task) >> [:] + 1 * handler.getAnnotations() >> [:] + 1 * handler.getContainerMounts() >> [] + and: + 1 * config.getCpus() >> 0 + 1 * config.getMemory() >> null + 1 * client.getConfig() >> new ClientConfig() + and: + result.spec.containers[0].args == ['/usr/bin/fusion', 'bash', '/fusion/http/work/dir/.command.run'] + result.spec.containers[0].env == [[name:'FUSION_BUCKETS', value:'this,that']] + result.spec.containers[0].resources == [limits:['custom/device/fuse':1]] + !result.spec.containers[0].securityContext } def 'get fusion submit command' () {