diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index 6bcde07363..74f060ad13 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -111,6 +111,7 @@ jobs:
     if: ${{ !contains(github.event.head_commit.message, '[ci fast]') && needs.build.outputs.any_changed == 'true' }}
     needs: build
     runs-on: ubuntu-latest
+    timeout-minutes: 90
     strategy:
       fail-fast: false
       matrix:
diff --git a/docs/process.md b/docs/process.md
index db230dddc8..931f5e88d8 100644
--- a/docs/process.md
+++ b/docs/process.md
@@ -1331,6 +1331,65 @@ Allowed values for the `arch` directive are as follows, grouped by equivalent fa
 
 Examples of values for the architecture `target` option are `cascadelake`, `icelake`, `zen2` and `zen3`. See the Spack documentation for the full and up-to-date [list of meaningful targets](https://spack.readthedocs.io/en/latest/basic_usage.html#support-for-specific-microarchitectures).
 
+(process-array)=
+
+### array
+
+:::{versionadded} 24.04.0
+:::
+
+:::{warning} *Experimental: may change in a future release.*
+:::
+
+The `array` directive allows you to submit tasks as *job arrays* for executors that support it.
+
+A job array is a collection of jobs with the same resource requirements and the same script (parameterized by an index). Job arrays incur significantly less scheduling overhead than individual jobs, and as a result they are preferred by HPC schedulers where possible.
+
+The directive should be specified with the desired array size, along with an executor that supports job arrays. For example:
+
+```groovy
+process cpu_task {
+    executor 'slurm'
+    array 100
+
+    '''
+    your_command --here
+    '''
+}
+```
+
+Nextflow currently supports job arrays for the following executors:
+
+- {ref}`awsbatch-executor`
+- {ref}`google-batch-executor`
+- {ref}`lsf-executor`
+- {ref}`pbs-executor`
+- {ref}`pbspro-executor`
+- {ref}`sge-executor`
+- {ref}`slurm-executor`
+
+A process that uses job arrays will collect tasks into batches of the given size and submit each batch as a job array once it is full. Any "leftover" tasks are submitted as a partial job array.
+
+Once a job array is submitted, each "child" task is executed as an independent job. Any tasks that fail (and can be retried) are retried without interfering with the tasks that succeeded. Retried tasks are submitted individually rather than through a job array, in order to allow for the use of [dynamic resources](#dynamic-computing-resources).
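+
+For example, the following sketch (a hypothetical process, assuming tasks may fail for lack of memory) combines a job array with a retry strategy and a dynamic `memory` directive. The first attempt of each task runs in the job array, while retries are submitted individually and can request more memory:
+
+```groovy
+process cpu_task {
+    executor 'slurm'
+    array 100
+    errorStrategy 'retry'
+    maxRetries 2
+    // uniform across the array on the first attempt, re-evaluated
+    // for each retry, which is submitted as an individual job
+    memory { 2.GB * task.attempt }
+
+    '''
+    your_command --here
+    '''
+}
+```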
+
+The following directives must be uniform across all tasks in a process that uses job arrays, because these directives are specified once for the entire job array:
+
+- {ref}`process-accelerator`
+- {ref}`process-clusterOptions`
+- {ref}`process-cpus`
+- {ref}`process-disk`
+- {ref}`process-machineType`
+- {ref}`process-memory`
+- {ref}`process-queue`
+- {ref}`process-resourcelabels`
+- {ref}`process-resourcelimits`
+- {ref}`process-time`
+
+For cloud-based executors like AWS Batch, or when using Fusion with any executor, the following additional directives must be uniform:
+
+- {ref}`process-container`
+- {ref}`process-containerOptions`
+
 (process-beforescript)=
 
 ### beforeScript
 
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy
index f836dfd46a..9cb514a811 100644
--- a/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/executor/BashWrapperBuilder.groovy
@@ -31,6 +31,7 @@ import nextflow.container.ContainerBuilder
 import nextflow.container.DockerBuilder
 import nextflow.container.SingularityBuilder
 import nextflow.exception.ProcessException
+import nextflow.extension.FilesEx
 import nextflow.file.FileHelper
 import nextflow.processor.TaskBean
 import nextflow.processor.TaskProcessor
@@ -307,6 +308,7 @@ class BashWrapperBuilder {
 
         final binding = new HashMap(20)
         binding.header_script = headerScript
+        binding.task_metadata = getTaskMetadata()
         binding.task_name = name
         binding.helpers_script = getHelpersScript()
 
@@ -461,6 +463,31 @@ class BashWrapperBuilder {
         }
     }
 
+    protected String getTaskMetadata() {
+        final lines = new StringBuilder()
+        lines << '### ---\n'
+        lines << "### name: '${bean.name}'\n"
+        if( bean.arrayIndexName ) {
+            lines << '### array:\n'
+            lines << "###   index-name: ${bean.arrayIndexName}\n"
+            lines << "###   index-start: ${bean.arrayIndexStart}\n"
+            lines << "###   work-dirs:\n"
+            for( Path it : bean.arrayWorkDirs )
+                lines << "###   - ${Escape.path(FilesEx.toUriString(it))}\n"
+        }
+
+        if( containerConfig?.isEnabled() )
+            lines << "### container: '${bean.containerImage}'\n"
+
+        if( outputFiles.size() > 0 ) {
+            lines << '### outputs:\n'
+            for( final output : bean.outputFiles )
+                lines << "### - '${output}'\n"
+        }
+
+        lines << '### ...\n'
+    }
+
     protected String getHelpersScript() {
         def result = new StringBuilder()
 
diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/CrgExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/CrgExecutor.groovy
index 09395198a3..2ed880e15b 100644
--- a/modules/nextflow/src/main/groovy/nextflow/executor/CrgExecutor.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/executor/CrgExecutor.groovy
@@ -18,6 +18,7 @@ package nextflow.executor
 
 import groovy.transform.CompileStatic
 import groovy.util.logging.Slf4j
+import nextflow.processor.TaskArrayRun
 import nextflow.processor.TaskRun
 /**
  * An executor specialised for CRG cluster
@@ -41,9 +42,18 @@ class CrgExecutor extends SgeExecutor {
             task.config.penv = 'smp'
         }
 
+        if( task instanceof TaskArrayRun ) {
+            final arraySize = task.getArraySize()
+            result << '-t' << "1-${arraySize}".toString()
+        }
+
         result << '-N' << getJobNameFor(task)
-        result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
-        result << '-j' << 'y'
+
+        if( task !instanceof TaskArrayRun ) {
+            result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
+            result << '-j' << 'y'
+        }
+
         result << '-terse' << ''    // note: directive
need to be returned as pairs /* diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy index 9212efd42e..1d6d92f37e 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/Executor.groovy @@ -81,7 +81,7 @@ abstract class Executor { * * @param task A {@code TaskRun} instance */ - final void submit( TaskRun task ) { + void submit( TaskRun task ) { log.trace "Scheduling process: ${task}" if( session.isTerminated() ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy index e13ee1e175..11d310b600 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/GridTaskHandler.groovy @@ -38,6 +38,7 @@ import nextflow.exception.ProcessNonZeroExitStatusException import nextflow.file.FileHelper import nextflow.fusion.FusionAwareTask import nextflow.fusion.FusionHelper +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.trace.TraceRecord @@ -100,6 +101,12 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask { this.sanityCheckInterval = duration } + @Override + void prepareLauncher() { + // -- create the wrapper script + createTaskWrapper(task).build() + } + protected ProcessBuilder createProcessBuilder() { // -- log the qsub command @@ -254,17 +261,15 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask { void submit() { ProcessBuilder builder = null try { - // -- create the wrapper script - createTaskWrapper(task).build() // -- start the execution and notify the event to the monitor builder = createProcessBuilder() // -- forward the job launcher script to the command stdin if required final stdinScript = executor.pipeLauncherScript() ? 
stdinLauncherScript() : null // -- execute with a re-triable strategy final result = safeExecute( () -> processStart(builder, stdinScript) ) - // -- save the JobId in the - this.jobId = executor.parseJobId(result) - this.status = SUBMITTED + // -- save the job id + final jobId = (String)executor.parseJobId(result) + updateStatus(jobId) log.debug "[${executor.name.toUpperCase()}] submitted process ${task.name} > jobId: $jobId; workDir: ${task.workDir}" } @@ -281,9 +286,21 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask { status = COMPLETED throw new ProcessFailedException("Error submitting process '${task.name}' for execution", e ) } - } + private void updateStatus(String jobId) { + if( task instanceof TaskArrayRun ) { + for( int i=0; i getDirectives(TaskRun task, List result) { - result << '-o' << task.workDir.resolve(TaskRun.CMD_LOG).toString() + if( task !instanceof TaskArrayRun ) { + result << '-o' << task.workDir.resolve(TaskRun.CMD_LOG).toString() + } // add other parameters (if any) if( task.config.queue ) { @@ -104,7 +107,13 @@ class LsfExecutor extends AbstractGridExecutor { } // -- the job name - result << '-J' << getJobNameFor(task) + if( task instanceof TaskArrayRun ) { + final arraySize = task.getArraySize() + result << '-J' << "${getJobNameFor(task)}[1-${arraySize}]".toString() + } + else { + result << '-J' << getJobNameFor(task) + } // -- at the end append the command script wrapped file name result.addAll( task.config.getClusterOptionsAsList() ) @@ -317,4 +326,21 @@ class LsfExecutor extends AbstractGridExecutor { boolean isFusionEnabled() { return FusionHelper.isFusionEnabled(session) } + + @Override + String getArrayIndexName() { + return 'LSB_JOBINDEX' + } + + @Override + int getArrayIndexStart() { + return 1 + } + + @Override + String getArrayTaskId(String jobId, int index) { + assert jobId, "Missing 'jobId' argument" + return "${jobId}[${index + 1}]" + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/PbsExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/PbsExecutor.groovy index be69ecce2d..e721783833 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/PbsExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/PbsExecutor.groovy @@ -21,6 +21,7 @@ import java.util.regex.Pattern import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskRun /** * Implements a executor for PBS/Torque cluster @@ -29,7 +30,7 @@ import nextflow.processor.TaskRun */ @Slf4j @CompileStatic -class PbsExecutor extends AbstractGridExecutor { +class PbsExecutor extends AbstractGridExecutor implements TaskArrayExecutor { private static Pattern OPTS_REGEX = ~/(?:^|\s)-l.+/ @@ -43,9 +44,17 @@ class PbsExecutor extends AbstractGridExecutor { protected List getDirectives( TaskRun task, List result ) { assert result !=null + if( task instanceof TaskArrayRun ) { + final arraySize = task.getArraySize() + result << '-J' << "0-${arraySize - 1}".toString() + } + result << '-N' << getJobNameFor(task) - result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) - result << '-j' << 'oe' + + if( task !instanceof TaskArrayRun ) { + result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) + result << '-j' << 'oe' + } // the requested queue name if( task.config.queue ) { @@ -180,4 +189,21 @@ class PbsExecutor extends AbstractGridExecutor { static protected boolean matchOptions(String value) { value ? 
OPTS_REGEX.matcher(value).find() : null } + + @Override + String getArrayIndexName() { + return 'PBS_ARRAY_INDEX' + } + + @Override + int getArrayIndexStart() { + return 0 + } + + @Override + String getArrayTaskId(String jobId, int index) { + assert jobId, "Missing 'jobId' argument" + return jobId.replace('[]', "[$index]") + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/PbsProExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/PbsProExecutor.groovy index c865f9a6ad..77bf6dadc4 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/PbsProExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/PbsProExecutor.groovy @@ -18,6 +18,7 @@ package nextflow.executor import groovy.transform.CompileStatic import groovy.util.logging.Slf4j +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskRun /** * Implements a executor for PBSPro cluster executor @@ -44,7 +45,12 @@ class PbsProExecutor extends PbsExecutor { @Override protected List getDirectives(TaskRun task, List result ) { assert result !=null - + + if( task instanceof TaskArrayRun ) { + final arraySize = task.getArraySize() + result << '-J' << "0-${arraySize - 1}".toString() + } + // when multiple competing directives are provided, only the first one will take effect // therefore clusterOptions is added as first to give priority over other options as expected // by the clusterOptions semantics -- see https://github.com/nextflow-io/nextflow/pull/2036 @@ -53,8 +59,11 @@ class PbsProExecutor extends PbsExecutor { } result << '-N' << getJobNameFor(task) - result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) - result << '-j' << 'oe' + + if( task !instanceof TaskArrayRun ) { + result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) + result << '-j' << 'oe' + } // the requested queue name if( task.config.queue ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/SgeExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/SgeExecutor.groovy index 1db880ab0e..bf9b99e724 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/SgeExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/SgeExecutor.groovy @@ -19,6 +19,7 @@ import java.nio.file.Path import groovy.transform.CompileStatic import nextflow.fusion.FusionHelper +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskRun /** * Execute a task script by running it on the SGE/OGE cluster @@ -26,7 +27,7 @@ import nextflow.processor.TaskRun * @author Paolo Di Tommaso */ @CompileStatic -class SgeExecutor extends AbstractGridExecutor { +class SgeExecutor extends AbstractGridExecutor implements TaskArrayExecutor { /** * Gets the directives to submit the specified task to the cluster for execution @@ -37,9 +38,18 @@ class SgeExecutor extends AbstractGridExecutor { */ protected List getDirectives(TaskRun task, List result) { + if( task instanceof TaskArrayRun ) { + final arraySize = task.getArraySize() + result << '-t' << "1-${arraySize}".toString() + } + result << '-N' << getJobNameFor(task) - result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) - result << '-j' << 'y' + + if( task !instanceof TaskArrayRun ) { + result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) + result << '-j' << 'y' + } + result << '-terse' << '' // note: directive need to be returned as pairs /* @@ -114,8 +124,14 @@ class SgeExecutor extends AbstractGridExecutor { if( entry.toString().isLong() ) return entry + if( 
(id=entry.tokenize('.').get(0)).isLong() ) + return id + if( entry.startsWith('Your job') && entry.endsWith('has been submitted') && (id=entry.tokenize().get(2)) ) return id + + if( entry.startsWith('Your job array') && entry.endsWith('has been submitted') && (id=entry.tokenize().get(3)) ) + return id.tokenize('.').get(0) } throw new IllegalStateException("Invalid SGE submit response:\n$text\n\n") @@ -185,4 +201,20 @@ class SgeExecutor extends AbstractGridExecutor { boolean isFusionEnabled() { return FusionHelper.isFusionEnabled(session) } + + @Override + String getArrayIndexName() { + return 'SGE_TASK_ID' + } + + @Override + int getArrayIndexStart() { + return 1 + } + + @Override + String getArrayTaskId(String jobId, int index) { + return "${jobId}.${index}" + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/SlurmExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/SlurmExecutor.groovy index 664379d280..bcf639847f 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/SlurmExecutor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/SlurmExecutor.groovy @@ -22,6 +22,7 @@ import java.util.regex.Pattern import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.fusion.FusionHelper +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import nextflow.processor.TaskRun /** @@ -34,7 +35,7 @@ import nextflow.processor.TaskRun */ @Slf4j @CompileStatic -class SlurmExecutor extends AbstractGridExecutor { +class SlurmExecutor extends AbstractGridExecutor implements TaskArrayExecutor { static private Pattern SUBMIT_REGEX = ~/Submitted batch job (\d+)/ @@ -55,8 +56,18 @@ class SlurmExecutor extends AbstractGridExecutor { */ protected List getDirectives(TaskRun task, List result) { + if( task instanceof TaskArrayRun ) { + final arraySize = task.getArraySize() + result << '--array' << "0-${arraySize - 1}".toString() + } + result << '-J' << getJobNameFor(task) - result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) // -o OUTFILE and no -e option => stdout and stderr merged to stdout/OUTFILE + + if( task !instanceof TaskArrayRun ) { + // -o OUTFILE and no -e option => stdout and stderr merged to stdout/OUTFILE + result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG)) + } + result << '--no-requeue' << '' // note: directive need to be returned as pairs if( !hasSignalOpt(task.config) ) { @@ -218,4 +229,21 @@ class SlurmExecutor extends AbstractGridExecutor { boolean isFusionEnabled() { return FusionHelper.isFusionEnabled(session) } + + @Override + String getArrayIndexName() { + return 'SLURM_ARRAY_TASK_ID' + } + + @Override + int getArrayIndexStart() { + return 0 + } + + @Override + String getArrayTaskId(String jobId, int index) { + assert jobId, "Missing 'jobId' argument" + return "${jobId}_${index}" + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/TaskArrayExecutor.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/TaskArrayExecutor.groovy new file mode 100644 index 0000000000..b85990d652 --- /dev/null +++ b/modules/nextflow/src/main/groovy/nextflow/executor/TaskArrayExecutor.groovy @@ -0,0 +1,87 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.executor
+
+import java.nio.file.FileSystems
+import java.nio.file.Path
+
+import groovy.transform.CompileStatic
+import nextflow.fusion.FusionHelper
+import nextflow.processor.TaskHandler
+import nextflow.processor.TaskRun
+/**
+ * Interface for executors that support job arrays.
+ *
+ * @author Ben Sherman
+ */
+@CompileStatic
+interface TaskArrayExecutor {
+
+    String getName()
+
+    Path getWorkDir()
+
+    void submit( TaskRun task )
+
+    TaskHandler createTaskHandler(TaskRun task)
+
+    boolean isFusionEnabled()
+
+    /**
+     * Get the environment variable name that provides the array index of a task.
+     */
+    String getArrayIndexName()
+
+    /**
+     * Get the start of the job array index range.
+     */
+    int getArrayIndexStart()
+
+    /**
+     * Get the name of a child job based on the array job name
+     * and child index.
+     */
+    String getArrayTaskId(String jobId, int index)
+
+    default boolean isWorkDirDefaultFS() {
+        getWorkDir().fileSystem == FileSystems.default
+    }
+
+    /**
+     * Get a {@link TaskHandler} work directory for the task array resolution
+     *
+     * @param handler
+     * @return
+     */
+    default String getArrayWorkDir(TaskHandler handler) {
+        return isFusionEnabled()
+            ? FusionHelper.toContainerMount(handler.task.workDir).toString()
+            : handler.task.workDir.toUriString()
+    }
+
+    default String getArrayLaunchCommand(String taskDir) {
+        if( isFusionEnabled() ) {
+            return "bash ${taskDir}/${TaskRun.CMD_RUN}"
+        }
+        else if( isWorkDirDefaultFS() ) {
+            return "bash ${taskDir}/${TaskRun.CMD_RUN} 2>&1 > ${taskDir}/${TaskRun.CMD_LOG}"
+        }
+        else {
+            throw new IllegalStateException("Executor ${getName()} does not support array jobs")
+        }
+    }
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy
index 9f0578aaef..33c6f812ed 100644
--- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionHelper.groovy
@@ -90,4 +90,8 @@ class FusionHelper {
         return Path.of(result)
     }
 
+    static Path toContainerMount(Path path) {
+        toContainerMount(path, path.scheme)
+    }
+
 }
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy
index ec346db198..07af8f22c2 100644
--- a/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/ParallelPollingMonitor.groovy
@@ -86,7 +86,7 @@ class ParallelPollingMonitor extends TaskPollingMonitor {
             if( !session.success )
                 return // ignore error when the session has been interrupted
             handleException(handler, e)
-            session.notifyTaskComplete(handler)
+            notifyTaskComplete(handler)
         }
     }
 
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy
new file mode 100644
index 0000000000..b5a40db98d
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayCollector.groovy
@@ -0,0 +1,204 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.processor
+
+import java.nio.file.Files
+import java.util.concurrent.locks.Lock
+import java.util.concurrent.locks.ReentrantLock
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.executor.Executor
+import nextflow.executor.TaskArrayExecutor
+import nextflow.file.FileHelper
+import nextflow.util.CacheHelper
+import nextflow.util.Escape
+/**
+ * Task monitor that batches tasks and submits them as job arrays
+ * to an underlying task monitor.
+ *
+ * @author Ben Sherman
+ */
+@Slf4j
+@CompileStatic
+class TaskArrayCollector {
+
+    /**
+     * The set of directives which are used by the job array.
+     */
+    private static final List<String> ARRAY_DIRECTIVES = [
+        'accelerator',
+        'arch',
+        'clusterOptions',
+        'cpus',
+        'disk',
+        'machineType',
+        'memory',
+        'queue',
+        'resourceLabels',
+        'resourceLimits',
+        'time',
+        // only needed for container-native executors and/or Fusion
+        'container',
+        'containerOptions',
+    ]
+
+    private TaskProcessor processor
+
+    private TaskArrayExecutor executor
+
+    private int arraySize
+
+    private Lock sync = new ReentrantLock()
+
+    private List<TaskRun> array
+
+    private boolean closed = false
+
+    TaskArrayCollector(TaskProcessor processor, Executor executor, int arraySize) {
+        if( executor !instanceof TaskArrayExecutor )
+            throw new IllegalArgumentException("Executor '${executor.name}' does not support job arrays")
+
+        this.processor = processor
+        this.executor = (TaskArrayExecutor)executor
+        this.arraySize = arraySize
+        this.array = new ArrayList<>(arraySize)
+    }
+
+    /**
+     * Add a task to the current array, and submit the array when it
+     * reaches the desired size.
+     *
+     * @param task
+     */
+    void collect(TaskRun task) {
+        sync.lock()
+        try {
+            // submit task directly if the collector is closed
+            // or if the task is retried (since it might have dynamic resources)
+            if( closed || task.config.getAttempt() > 1 ) {
+                executor.submit(task)
+                return
+            }
+
+            // add task to the array
+            array.add(task)
+
+            // submit job array when it is ready
+            if( array.size() == arraySize ) {
+                executor.submit(createTaskArray(array))
+                array = new ArrayList<>(arraySize)
+            }
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Close the collector, submitting any remaining tasks as a partial job array.
+     */
+    void close() {
+        sync.lock()
+        try {
+            if( array.size() == 1 ) {
+                executor.submit(array.first())
+            }
+            else if( array.size() > 0 ) {
+                executor.submit(createTaskArray(array))
+                array = null
+            }
+            closed = true
+        }
+        finally {
+            sync.unlock()
+        }
+    }
+
+    /**
+     * Create the task run for a job array.
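+     * The array job reuses the id and index of its first task, and its
+     * config is derived from the process-level values of the directives
+     * in {@code ARRAY_DIRECTIVES}, since they apply to the array as a whole.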
+     *
+     * @param tasks
+     */
+    protected TaskArrayRun createTaskArray(List<TaskRun> tasks) {
+        // prepare child job launcher scripts
+        final handlers = tasks.collect( t -> executor.createTaskHandler(t) )
+        for( TaskHandler handler : handlers ) {
+            handler.prepareLauncher()
+        }
+
+        // create work directory
+        final hash = CacheHelper.hasher( tasks.collect( t -> t.getHash().asLong() ) ).hash()
+        final workDir = FileHelper.getWorkFolder(executor.getWorkDir(), hash)
+        Files.createDirectories(workDir)
+
+        // create wrapper script
+        final script = createArrayTaskScript(handlers)
+        log.debug "Creating task array run >> $workDir\n$script"
+
+        // create config for job array
+        final rawConfig = new HashMap(ARRAY_DIRECTIVES.size())
+        for( final key : ARRAY_DIRECTIVES ) {
+            final value = processor.config.get(key)
+            if( value != null )
+                rawConfig[key] = value
+        }
+
+        // create job array
+        final first = tasks.min( t -> t.index )
+        final taskArray = new TaskArrayRun(
+            id: first.id,
+            index: first.index,
+            processor: processor,
+            type: processor.taskBody.type,
+            config: new TaskConfig(rawConfig),
+            context: new TaskContext(processor),
+            hash: hash,
+            workDir: workDir,
+            script: script,
+            children: handlers
+        )
+        taskArray.config.context = taskArray.context
+        taskArray.config.process = taskArray.processor.name
+        taskArray.config.executor = taskArray.processor.executor.name
+
+        return taskArray
+    }
+
+    /**
+     * Create the wrapper script for a job array.
+     *
+     * @param array
+     */
+    protected String createArrayTaskScript(List<TaskHandler> array) {
+        // get work directory and launch command for each task
+        final workDirs = array.collect( h -> executor.getArrayWorkDir(h) )
+        """
+        array=( ${workDirs.collect( p -> Escape.path(p) ).join(' ')} )
+        export nxf_array_task_dir=${getArrayIndexRef()}
+        ${executor.getArrayLaunchCommand('$nxf_array_task_dir')}
+        """.stripIndent().leftTrim()
+    }
+
+    protected String getArrayIndexRef() {
+        final name = executor.getArrayIndexName()
+        final start = executor.getArrayIndexStart()
+        final index = start > 0 ? "${name} - ${start}" : name
+        return '${array[' + index + ']}'
+    }
+
+}
diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayRun.groovy
new file mode 100644
index 0000000000..6d03af83f0
--- /dev/null
+++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskArrayRun.groovy
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2013-2023, Seqera Labs
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package nextflow.processor
+
+import groovy.transform.CompileStatic
+import groovy.util.logging.Slf4j
+import nextflow.container.ContainerConfig
+import nextflow.executor.TaskArrayExecutor
+
+/**
+ * Models a task array, which submits a collection of independent
+ * tasks with a single submit script.
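+ * <p>
+ * For example, the submit script generated for a SLURM array of three
+ * tasks looks roughly as follows (a sketch, assuming the hypothetical
+ * work directories {@code /work/01..03}):
+ * <pre>
+ * array=( /work/01 /work/02 /work/03 )
+ * export nxf_array_task_dir=${array[SLURM_ARRAY_TASK_ID]}
+ * bash $nxf_array_task_dir/.command.run 2>&1 > $nxf_array_task_dir/.command.log
+ * </pre>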
+ * + * @author Ben Sherman + */ +@Slf4j +@CompileStatic +class TaskArrayRun extends TaskRun { + + List children + + int getArraySize() { + children.size() + } + + @Override + ContainerConfig getContainerConfig() { + final config = super.getContainerConfig() + final envWhitelist = config.getEnvWhitelist() ?: [] + final executor = (TaskArrayExecutor)processor.getExecutor() + envWhitelist << executor.getArrayIndexName() + config.put('envWhitelist', envWhitelist) + return config + } + + @Override + boolean isContainerEnabled() { + return false + } + +} diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy index c1ef240388..d1be0cf236 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskBean.groovy @@ -22,6 +22,7 @@ import groovy.transform.CompileStatic import groovy.transform.PackageScope import nextflow.container.ContainerConfig import nextflow.executor.BashWrapperBuilder +import nextflow.executor.TaskArrayExecutor import nextflow.util.MemoryUnit /** @@ -103,6 +104,12 @@ class TaskBean implements Serializable, Cloneable { Map resourceLabels + String arrayIndexName + + Integer arrayIndexStart + + List arrayWorkDirs + @PackageScope TaskBean() { shell = BashWrapperBuilder.BASH @@ -156,6 +163,14 @@ class TaskBean implements Serializable, Cloneable { this.stageOutMode = task.config.getStageOutMode() this.resourceLabels = task.config.getResourceLabels() + + // job array + if( task instanceof TaskArrayRun ) { + final executor = (TaskArrayExecutor)task.getProcessor().getExecutor() + this.arrayIndexName = executor.getArrayIndexName() + this.arrayIndexStart = executor.getArrayIndexStart() + this.arrayWorkDirs = task.children.collect( h -> h.task.workDir ) + } } @Override diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy index 11cb9ecc6d..8c117d671b 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskConfig.groovy @@ -186,6 +186,10 @@ class TaskConfig extends LazyMap implements Cloneable { return false } + int getArray() { + get('array') as Integer ?: 0 + } + String getBeforeScript() { return get('beforeScript') } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy index 89c3b79021..f7ae9c652d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskHandler.groovy @@ -88,6 +88,14 @@ abstract class TaskHandler { */ abstract void submit() + /** + * Prepare the launcher script. + * + * This method is optional. If it is not implemented, the launcher script should + * be prepared in the submit() method. + */ + void prepareLauncher() {} + /** * Task status attribute setter. 
* diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy index e33ac0a767..2caf82d1ce 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskPollingMonitor.groovy @@ -199,13 +199,27 @@ class TaskPollingMonitor implements TaskMonitor { * A {@link TaskHandler} instance representing the task to be submitted for execution */ protected void submit(TaskHandler handler) { - // submit the job execution -- throws a ProcessException when submit operation fail - handler.submit() - // note: add the 'handler' into the polling queue *after* the submit operation, - // this guarantees that in the queue are only jobs successfully submitted - runningQueue.add(handler) - // notify task submission - session.notifyTaskSubmit(handler) + if( handler.task instanceof TaskArrayRun ) { + // submit task array + handler.prepareLauncher() + handler.submit() + // add each child task to the running queue + final task = handler.task as TaskArrayRun + for( TaskHandler it : task.children ) { + runningQueue.add(it) + session.notifyTaskSubmit(it) + } + } + else { + // submit the job execution -- throws a ProcessException when submit operation fail + handler.prepareLauncher() + handler.submit() + // note: add the 'handler' into the polling queue *after* the submit operation, + // this guarantees that in the queue are only jobs successfully submitted + runningQueue.add(handler) + // notify task submission + session.notifyTaskSubmit(handler) + } } /** @@ -233,7 +247,7 @@ class TaskPollingMonitor implements TaskMonitor { try{ pendingQueue << handler taskAvail.signal() // signal that a new task is available for execution - session.notifyTaskPending(handler) + notifyTaskPending(handler) log.trace "Scheduled task > $handler" } finally { @@ -573,7 +587,7 @@ class TaskPollingMonitor implements TaskMonitor { } catch ( Throwable e ) { handleException(handler, e) - session.notifyTaskComplete(handler) + notifyTaskComplete(handler) } // remove processed handler either on successful submit or failed one (managed by catch section) // when `canSubmit` return false the handler should be retained to be tried in a following iteration @@ -704,5 +718,27 @@ class TaskPollingMonitor implements TaskMonitor { return pendingQueue } + protected void notifyTaskPending(TaskHandler handler) { + if( handler.task instanceof TaskArrayRun ) { + final task = handler.task as TaskArrayRun + for( TaskHandler it : task.children ) + session.notifyTaskPending(it) + } + else { + session.notifyTaskPending(handler) + } + } + + protected void notifyTaskComplete(TaskHandler handler) { + if( handler.task instanceof TaskArrayRun ) { + final task = handler.task as TaskArrayRun + for( TaskHandler it : task.children ) + session.notifyTaskComplete(it) + } + else { + session.notifyTaskComplete(handler) + } + } + } diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy index 677e4c8b0b..302955617a 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskProcessor.groovy @@ -255,6 +255,8 @@ class TaskProcessor { private Boolean isFair0 + private TaskArrayCollector arrayCollector + private CompilerConfiguration compilerConfig() { final config = new CompilerConfiguration() 
config.addCompilationCustomizers( new ASTTransformationCustomizer(TaskTemplateVarsXform) ) @@ -309,6 +311,9 @@ class TaskProcessor { this.maxForks = config.maxForks && config.maxForks>0 ? config.maxForks as int : 0 this.forksCount = maxForks ? new LongAdder() : null this.isFair0 = config.getFair() + + final arraySize = config.getArray() + this.arrayCollector = arraySize > 0 ? new TaskArrayCollector(this, executor, arraySize) : null } /** @@ -377,6 +382,8 @@ class TaskProcessor { return provider } + boolean isSingleton() { singleton } + /** * Create a "preview" for a task run. This method is only meant for the creation of "mock" task run * to allow the access for the associated {@link TaskConfig} during a pipeline "preview" execution. @@ -2323,7 +2330,10 @@ class TaskProcessor { makeTaskContextStage3(task, hash, folder) // add the task to the collection of running tasks - executor.submit(task) + if( arrayCollector ) + arrayCollector.collect(task) + else + executor.submit(task) } @@ -2417,6 +2427,10 @@ class TaskProcessor { state.update { StateObj it -> it.incCompleted() } } + protected void closeProcess() { + arrayCollector?.close() + } + protected void terminateProcess() { log.trace "<${name}> Sending poison pills and terminating process" sendPoisonPill() @@ -2569,6 +2583,7 @@ class TaskProcessor { // apparently auto if-guard instrumented by @Slf4j is not honoured in inner classes - add it explicitly if( log.isTraceEnabled() ) log.trace "<${name}> After stop" + closeProcess() } /** diff --git a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy index 69e217291e..2189ef39d1 100644 --- a/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/processor/TaskRun.groovy @@ -20,6 +20,7 @@ import java.nio.file.FileSystems import java.nio.file.NoSuchFileException import java.nio.file.Path import java.util.concurrent.ConcurrentHashMap +import java.util.function.Function import com.google.common.hash.HashCode import groovy.transform.PackageScope @@ -448,10 +449,16 @@ class TaskRun implements Cloneable { /** * Look at the {@code nextflow.script.FileOutParam} which name is the expected * output name - * */ List getOutputFilesNames() { - cache0.computeIfAbsent('outputFileNames', (it)-> getOutputFilesNames0()) + // note: use an explicit function instead of a closure or lambda syntax, otherwise + // when calling this method from a subclass it will result into a MissingMethodExeception + // see https://issues.apache.org/jira/browse/GROOVY-2433 + cache0.computeIfAbsent('outputFileNames', new Function>() { + @Override + List apply(String s) { + return getOutputFilesNames0() + }}) } private List getOutputFilesNames0() { @@ -614,7 +621,14 @@ class TaskRun implements Cloneable { } Path getCondaEnv() { - cache0.computeIfAbsent('condaEnv', (it)-> getCondaEnv0()) + // note: use an explicit function instead of a closure or lambda syntax, otherwise + // when calling this method from a subclass it will result into a MissingMethodExeception + // see https://issues.apache.org/jira/browse/GROOVY-2433 + cache0.computeIfAbsent('condaEnv', new Function() { + @Override + Path apply(String it) { + return getCondaEnv0() + }}) } private Path getCondaEnv0() { @@ -626,7 +640,14 @@ class TaskRun implements Cloneable { } Path getSpackEnv() { - cache0.computeIfAbsent('spackEnv', (it)-> getSpackEnv0()) + // note: use an explicit function instead of a closure or lambda syntax, otherwise + // 
when calling this method from a subclass it will result into a MissingMethodExeception + // see https://issues.apache.org/jira/browse/GROOVY-2433 + cache0.computeIfAbsent('spackEnv', new Function() { + @Override + Path apply(String it) { + return getSpackEnv0() + }}) } private Path getSpackEnv0() { @@ -640,7 +661,14 @@ class TaskRun implements Cloneable { } protected ContainerInfo containerInfo() { - cache0.computeIfAbsent('containerInfo', (it)-> containerInfo0()) + // note: use an explicit function instead of a closure or lambda syntax, otherwise + // when calling this method from a subclass it will result into a MissingMethodExeception + // see https://issues.apache.org/jira/browse/GROOVY-2433 + cache0.computeIfAbsent('containerInfo', new Function() { + @Override + ContainerInfo apply(String s) { + return containerInfo0() + }}) } private ContainerInfo containerInfo0() { diff --git a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy index 8a66c71082..df7ce27ec6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/script/ProcessConfig.groovy @@ -64,6 +64,7 @@ class ProcessConfig implements Map, Cloneable { 'accelerator', 'afterScript', 'arch', + 'array', 'beforeScript', 'cache', 'cleanup', @@ -1004,6 +1005,25 @@ class ProcessConfig implements Map, Cloneable { return this } + int getArray() { + final value = configProperties.get('array') + if( value == null ) + return 0 + if( value instanceof Closure ) + throw new IllegalArgumentException("Process directive `array` cannot be declared in a dynamic manner with a closure") + try { + final result = value as Integer + if( result < 0 ) + throw new IllegalArgumentException("Process directive `array` cannot be a negative number") + if( result == 1 ) + throw new IllegalArgumentException("Process directive `array` should be greater than 1") + return result + } + catch( NumberFormatException e ) { + throw new IllegalArgumentException("Process directive `array` should be an integer greater than 1 -- offending value: '$value'", e) + } + } + private static final List VALID_RESOURCE_LIMITS = List.of('cpus', 'memory', 'disk', 'time') ProcessConfig resourceLimits( Map entries ) { diff --git a/modules/nextflow/src/main/resources/nextflow/executor/command-run.txt b/modules/nextflow/src/main/resources/nextflow/executor/command-run.txt index c63e91f702..2bf34b617a 100644 --- a/modules/nextflow/src/main/resources/nextflow/executor/command-run.txt +++ b/modules/nextflow/src/main/resources/nextflow/executor/command-run.txt @@ -14,7 +14,7 @@ ## limitations under the License. 
#!/bin/bash {{header_script}} -# NEXTFLOW TASK: {{task_name}} +{{task_metadata}} set -e set -u NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/BashTemplateEngineTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/BashTemplateEngineTest.groovy index 90df503a53..30616c682a 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/BashTemplateEngineTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/BashTemplateEngineTest.groovy @@ -31,7 +31,6 @@ class BashTemplateEngineTest extends Specification { def template = '''\ ## comment #!/bin/bash - # NEXTFLOW TASK: foo line 1 ## comment ## @@ -42,7 +41,6 @@ class BashTemplateEngineTest extends Specification { expect: engine.render(template, [:]) == '''\ #!/bin/bash - # NEXTFLOW TASK: foo line 1 line 2 line 3 diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy index f60f555cdb..c2757468e1 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/BashWrapperBuilderTest.groovy @@ -27,6 +27,7 @@ import nextflow.container.DockerBuilder import nextflow.container.SingularityBuilder import nextflow.processor.TaskBean import nextflow.util.MustacheTemplateEngine +import org.yaml.snakeyaml.Yaml import spock.lang.Specification import spock.lang.Unroll /** @@ -51,6 +52,8 @@ class BashWrapperBuilderTest extends Specification { bean.script = 'echo Hello world!' if( !bean.containsKey('inputFiles') ) bean.inputFiles = [:] + if( !bean.containsKey('outputFiles') ) + bean.outputFiles = [] new BashWrapperBuilder(bean as TaskBean) { @Override protected String getSecretsEnv() { @@ -335,6 +338,94 @@ class BashWrapperBuilderTest extends Specification { } + def 'should create task metadata string' () { + given: + def builder = newBashWrapperBuilder( + name: 'foo', + arrayIndexName: 'SLURM_ARRAY_TASK_ID', + arrayIndexStart: 0, + arrayWorkDirs: [ Path.of('/work/01'), Path.of('/work/02'), Path.of('/work/03') ], + containerConfig: [enabled: true], + containerImage: 'quay.io/nextflow:bash', + outputFiles: ['foo.txt', '*.bar', '**/baz'] + ) + + when: + def meta = builder.getTaskMetadata() + then: + meta == '''\ + ### --- + ### name: 'foo' + ### array: + ### index-name: SLURM_ARRAY_TASK_ID + ### index-start: 0 + ### work-dirs: + ### - /work/01 + ### - /work/02 + ### - /work/03 + ### container: 'quay.io/nextflow:bash' + ### outputs: + ### - 'foo.txt' + ### - '*.bar' + ### - '**/baz' + ### ... + '''.stripIndent() + + when: + def yaml = meta.readLines().collect(it-> it.substring(4)).join('\n') + def obj = new Yaml().load(yaml) as Map + then: + obj.name == 'foo' + obj.array == [ + 'index-name':'SLURM_ARRAY_TASK_ID', + 'index-start':0, + 'work-dirs':['/work/01', '/work/02', '/work/03'] + ] + obj.container == 'quay.io/nextflow:bash' + obj.outputs == ['foo.txt', '*.bar', '**/baz'] + } + + def 'should add task metadata' () { + when: + def bash = newBashWrapperBuilder([name:'task1']) + then: + bash.makeBinding().containsKey('task_metadata') + bash.makeBinding().task_metadata == '''\ + ### --- + ### name: 'task1' + ### ... 
+ '''.stripIndent() + + when: + bash = newBashWrapperBuilder( + name: 'task2', + arrayIndexName: 'SLURM_ARRAY_TASK_ID', + arrayIndexStart: 0, + arrayWorkDirs: [ Path.of('/work/01'), Path.of('/work/02'), Path.of('/work/03') ], + containerConfig: [enabled: true], + containerImage: 'quay.io/nextflow:bash', + outputFiles: ['foo.txt', '*.bar', '**/baz'] + ) + then: + bash.makeBinding().task_metadata == '''\ + ### --- + ### name: 'task2' + ### array: + ### index-name: SLURM_ARRAY_TASK_ID + ### index-start: 0 + ### work-dirs: + ### - /work/01 + ### - /work/02 + ### - /work/03 + ### container: 'quay.io/nextflow:bash' + ### outputs: + ### - 'foo.txt' + ### - '*.bar' + ### - '**/baz' + ### ... + '''.stripIndent() + } + def 'should copy control files' () { when: @@ -1083,6 +1174,7 @@ class BashWrapperBuilderTest extends Specification { given: def bean = Mock(TaskBean) { inputFiles >> [:] + outputFiles >> [] } def copy = Mock(ScriptFileCopyStrategy) bean.workDir >> Paths.get('/work/dir') diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/CrgExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/CrgExecutorTest.groovy index a4f012d5a8..6036be4980 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/CrgExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/CrgExecutorTest.groovy @@ -20,6 +20,7 @@ import java.nio.file.Paths import nextflow.Session import nextflow.container.ContainerConfig +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun @@ -337,6 +338,24 @@ class CrgExecutorTest extends Specification { .stripIndent().leftTrim() } + def 'should get headers with job array' () { + when: + def executor = Spy(new CrgExecutor()) + def task = Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> 'mapping tag' + workDir >> Paths.get('/abc') + getArraySize() >> 5 + } + then: + executor.getHeaders(task) == ''' + #$ -t 1-5 + #$ -N nf-mapping_tag + #$ -terse + #$ -notify + ''' + .stripIndent().leftTrim() + } def testParseJobId() { diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/LsfExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/LsfExecutorTest.groovy index 7b59b80bcc..32d2d75459 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/LsfExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/LsfExecutorTest.groovy @@ -21,6 +21,7 @@ import java.nio.file.Path import java.nio.file.Paths import nextflow.Session +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun @@ -137,21 +138,17 @@ class LsfExecutorTest extends Specification { } - def testHeaders() { + def 'test job script headers' () { setup: - // LSF executor def executor = Spy(LsfExecutor) executor.@memUnit = 'MB' executor.@usageUnit = 'MB' executor.session = new Session() - // mock process def proc = Mock(TaskProcessor) - // process name proc.getName() >> 'task' - // task object def task = new TaskRun() task.processor = proc task.workDir = Paths.get('/scratch') @@ -159,13 +156,11 @@ class LsfExecutorTest extends Specification { when: task.config = new TaskConfig() - // config task.config.queue = 'bsc_ls' task.config.clusterOptions = "-x 1 -R \"span[ptile=2]\"" task.config.cpus = '2' task.config.time = '1h 30min' task.config.memory = '8GB' - then: executor.getHeaders(task) == ''' #BSUB -o /scratch/.command.log @@ -181,7 +176,6 @@ 
class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'alpha' @@ -194,7 +188,6 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'alpha' @@ -212,7 +205,6 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'gamma' @@ -230,7 +222,6 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'gamma' @@ -267,7 +258,6 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'gamma' @@ -287,7 +277,6 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: task.config = new TaskConfig() task.config.queue = 'delta' @@ -304,6 +293,19 @@ class LsfExecutorTest extends Specification { ''' .stripIndent().leftTrim() + when: 'with job array' + def taskArray = Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> task.name + workDir >> task.workDir + getArraySize() >> 5 + } + then: + executor.getHeaders(taskArray) == ''' + #BSUB -J "nf-mapping_hola[1-5]" + ''' + .stripIndent().leftTrim() + } def testDiskResources() { @@ -731,6 +733,27 @@ class LsfExecutorTest extends Specification { 'a'.repeat(509) | 'nf-'.concat("a".repeat(508)) } + def 'should get array index name and start' () { + given: + def executor = Spy(LsfExecutor) + expect: + executor.getArrayIndexName() == 'LSB_JOBINDEX' + executor.getArrayIndexStart() == 1 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(LsfExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo' | 1 | 'foo[2]' + 'bar' | 2 | 'bar[3]' + } + @Unroll def 'should set lsf account' () { given: diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/PbsExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/PbsExecutorTest.groovy index be72557689..7ac672acc8 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/PbsExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/PbsExecutorTest.groovy @@ -18,6 +18,7 @@ package nextflow.executor import java.nio.file.Paths +import nextflow.processor.TaskArrayRun import nextflow.Session import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor @@ -42,7 +43,7 @@ class PbsExecutorTest extends Specification { } - def testHeaders() { + def 'test job script headers'() { setup: def executor = Spy(PbsExecutor) @@ -50,8 +51,6 @@ class PbsExecutorTest extends Specification { // mock process def proc = Mock(TaskProcessor) - - // task object def task = new TaskRun() task.processor = proc task.workDir = Paths.get('/work/dir') @@ -163,6 +162,20 @@ class PbsExecutorTest extends Specification { #PBS -l nodes=1:x86:ppn=4 ''' .stripIndent().leftTrim() + + when: 'with job array' + def taskArray = Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> task.name + workDir >> task.workDir + getArraySize() >> 5 + } + then: + executor.getHeaders(taskArray) == ''' + #PBS -J 0-4 + #PBS -N nf-task_name + ''' + .stripIndent().leftTrim() } def WorkDirWithBlanks() { @@ -302,6 +315,27 @@ class PbsExecutorTest extends Specification { !PbsExecutor.matchOptions('-x-l foo') } + def 'should get array index name and start' () { + given: + def executor = 
Spy(PbsExecutor) + expect: + executor.getArrayIndexName() == 'PBS_ARRAY_INDEX' + executor.getArrayIndexStart() == 0 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(PbsExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo[]' | 1 | 'foo[1]' + 'bar[]' | 2 | 'bar[2]' + } + def 'should set pbs account' () { given: // task diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/PbsProExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/PbsProExecutorTest.groovy index 28b320bc13..f4828dff71 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/PbsProExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/PbsProExecutorTest.groovy @@ -16,14 +16,15 @@ package nextflow.executor -import nextflow.Session -import nextflow.processor.TaskProcessor -import spock.lang.Specification - import java.nio.file.Paths +import nextflow.Session +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig +import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun +import spock.lang.Specification +import spock.lang.Unroll /** * * @author Lorenz Gerber @@ -184,12 +185,31 @@ class PbsProExecutorTest extends Specification { ] } + def 'should get directives with job array' () { + given: + def executor = Spy(PbsProExecutor) + executor.getSession() >> Mock(Session) + and: + def task = Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> 'foo' + workDir >> Paths.get('/foo/bar') + getArraySize() >> 5 + } + + expect: + executor.getDirectives(task, []) == [ + '-J', '0-4', + '-N', 'nf-foo', + ] + } + def 'should return qstat command line' () { given: def executor = [:] as PbsProExecutor expect: - executor.queueStatusCommand(null) == ['bash','-c', "set -o pipefail; qstat -f \$( qstat -B | grep -E -v '(^Server|^---)' | awk -v ORS=' ' '{print \"@\"\$1}' ) | { grep -E '(Job Id:|job_state =)' || true; }"] + executor.queueStatusCommand(null) == ['bash','-c', "set -o pipefail; qstat -f \$( qstat -B | grep -E -v '(^Server|^---)' | awk -v ORS=' ' '{print \"@\"\$1}' ) | { grep -E '(Job Id:|job_state =)' || true; }"] executor.queueStatusCommand('xxx') == ['bash','-c', "set -o pipefail; qstat -f xxx | { grep -E '(Job Id:|job_state =)' || true; }"] executor.queueStatusCommand('xxx').each { assert it instanceof String } } @@ -256,6 +276,27 @@ class PbsProExecutorTest extends Specification { .stripIndent().leftTrim() } + def 'should get array index name and start' () { + given: + def executor = Spy(PbsProExecutor) + expect: + executor.getArrayIndexName() == 'PBS_ARRAY_INDEX' + executor.getArrayIndexStart() == 0 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(PbsProExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo[]' | 1 | 'foo[1]' + 'bar[]' | 2 | 'bar[2]' + } + def 'should set pbs account' () { given: // task diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/SgeExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/SgeExecutorTest.groovy index c12897e21e..7466c83885 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/SgeExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/SgeExecutorTest.groovy @@ -17,10 +17,13 @@ package nextflow.executor import java.nio.file.Paths +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import 
nextflow.processor.TaskProcessor import nextflow.processor.TaskRun import spock.lang.Specification +import spock.lang.Unroll + /** * * @author Paolo Di Tommaso @@ -51,10 +54,7 @@ class SgeExecutorTest extends Specification { def 'test qsub headers' () { given: - def config - // mock process def proc = Mock(TaskProcessor) - def executor = [:] as SgeExecutor def task = new TaskRun() task.processor = proc @@ -62,11 +62,9 @@ class SgeExecutorTest extends Specification { task.name = 'the task name' when: - - // config - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' then: executor.getHeaders(task) == ''' @@ -80,9 +78,9 @@ class SgeExecutorTest extends Specification { .stripIndent().leftTrim() when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -94,13 +92,12 @@ class SgeExecutorTest extends Specification { ''' .stripIndent().leftTrim() - when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' - config.time = '10s ' - config.clusterOptions = '-hard -alpha -beta' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' + task.config.time = '10s ' + task.config.clusterOptions = '-hard -alpha -beta' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -114,15 +111,12 @@ class SgeExecutorTest extends Specification { ''' .stripIndent().leftTrim() - - when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' - config.time = '10m' - config.memory = '1M' - config.remove('clusterOptions') + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' + task.config.time = '10m' + task.config.memory = '1M' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -136,16 +130,14 @@ class SgeExecutorTest extends Specification { ''' .stripIndent().leftTrim() - - when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' - config.cpus = 1 - config.penv = 'smp' - config.time = '2 m' - config.memory = '2 M' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' + task.config.cpus = 1 + task.config.penv = 'smp' + task.config.time = '2 m' + task.config.memory = '2 M' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -161,13 +153,13 @@ class SgeExecutorTest extends Specification { .stripIndent().leftTrim() when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' - config.cpus = 2 - config.penv = 'mpi' - config.time = '3 d' - config.memory = '3 g' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' + task.config.cpus = 2 + task.config.penv = 'mpi' + task.config.time = '3 d' + task.config.memory = '3 g' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -183,13 +175,13 @@ class SgeExecutorTest extends Specification { .stripIndent().leftTrim() when: - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' - config.cpus = 4 - config.penv = 'orte' - config.time = '1d3h' - config.memory = '4 GB ' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' + 
task.config.cpus = 4 + task.config.penv = 'orte' + task.config.time = '1d3h' + task.config.memory = '4 GB ' then: executor.getHeaders(task) == ''' #$ -N nf-the_task_name @@ -204,15 +196,28 @@ class SgeExecutorTest extends Specification { ''' .stripIndent().leftTrim() + when: 'with job array' + def taskArray = Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> task.name + workDir >> task.workDir + getArraySize() >> 5 + } + then: + executor.getHeaders(taskArray) == ''' + #$ -t 1-5 + #$ -N nf-the_task_name + #$ -terse + #$ -notify + ''' + .stripIndent().leftTrim() + } def testWorkDirWithBlanks() { given: - def config - // mock process def proc = Mock(TaskProcessor) - def executor = [:] as SgeExecutor def task = new TaskRun() task.processor = proc @@ -220,11 +225,9 @@ class SgeExecutorTest extends Specification { task.name = 'the task name' when: - - // config - config = task.config = new TaskConfig() - config.queue = 'my-queue' - config.name = 'task' + task.config = new TaskConfig() + task.config.queue = 'my-queue' + task.config.name = 'task' then: executor.getHeaders(task) == ''' @@ -382,4 +385,24 @@ class SgeExecutorTest extends Specification { } + def 'should get array index name and start' () { + given: + def executor = Spy(SgeExecutor) + expect: + executor.getArrayIndexName() == 'SGE_TASK_ID' + executor.getArrayIndexStart() == 1 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(SgeExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo' | 1 | 'foo.1' + 'bar' | 2 | 'bar.2' + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/SlurmExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/SlurmExecutorTest.groovy index 39f02bf5dc..9ed86d0042 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/SlurmExecutorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/SlurmExecutorTest.groovy @@ -19,6 +19,7 @@ package nextflow.executor import java.nio.file.Paths import nextflow.Session +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import nextflow.processor.TaskProcessor import nextflow.processor.TaskRun @@ -77,7 +78,7 @@ class SlurmExecutorTest extends Specification { '/some/path/job.sh' | true | ['sbatch'] } - def testGetHeaders() { + def 'test job script headers' () { setup: // SLURM executor @@ -136,7 +137,6 @@ class SlurmExecutorTest extends Specification { task.config.time = '1h' task.config.memory = '50 M' task.config.clusterOptions = '-a 1 --signal=KILL' - then: executor.getHeaders(task) == ''' #SBATCH -J nf-the_task_name @@ -154,7 +154,6 @@ class SlurmExecutorTest extends Specification { task.config.time = '2h' task.config.memory = '200 M' task.config.clusterOptions = '-b 2' - then: executor.getHeaders(task) == ''' #SBATCH -J nf-the_task_name @@ -174,7 +173,6 @@ class SlurmExecutorTest extends Specification { task.config.time = '2d 3h' task.config.memory = '3 G' task.config.clusterOptions = '-x 3' - then: executor.getHeaders(task) == ''' #SBATCH -J nf-the_task_name @@ -188,8 +186,7 @@ class SlurmExecutorTest extends Specification { ''' .stripIndent().leftTrim() - // test perCpuMemAllocation - when: + when: 'with perCpuMemAllocation' executor.@perCpuMemAllocation = true task.config = new TaskConfig() task.config.cpus = 8 @@ -204,6 +201,22 @@ class SlurmExecutorTest extends Specification { #SBATCH --mem-per-cpu 3072M ''' .stripIndent().leftTrim() + + when: 'with job array' + def taskArray = 
Mock(TaskArrayRun) { + config >> new TaskConfig() + name >> task.name + workDir >> task.workDir + getArraySize() >> 5 + } + then: + executor.getHeaders(taskArray) == ''' + #SBATCH --array 0-4 + #SBATCH -J nf-the_task_name + #SBATCH --no-requeue + #SBATCH --signal B:USR2@30 + ''' + .stripIndent().leftTrim() } def testWorkDirWithBlanks() { @@ -275,7 +288,27 @@ class SlurmExecutorTest extends Specification { usr exec.queueStatusCommand(null) == ['squeue','--noheader','-o','%i %t','-t','all','-u', usr] exec.queueStatusCommand('xxx') == ['squeue','--noheader','-o','%i %t','-t','all','-p','xxx','-u', usr] + } + def 'should get array index name and start' () { + given: + def executor = Spy(SlurmExecutor) + expect: + executor.getArrayIndexName() == 'SLURM_ARRAY_TASK_ID' + executor.getArrayIndexStart() == 0 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(SlurmExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo' | 1 | 'foo_1' + 'bar' | 2 | 'bar_2' } @Unroll diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionHelperTest.groovy b/modules/nextflow/src/test/groovy/nextflow/fusion/FusionHelperTest.groovy similarity index 96% rename from modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionHelperTest.groovy rename to modules/nextflow/src/test/groovy/nextflow/fusion/FusionHelperTest.groovy index 697cdd5238..e8dc875b10 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionHelperTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/fusion/FusionHelperTest.groovy @@ -15,14 +15,12 @@ * */ -package nextflow.executor.fusion +package nextflow.fusion import java.nio.file.Path import nextflow.container.ContainerConfig import nextflow.file.http.XPath -import nextflow.fusion.FusionHelper -import nextflow.fusion.FusionScriptLauncher import spock.lang.Specification /** diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionScriptLauncherTest.groovy b/modules/nextflow/src/test/groovy/nextflow/fusion/FusionScriptLauncherTest.groovy similarity index 97% rename from modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionScriptLauncherTest.groovy rename to modules/nextflow/src/test/groovy/nextflow/fusion/FusionScriptLauncherTest.groovy index 60ff2be1e4..c0be12e632 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/fusion/FusionScriptLauncherTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/fusion/FusionScriptLauncherTest.groovy @@ -15,14 +15,13 @@ * */ -package nextflow.executor.fusion +package nextflow.fusion import java.nio.file.Path import nextflow.Global import nextflow.Session import nextflow.file.http.XPath -import nextflow.fusion.FusionScriptLauncher import nextflow.processor.TaskBean import spock.lang.Specification /** diff --git a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sWrapperBuilderTest.groovy b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sWrapperBuilderTest.groovy index 9951ae2bce..f3e188cdf0 100644 --- a/modules/nextflow/src/test/groovy/nextflow/k8s/K8sWrapperBuilderTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/k8s/K8sWrapperBuilderTest.groovy @@ -45,6 +45,7 @@ class K8sWrapperBuilderTest extends Specification { getProcessor() >> proc getWorkDir() >> folder getInputFilesMap() >> [:] + getOutputFilesNames() >> [] } and: diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy 
b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy new file mode 100644 index 0000000000..2b4c7fb7e9 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayCollectorTest.groovy @@ -0,0 +1,198 @@ +/* + * Copyright 2013-2023, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package nextflow.processor + +import java.nio.file.Paths + +import com.google.common.hash.HashCode +import nextflow.Session +import nextflow.executor.Executor +import nextflow.executor.TaskArrayExecutor +import nextflow.script.BaseScript +import nextflow.script.BodyDef +import nextflow.script.ProcessConfig +import spock.lang.Specification +import test.TestHelper +/** + * + * @author Ben Sherman + */ +class TaskArrayCollectorTest extends Specification { + + static class DummyExecutor extends Executor implements TaskArrayExecutor { + TaskMonitor createTaskMonitor() { null } + TaskHandler createTaskHandler(TaskRun task) { null } + + String getArrayIndexName() { null } + int getArrayIndexStart() { 0 } + String getArrayTaskId(String jobId, int index) { null } + } + + def 'should submit tasks as job arrays' () { + given: + def executor = Mock(DummyExecutor) + def handler = Mock(TaskHandler) + def taskArray = [:] as TaskArrayRun + def collector = Spy(new TaskArrayCollector(null, executor, 5)) { + createTaskArray(_) >> taskArray + } + and: + def task = Mock(TaskRun) { + getConfig() >> Mock(TaskConfig) { + getAttempt() >> 1 + } + } + + // collect tasks into job array + when: + collector.collect(task) + collector.collect(task) + collector.collect(task) + collector.collect(task) + then: + 0 * executor.submit(_) + + // submit job array when it is ready + when: + collector.collect(task) + then: + 1 * executor.submit(taskArray) + + // submit partial job array when closed + when: + collector.collect(task) + collector.collect(task) + collector.close() + then: + 1 * executor.submit(taskArray) + + // submit tasks directly once closed + when: + collector.collect(task) + then: + 1 * executor.submit(task) + } + + def 'should submit retried tasks directly' () { + given: + def executor = Mock(DummyExecutor) + def collector = Spy(new TaskArrayCollector(null, executor, 5)) + and: + def task = Mock(TaskRun) { + getConfig() >> Mock(TaskConfig) { + getAttempt() >> 2 + } + } + + when: + collector.collect(task) + then: + 1 * executor.submit(task) + } + + def 'should create task array' () { + given: + def exec = Mock(DummyExecutor) { + getWorkDir() >> TestHelper.createInMemTempDir() + getArrayIndexName() >> 'ARRAY_JOB_INDEX' + } + def config = Spy(ProcessConfig, constructorArgs: [Mock(BaseScript), 'PROC']) { + createTaskConfig() >> Mock(TaskConfig) + get('cpus') >> 4 + get('tag') >> 'foo' + } + def proc = Mock(TaskProcessor) { + getConfig() >> config + getExecutor() >> exec + getName() >> 'PROC' + getSession() >> Mock(Session) + isSingleton() >> false + getTaskBody() >> { new BodyDef(null, 'source') } + } + def collector = Spy(new TaskArrayCollector(proc, exec, 5)) + 
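// The collector under test buffers tasks up to the given array size (5): the
// feature above shows no submission for the first four collect() calls, one
// job-array submission on the fifth, a partial array flushed by close(), and
// direct per-task submission afterwards. A minimal usage sketch, assuming the
// same mocks as this spec:
//
//   def collector = new TaskArrayCollector(proc, exec, 5)
//   5.times { collector.collect(task) }   // triggers executor.submit(taskArray)
//   collector.close()                     // flushes any leftover tasks as a partial array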
and: + def task = Mock(TaskRun) { + index >> 1 + getHash() >> HashCode.fromString('0123456789abcdef') + getWorkDir() >> Paths.get('/work/foo') + } + def handler = Mock(TaskHandler) { + getTask() >> task + } + + when: + def taskArray = collector.createTaskArray([task, task, task]) + then: + 3 * exec.createTaskHandler(task) >> handler + 3 * handler.prepareLauncher() + 1 * collector.createArrayTaskScript([handler, handler, handler]) >> 'the-task-array-script' + and: + taskArray.name == 'PROC (1)' + taskArray.config.cpus == 4 + taskArray.config.tag == null + taskArray.processor == proc + taskArray.script == 'the-task-array-script' + and: + taskArray.getArraySize() == 3 + taskArray.getContainerConfig().getEnvWhitelist() == [ 'ARRAY_JOB_INDEX' ] + taskArray.isContainerEnabled() == false + } + + def 'should get array ref' () { + given: + def processor = Mock(TaskProcessor) + def executor = Mock(DummyExecutor) { + getArrayIndexName() >> NAME + getArrayIndexStart() >> START + } + def collector = Spy(new TaskArrayCollector(processor, executor, 10)) + expect: + collector.getArrayIndexRef() == EXPECTED + + where: + NAME | START | EXPECTED + 'INDEX' | 0 | '${array[INDEX]}' + 'INDEX' | 1 | '${array[INDEX - 1]}' + 'OTHER' | 99 | '${array[OTHER - 99]}' + } + + def 'should get array launch script' () { + given: + def processor = Mock(TaskProcessor) + def executor = Spy(DummyExecutor) { isWorkDirDefaultFS()>>true } + def collector = Spy(new TaskArrayCollector(processor, executor, 10)) + and: + def h1 = Mock(TaskHandler) + def h2 = Mock(TaskHandler) + def h3 = Mock(TaskHandler) + + when: + def result = collector.createArrayTaskScript([h1,h2,h3]) + then: + executor.getArrayWorkDir(h1) >> '/work/dir/1' + executor.getArrayWorkDir(h2) >> '/work/dir/2' + executor.getArrayWorkDir(h3) >> '/work/dir/3' + and: + collector.getArrayIndexRef() >> '$array[INDEX]' + then: + result == '''\ + array=( /work/dir/1 /work/dir/2 /work/dir/3 ) + export nxf_array_task_dir=$array[INDEX] + bash $nxf_array_task_dir/.command.run 2>&1 > $nxf_array_task_dir/.command.log + '''.stripIndent(true) + } +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayExecutorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayExecutorTest.groovy new file mode 100644 index 0000000000..0697fb807b --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayExecutorTest.groovy @@ -0,0 +1,73 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.executor + +import java.nio.file.Path + +import nextflow.file.http.XPath +import nextflow.processor.TaskHandler +import nextflow.processor.TaskRun +import spock.lang.Specification +import spock.lang.Unroll +/** + * + * @author Paolo Di Tommaso + */ +class TaskArrayExecutorTest extends Specification { + + static abstract class TestArrayExecutor implements TaskArrayExecutor { + + } + + @Unroll + def 'should get array work dir' () { + given: + def task = Mock(TaskRun) { getWorkDir() >> WORK_DIR } + def handler = Mock(TaskHandler) { getTask()>>task } + and: + def executor = Spy(TestArrayExecutor) { isFusionEnabled() >> FUSION } + when: + def result = executor.getArrayWorkDir(handler) + then: + result == EXPECTED + + where: + FUSION | WORK_DIR | EXPECTED + false | Path.of('/work/dir') | '/work/dir' + false | XPath.get('http://foo.com/work') | 'http://foo.com/work' + true | XPath.get('http://foo.com/work') | '/fusion/http/foo.com/work' + } + + @Unroll + def 'should get array launch command' () { + given: + def executor = Spy(TestArrayExecutor) { + isFusionEnabled() >> FUSION + isWorkDirDefaultFS() >> !FUSION + } + + expect: + executor.getArrayLaunchCommand(WORK_DIR) == EXPECTED + + where: + FUSION | WORK_DIR | EXPECTED + false | '/work/dir' | 'bash /work/dir/.command.run 2>&1 > /work/dir/.command.log' + true | '/fusion/work/dir' | 'bash /fusion/work/dir/.command.run' + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayRunTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayRunTest.groovy new file mode 100644 index 0000000000..e64af32035 --- /dev/null +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskArrayRunTest.groovy @@ -0,0 +1,49 @@ +/* + * Copyright 2013-2024, Seqera Labs + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ * + */ + +package nextflow.processor + +import nextflow.Session +import nextflow.container.resolver.ContainerInfo +import nextflow.executor.Executor +import nextflow.executor.TaskArrayExecutor +import spock.lang.Specification +/** + * + * @author Paolo Di Tommaso + */ +class TaskArrayRunTest extends Specification { + + static abstract class TestExecutor extends Executor implements TaskArrayExecutor { + + } + + def 'should get container info' () { + given: + def session = Mock(Session) + def executor = Mock(TestExecutor) { getSession()>>session } + def processor = Mock(TaskProcessor) { getExecutor()>>executor; getSession()>>session } + and: + def config = new TaskConfig([container:'ubuntu']) + def task = new TaskArrayRun(config: config, processor: processor) + when: + def info = task.containerInfo() + then: + info == new ContainerInfo('ubuntu','ubuntu','ubuntu') + } + +} diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy index ae11a37475..23f5098131 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskPollingMonitorTest.groovy @@ -124,5 +124,28 @@ class TaskPollingMonitorTest extends Specification { 0 * session.notifyTaskComplete(handler) >> null } + def 'should submit a job array' () { + given: + def session = Mock(Session) + def monitor = Spy(new TaskPollingMonitor(name: 'foo', session: session, pollInterval: Duration.of('1min'))) + and: + def handler = Mock(TaskHandler) { + getTask() >> Mock(TaskRun) + } + def arrayHandler = Mock(TaskHandler) { + getTask() >> Mock(TaskArrayRun) { + children >> (1..3).collect( i -> handler ) + } + } + + when: + monitor.submit(arrayHandler) + then: + 1 * arrayHandler.prepareLauncher() + 1 * arrayHandler.submit() + 0 * handler.prepareLauncher() + 0 * handler.submit() + 3 * session.notifyTaskSubmit(handler) + } } diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy index 80bcbf3458..3fcaac0206 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskRunTest.groovy @@ -870,4 +870,19 @@ class TaskRunTest extends Specification { // the engine is enabled by default config == new ContainerConfig(engine:'foo', enabled: true) // <-- 'foo' engine is enabled } + + def 'should get container info' () { + given: + def session = Mock(Session) + def executor = Mock(Executor) { getSession()>>session } + def processor = Mock(TaskProcessor) { getExecutor()>>executor; getSession()>>session } + and: + def config = new TaskConfig([container:'ubuntu']) + def task = new TaskRun(config: config, processor: processor) + when: + def info = task.containerInfo() + then: + info == new ContainerInfo('ubuntu','ubuntu','ubuntu') + } + } diff --git a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt index b72d5880ce..ef1380e7cf 100644 --- a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt +++ b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper-with-trace.txt @@ -1,5 +1,7 @@ #!/bin/bash -# NEXTFLOW TASK: Hello 2 +### --- +### name: 'Hello 2' +### ... 
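# The YAML-like `### ---` ... `### ...` block above replaces the former
# one-line `# NEXTFLOW TASK: <name>` marker, so task metadata can be recovered
# mechanically from a generated wrapper. A hedged one-liner (not part of this
# fixture) that extracts the header as plain YAML:
#
#   sed -n '/^### ---/,/^### \.\.\./p' .command.run | sed 's/^### //'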
 set -e
 set -u
 NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x
diff --git a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper.txt b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper.txt
index 5abcf4a511..f465b5b9b4 100644
--- a/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper.txt
+++ b/modules/nextflow/src/test/resources/nextflow/executor/test-bash-wrapper.txt
@@ -1,7 +1,9 @@
 #!/bin/bash
 #BSUB -x 1
 #BSUB -y 2
-# NEXTFLOW TASK: Hello 1
+### ---
+### name: 'Hello 1'
+### ...
 set -e
 set -u
 NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x
diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy
index a30ed2bb64..5ebca92283 100644
--- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy
+++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy
@@ -23,23 +23,25 @@ import com.amazonaws.services.batch.AWSBatch
 import com.amazonaws.services.batch.model.AWSBatchException
 import com.amazonaws.services.ecs.model.AccessDeniedException
 import com.amazonaws.services.logs.model.ResourceNotFoundException
-import nextflow.cloud.aws.nio.S3Path
 import groovy.transform.CompileDynamic
 import groovy.transform.CompileStatic
 import groovy.transform.PackageScope
 import groovy.util.logging.Slf4j
 import nextflow.cloud.aws.AwsClientFactory
 import nextflow.cloud.aws.config.AwsConfig
+import nextflow.cloud.aws.nio.S3Path
 import nextflow.cloud.types.CloudMachineInfo
 import nextflow.exception.AbortOperationException
 import nextflow.executor.Executor
-import nextflow.fusion.FusionHelper
+import nextflow.executor.TaskArrayExecutor
 import nextflow.extension.FilesEx
+import nextflow.fusion.FusionHelper
 import nextflow.processor.ParallelPollingMonitor
 import nextflow.processor.TaskHandler
 import nextflow.processor.TaskMonitor
 import nextflow.processor.TaskRun
 import nextflow.util.Duration
+import nextflow.util.Escape
 import nextflow.util.RateUnit
 import nextflow.util.ServiceName
 import nextflow.util.ThreadPoolHelper
@@ -54,7 +56,7 @@ import org.pf4j.ExtensionPoint
 @Slf4j
 @ServiceName('awsbatch')
 @CompileStatic
-class AwsBatchExecutor extends Executor implements ExtensionPoint {
+class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExecutor {

     /**
      * Proxy to throttle AWS batch client requests
@@ -82,6 +84,8 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {

     private AwsOptions awsOptions

+    private final Set<String> deletedJobs = new HashSet<>()
+
     AwsOptions getAwsOptions() { awsOptions }

     /**
@@ -265,6 +269,17 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
     @PackageScope
     ThrottlingExecutor getReaper() { reaper }

+    boolean shouldDeleteJob(String jobId) {
+        if( jobId in deletedJobs ) {
+            // if the job is already in the list it has already been deleted
+            return false
+        }
+        synchronized (deletedJobs) {
+            // add the job id to the set of deleted jobs; if it is a new id, the `add` method
+            // returns true and therefore the job should be deleted
+            return deletedJobs.add(jobId)
+        }
+    }

     CloudMachineInfo getMachineInfoByQueueAndTaskArn(String queue, String taskArn) {
         try {
@@ -308,13 +323,50 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint {
         ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg)
     }

-}
-
-
+    @Override
+    String getArrayIndexName() { 'AWS_BATCH_JOB_ARRAY_INDEX' }
+    @Override
+    int getArrayIndexStart()
{ 0 } + @Override + String getArrayTaskId(String jobId, int index) { + return "${jobId}:${index}" + } + @Override + String getArrayLaunchCommand(String taskDir) { + if( isFusionEnabled() || isWorkDirDefaultFS() ) + return TaskArrayExecutor.super.getArrayLaunchCommand(taskDir) + else + return Escape.cli(getLaunchCommand(taskDir) as String[]) + } + List getLaunchCommand(String s3WorkDir) { + // the cmd list to launch it + final opts = getAwsOptions() + final cmd = opts.s5cmdPath + ? s5Cmd(s3WorkDir, opts) + : s3Cmd(s3WorkDir, opts) + return ['bash','-o','pipefail','-c', cmd.toString()] + } + static String s3Cmd(String workDir, AwsOptions opts) { + final cli = opts.getAwsCli() + final debug = opts.debug ? ' --debug' : '' + final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' + final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' + final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}" + final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $aws ${workDir}/${TaskRun.CMD_RUN} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}" + return cmd + } + static String s5Cmd(String workDir, AwsOptions opts) { + final cli = opts.getS5cmdPath() + final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' + final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' + final cmd = "trap \"{ ret=\$?; $cli cp${sse}${kms} ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}||true; exit \$ret; }\" EXIT; $cli cat ${workDir}/${TaskRun.CMD_RUN} | bash 2>&1 | tee ${TaskRun.CMD_LOG}" + return cmd + } +} diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy index 3bb960a226..ff20ca8da3 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchTaskHandler.groovy @@ -24,6 +24,7 @@ import java.time.Instant import com.amazonaws.services.batch.AWSBatch import com.amazonaws.services.batch.model.AWSBatchException +import com.amazonaws.services.batch.model.ArrayProperties import com.amazonaws.services.batch.model.AssignPublicIp import com.amazonaws.services.batch.model.AttemptContainerDetail import com.amazonaws.services.batch.model.ClientException @@ -59,7 +60,6 @@ import groovy.transform.CompileStatic import groovy.transform.Memoized import groovy.util.logging.Slf4j import nextflow.BuildInfo -import nextflow.Const import nextflow.cloud.types.CloudMachineInfo import nextflow.container.ContainerNameValidator import nextflow.exception.ProcessSubmitException @@ -68,6 +68,7 @@ import nextflow.executor.BashWrapperBuilder import nextflow.fusion.FusionAwareTask import nextflow.processor.BatchContext import nextflow.processor.BatchHandler +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskHandler import nextflow.processor.TaskRun import nextflow.processor.TaskStatus @@ -283,7 +284,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler job=$jobId; work-dir=${task.getWorkDirStr()}" + updateStatus(resp.jobId, req.getJobQueue()) + log.debug "[AWS BATCH] Process `${task.lazyName()}` submitted > job=$jobId; work-dir=${task.getWorkDirStr()}" + } + + private void updateStatus(String jobId, String queueName) { + if( task instanceof TaskArrayRun ) { + // update status for children tasks + for( int i=0; i classicSubmitCli() { - // the cmd list to 
launch it - final opts = getAwsOptions() - final cmd = opts.s5cmdPath ? s5Cmd(opts) : s3Cmd(opts) - return ['bash','-o','pipefail','-c', cmd.toString()] - } - - protected String s3Cmd(AwsOptions opts) { - final cli = opts.getAwsCli() - final debug = opts.debug ? ' --debug' : '' - final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' - final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' - final aws = "$cli s3 cp --only-show-errors${sse}${kms}${debug}" - final cmd = "trap \"{ ret=\$?; $aws ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; $aws s3:/${getWrapperFile()} - | bash 2>&1 | tee ${TaskRun.CMD_LOG}" - return cmd - } - - protected String s5Cmd(AwsOptions opts) { - final cli = opts.getS5cmdPath() - final sse = opts.storageEncryption ? " --sse $opts.storageEncryption" : '' - final kms = opts.storageKmsKeyId ? " --sse-kms-key-id $opts.storageKmsKeyId" : '' - final cmd = "trap \"{ ret=\$?; $cli cp${sse}${kms} ${TaskRun.CMD_LOG} s3:/${getLogFile()}||true; exit \$ret; }\" EXIT; $cli cat s3:/${getWrapperFile()} | bash 2>&1 | tee ${TaskRun.CMD_LOG}" - return cmd + return executor.getLaunchCommand(task.getWorkDirStr()) } protected List getSubmitCommand() { @@ -776,6 +780,16 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler 10_000 ) + throw new IllegalArgumentException("Job arrays on AWS Batch may not have more than 10,000 tasks") + + result.setArrayProperties(new ArrayProperties().withSize(arraySize)) + } + return result } diff --git a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy index 2ba564e1d1..10cde9f3c4 100644 --- a/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy +++ b/plugins/nf-amazon/src/test/nextflow/cloud/aws/batch/AwsBatchTaskHandlerTest.groovy @@ -16,7 +16,6 @@ package nextflow.cloud.aws.batch -import java.nio.file.Paths import java.time.Instant import com.amazonaws.services.batch.AWSBatch @@ -34,7 +33,6 @@ import com.amazonaws.services.batch.model.RegisterJobDefinitionResult import com.amazonaws.services.batch.model.RetryStrategy import com.amazonaws.services.batch.model.SubmitJobRequest import com.amazonaws.services.batch.model.SubmitJobResult -import com.amazonaws.services.batch.model.TerminateJobRequest import nextflow.BuildInfo import nextflow.Session import nextflow.cloud.aws.config.AwsConfig @@ -184,14 +182,18 @@ class AwsBatchTaskHandlerTest extends Specification { def task = Mock(TaskRun) task.getName() >> 'batch-task' task.getConfig() >> new TaskConfig(memory: '2GB', cpus: 4, accelerator: 2) - - def handler = Spy(AwsBatchTaskHandler) + task.getWorkDirStr() >> 's3://my-bucket/work/dir' + and: + def executor = Spy(AwsBatchExecutor) { getAwsOptions()>> new AwsOptions() } + and: + def handler = Spy(new AwsBatchTaskHandler(executor: executor)) when: def req = handler.newSubmitRequest(task) then: handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'],region: 'eu-west-1')) } and: + _ * handler.getTask() >> task _ * handler.fusionEnabled() >> false 1 * handler.maxSpotAttempts() >> 0 1 * handler.getJobQueue(task) >> 'queue1' @@ -210,15 +212,21 @@ class AwsBatchTaskHandlerTest extends Specification { given: def task = Mock(TaskRun) - def handler = Spy(AwsBatchTaskHandler) + task.getWorkDirStr() >> 's3://my-bucket/work/dir' + and: + def executor = Spy(AwsBatchExecutor) { + getAwsOptions() >> { new 
AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'])) } + } + and: + def handler = Spy(new AwsBatchTaskHandler(executor: executor)) when: def req = handler.newSubmitRequest(task) then: task.getName() >> 'batch-task' task.getConfig() >> new TaskConfig() - handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'])) } and: + _ * handler.getTask() >> task _ * handler.fusionEnabled() >> false 1 * handler.maxSpotAttempts() >> 0 1 * handler.getJobQueue(task) >> 'queue1' @@ -234,8 +242,8 @@ class AwsBatchTaskHandlerTest extends Specification { then: task.getName() >> 'batch-task' task.getConfig() >> new TaskConfig(time: '5 sec') - handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'])) } and: + _ * handler.getTask() >> task _ * handler.fusionEnabled() >> false 1 * handler.maxSpotAttempts() >> 0 1 * handler.getJobQueue(task) >> 'queue2' @@ -253,8 +261,8 @@ class AwsBatchTaskHandlerTest extends Specification { then: task.getName() >> 'batch-task' task.getConfig() >> new TaskConfig(time: '1 hour') - handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch:[cliPath: '/bin/aws'])) } and: + _ * handler.getTask() >> task _ * handler.fusionEnabled() >> false 1 * handler.maxSpotAttempts() >> 0 1 * handler.getJobQueue(task) >> 'queue3' @@ -286,7 +294,7 @@ class AwsBatchTaskHandlerTest extends Specification { handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: '/bin/aws', retryMode: 'adaptive', maxTransferAttempts: 10])) } and: _ * handler.fusionEnabled() >> false - 1 * handler.getSubmitCommand() >> ['bash','-c','foo'] + 1 * handler.getSubmitCommand() >> ['bash', '-c', 'foo'] 1 * handler.maxSpotAttempts() >> 3 1 * handler.getJobQueue(task) >> 'queue1' 1 * handler.getJobDefinition(task) >> 'job-def:1' @@ -852,7 +860,6 @@ class AwsBatchTaskHandlerTest extends Specification { when: handler.submit() then: - 1 * handler.buildTaskWrapper() >> null 1 * handler.newSubmitRequest(task) >> req 1 * handler.bypassProxy(proxy) >> client 1 * client.submitJob(req) >> resp @@ -865,21 +872,35 @@ class AwsBatchTaskHandlerTest extends Specification { def 'should kill a job' () { given: - def JOB_ID = '54321' + def executor = Mock(AwsBatchExecutor) def task = Mock(TaskRun) def handler = Spy(AwsBatchTaskHandler) + handler.@executor = executor handler.task = task - handler.@jobId = JOB_ID - def req = Mock(TerminateJobRequest) - req.getJobId() >> JOB_ID - req.getReason() >> 'Job killed by NF' + when: + handler.@jobId = 'job1' + handler.kill() + then: + 1 * executor.shouldDeleteJob('job1') >> true + and: + 1 * handler.terminateJob('job1') >> null when: + handler.@jobId = 'job1:task2' handler.kill() then: - 1 * handler.terminateJob(req) >> null + 1 * executor.shouldDeleteJob('job1') >> true + and: + 1 * handler.terminateJob('job1') >> null + when: + handler.@jobId = 'job1:task2' + handler.kill() + then: + 1 * executor.shouldDeleteJob('job1') >> false + and: + 0 * handler.terminateJob('job1') >> null } def 'should create the trace record' () { @@ -914,62 +935,60 @@ class AwsBatchTaskHandlerTest extends Specification { def 'should render submit command' () { given: - def handler = Spy(AwsBatchTaskHandler) { + def executor = Spy(AwsBatchExecutor) + and: + def handler = Spy(new AwsBatchTaskHandler(executor: executor)) { fusionEnabled() >> false + getTask() >> Mock(TaskRun) { getWorkDirStr()>> 's3://work'} } when: def result = handler.getSubmitCommand() then: - handler.getAwsOptions() >> 
Mock(AwsOptions) { getAwsCli() >> 'aws' } - handler.getLogFile() >> Paths.get('/work/log') - handler.getWrapperFile() >> Paths.get('/work/run') + executor.getAwsOptions()>> Mock(AwsOptions) { getAwsCli() >> 'aws' } then: - result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://work/log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://work/run - | bash 2>&1 | tee .command.log' + result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://work/.command.run - | bash 2>&1 | tee .command.log' when: result = handler.getSubmitCommand() then: - handler.getAwsOptions() >> Mock(AwsOptions) { + executor.getAwsOptions() >> Mock(AwsOptions) { getAwsCli() >> 'aws'; getDebug() >> true getStorageEncryption() >> 'aws:kms' getStorageKmsKeyId() >> 'kms-key-123' } - handler.getLogFile() >> Paths.get('/work/log') - handler.getWrapperFile() >> Paths.get('/work/run') then: - result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug .command.log s3://work/log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug s3://work/run - | bash 2>&1 | tee .command.log' + result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug .command.log s3://work/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors --sse aws:kms --sse-kms-key-id kms-key-123 --debug s3://work/.command.run - | bash 2>&1 | tee .command.log' } def 'should render submit command with s5cmd' () { given: - def handler = Spy(AwsBatchTaskHandler) { + def executor = Spy(AwsBatchExecutor) + and: + def handler = Spy(new AwsBatchTaskHandler(executor: executor)) { fusionEnabled() >> false + getTask() >> Mock(TaskRun) { getWorkDirStr()>> 's3://work'} } when: def result = handler.getSubmitCommand() then: - handler.getAwsOptions() >> Mock(AwsOptions) { getS5cmdPath() >> 's5cmd' } - handler.getLogFile() >> Paths.get('/work/log') - handler.getWrapperFile() >> Paths.get('/work/run') + executor.getAwsOptions() >> Mock(AwsOptions) { getS5cmdPath() >> 's5cmd' } then: - result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd cp .command.log s3://work/log||true; exit $ret; }" EXIT; s5cmd cat s3://work/run | bash 2>&1 | tee .command.log' + result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd cp .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://work/.command.run | bash 2>&1 | tee .command.log' when: result = handler.getSubmitCommand() then: - handler.getAwsOptions() >> Mock(AwsOptions) { + executor.getAwsOptions() >> Mock(AwsOptions) { getS5cmdPath() >> 's5cmd --debug' getStorageEncryption() >> 'aws:kms' getStorageKmsKeyId() >> 'kms-key-123' } - handler.getLogFile() >> Paths.get('/work/log') - handler.getWrapperFile() >> Paths.get('/work/run') then: - result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd --debug cp --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/run | bash 2>&1 | tee .command.log' + result.join(' ') == 'bash -o pipefail -c trap "{ ret=$?; s5cmd --debug cp --sse aws:kms --sse-kms-key-id kms-key-123 .command.log s3://work/.command.log||true; exit $ret; }" EXIT; s5cmd --debug cat s3://work/.command.run | bash 2>&1 | tee .command.log' 
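// Both renderings asserted above share one shape: a shell `trap ... EXIT`
// uploads .command.log back to the S3 work dir even if the script fails (the
// `||true` keeps the trap from masking the real exit code), while the wrapper
// itself is streamed from S3 into bash and tee'd to the local log.
// Schematically, with the bucket path assumed:
//
//   trap "{ ret=$?; <copy .command.log to s3://work/.command.log> ||true; exit $ret; }" EXIT
//   <cat s3://work/.command.run> | bash 2>&1 | tee .command.log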
    }
@@ -987,7 +1006,7 @@ class AwsBatchTaskHandlerTest extends Specification {
         when:
         def req = handler.newSubmitRequest(task)
         then:
-        1 * handler.getSubmitCommand() >> ['sh','-c','hello']
+        1 * handler.getSubmitCommand() >> ['sh', '-c', 'hello']
         1 * handler.maxSpotAttempts() >> 5
         1 * handler.getAwsOptions() >> { new AwsOptions(awsConfig: new AwsConfig(batch: [cliPath: '/bin/aws'])) }
         1 * handler.getJobQueue(task) >> 'queue1'
@@ -1066,4 +1085,19 @@ class AwsBatchTaskHandlerTest extends Specification {
         16        | 100000     | 106496
         16        | 200000     | 122880
     }
+
+    @Unroll
+    def 'should normalise job id' () {
+        given:
+        def handler = Spy(AwsBatchTaskHandler)
+
+        expect:
+        handler.normaliseJobId(JOB_ID) == EXPECTED
+
+        where:
+        JOB_ID       | EXPECTED
+        null         | null
+        'job1'       | 'job1'
+        'job1:task2' | 'job1'
+    }
 }
diff --git a/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy b/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy
index af11ae542a..5dbef4439e 100644
--- a/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy
+++ b/plugins/nf-amazon/src/test/nextflow/executor/AwsBatchExecutorTest.groovy
@@ -7,11 +7,17 @@
+import java.nio.file.Path
+
 import nextflow.Session
 import nextflow.SysEnv
 import nextflow.cloud.aws.batch.AwsBatchExecutor
+import nextflow.cloud.aws.batch.AwsOptions
+import nextflow.cloud.aws.util.S3PathFactory
+import nextflow.processor.TaskHandler
+import nextflow.processor.TaskRun
 import spock.lang.Specification
-
+import spock.lang.Unroll
 /**
  *
  * @author Paolo Di Tommaso
@@ -43,4 +49,89 @@ class AwsBatchExecutorTest extends Specification {
     }

+    def 'should validate shouldDeleteJob method' () {
+        given:
+        def executor = Spy(AwsBatchExecutor)
+
+        expect:
+        executor.shouldDeleteJob('job-1')
+        executor.shouldDeleteJob('job-2')
+        executor.shouldDeleteJob('job-3')
+        and:
+        !executor.shouldDeleteJob('job-1')
+        !executor.shouldDeleteJob('job-1')
+        !executor.shouldDeleteJob('job-2')
+        !executor.shouldDeleteJob('job-2')
+        !executor.shouldDeleteJob('job-3')
+        !executor.shouldDeleteJob('job-3')
+    }
+
+    def 'should get array index variable and start' () {
+        given:
+        def executor = Spy(AwsBatchExecutor)
+        expect:
+        executor.getArrayIndexName() == 'AWS_BATCH_JOB_ARRAY_INDEX'
+        executor.getArrayIndexStart() == 0
+    }
+
+    @Unroll
+    def 'should get array task id' () {
+        given:
+        def executor = Spy(AwsBatchExecutor)
+        expect:
+        executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED
+
+        where:
+        JOB_ID | TASK_INDEX | EXPECTED
+        'foo'  | 1          | 'foo:1'
+        'bar'  | 2          | 'bar:2'
+    }
+
+    protected Path s3(String path) {
+        S3PathFactory.parse('s3:/' + path)
+    }
+
+    @Unroll
+    def 'should get array work dir' () {
+        given:
+        def executor = Spy(AwsBatchExecutor) {
+            isFusionEnabled()>>FUSION
+            isWorkDirDefaultFS()>>DEFAULT_FS
+        }
+        and:
+        def handler = Mock(TaskHandler) {
+            getTask() >> Mock(TaskRun) { getWorkDir() >> WORK_DIR }
+        }
+        expect:
+        executor.getArrayWorkDir(handler) == EXPECTED
+
+        where:
+        FUSION | DEFAULT_FS | WORK_DIR             | EXPECTED
+        false  | false      | s3('/foo/work/dir')  | 's3://foo/work/dir'
+        true   | false      | s3('/foo/work/dir')  | '/fusion/s3/foo/work/dir'
+        false  | true       | Path.of('/nfs/work') | '/nfs/work'
+    }
+
+    def 'should get array launch command' () {
+        given:
+        def executor = Spy(AwsBatchExecutor) {
+            isFusionEnabled()>>FUSION
+            isWorkDirDefaultFS()>>DEFAULT_FS
+            getAwsOptions() >> Mock(AwsOptions) {
+                getS5cmdPath() >> { S5CMD ?
's5cmd' : null } + getAwsCli() >> { 'aws' } + } + } + expect: + executor.getArrayLaunchCommand(TASK_DIR) == EXPECTED + + where: + FUSION | DEFAULT_FS | S5CMD | TASK_DIR | EXPECTED + false | false | false | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "{ ret=$?; aws s3 cp --only-show-errors .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; aws s3 cp --only-show-errors s3://foo/work/dir/.command.run - | bash 2>&1 | tee .command.log\'' + false | false | true | 's3://foo/work/dir' | 'bash -o pipefail -c \'trap "{ ret=$?; s5cmd cp .command.log s3://foo/work/dir/.command.log||true; exit $ret; }" EXIT; s5cmd cat s3://foo/work/dir/.command.run | bash 2>&1 | tee .command.log\'' + and: + true | false | false | '/fusion/work/dir' | 'bash /fusion/work/dir/.command.run' + false | true | false | '/nfs/work/dir' | 'bash /nfs/work/dir/.command.run 2>&1 > /nfs/work/dir/.command.log' + } + } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchExecutor.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchExecutor.groovy index 8ce631f00d..17a06ae563 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchExecutor.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchExecutor.groovy @@ -17,16 +17,20 @@ package nextflow.cloud.google.batch +import static nextflow.cloud.google.batch.GoogleBatchScriptLauncher.* + import java.nio.file.Path +import com.google.cloud.storage.contrib.nio.CloudStoragePath import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import nextflow.SysEnv -import nextflow.cloud.google.batch.client.BatchConfig import nextflow.cloud.google.batch.client.BatchClient +import nextflow.cloud.google.batch.client.BatchConfig import nextflow.cloud.google.batch.logging.BatchLogging import nextflow.exception.AbortOperationException import nextflow.executor.Executor +import nextflow.executor.TaskArrayExecutor import nextflow.extension.FilesEx import nextflow.fusion.FusionHelper import nextflow.processor.TaskHandler @@ -34,6 +38,7 @@ import nextflow.processor.TaskMonitor import nextflow.processor.TaskPollingMonitor import nextflow.processor.TaskRun import nextflow.util.Duration +import nextflow.util.Escape import nextflow.util.ServiceName import org.pf4j.ExtensionPoint /** @@ -44,13 +49,15 @@ import org.pf4j.ExtensionPoint @Slf4j @ServiceName(value='google-batch') @CompileStatic -class GoogleBatchExecutor extends Executor implements ExtensionPoint { +class GoogleBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExecutor { private BatchClient client private BatchConfig config private Path remoteBinDir private BatchLogging logging + private final Set deletedJobs = new HashSet<>() + BatchClient getClient() { return client } BatchConfig getConfig() { return config } Path getRemoteBinDir() { return remoteBinDir } @@ -129,4 +136,50 @@ class GoogleBatchExecutor extends Executor implements ExtensionPoint { boolean isCloudinfoEnabled() { return Boolean.parseBoolean(SysEnv.get('NXF_CLOUDINFO_ENABLED', 'true') ) } + + boolean shouldDeleteJob(String jobId) { + if( jobId in deletedJobs ) { + // if the job is already in the list it has been already deleted + return false + } + synchronized (deletedJobs) { + // add the job id to the set of deleted jobs, if it's a new id, the `add` method + // returns true therefore the job should be deleted + return deletedJobs.add(jobId) + } + } + + @Override + String getArrayIndexName() { + return 'BATCH_TASK_INDEX' + } + + @Override 
+ int getArrayIndexStart() { + return 0 + } + + @Override + String getArrayTaskId(String jobId, int index) { + return index.toString() + } + + @Override + String getArrayWorkDir(TaskHandler handler) { + return isFusionEnabled() || isWorkDirDefaultFS() + ? TaskArrayExecutor.super.getArrayWorkDir(handler) + : containerMountPath(handler.task.workDir as CloudStoragePath) + } + + @Override + String getArrayLaunchCommand(String taskDir) { + if( isFusionEnabled() || isWorkDirDefaultFS() ) { + return TaskArrayExecutor.super.getArrayLaunchCommand(taskDir) + } + else { + final cmd = List.of('/bin/bash','-o','pipefail','-c', launchCommand(taskDir)) + return Escape.cli(cmd as String[]) + } + } + } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy index 87c75c157f..c54b92f615 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchScriptLauncher.groovy @@ -64,6 +64,12 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc bean.workDir = toContainerMount(bean.workDir) bean.targetDir = toContainerMount(bean.targetDir) + // add all children work dir + if( bean.arrayWorkDirs ) { + for( Path it : bean.arrayWorkDirs ) + toContainerMount(it) + } + // remap input files to container mounted paths for( Map.Entry entry : new HashMap<>(bean.inputFiles).entrySet() ) { bean.inputFiles.put( entry.key, toContainerMount(entry.value, true) ) @@ -102,7 +108,7 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc if( path instanceof CloudStoragePath ) { buckets.add(path.bucket()) pathTrie.add( (parent ? 
"/${path.bucket()}${path.parent}" : "/${path.bucket()}${path}").toString() ) - final containerMount = "$MOUNT_ROOT/${path.bucket()}${path}" + final containerMount = containerMountPath(path) log.trace "Path ${FilesEx.toUriString(path)} to container mount: $containerMount" return Paths.get(containerMount) } @@ -113,7 +119,7 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc @Override String runCommand() { - "trap \"{ cp ${TaskRun.CMD_LOG} ${workDirMount}/${TaskRun.CMD_LOG}; }\" ERR; /bin/bash ${workDirMount}/${TaskRun.CMD_RUN} 2>&1 | tee ${TaskRun.CMD_LOG}" + launchCommand(workDirMount) } @Override @@ -171,4 +177,11 @@ class GoogleBatchScriptLauncher extends BashWrapperBuilder implements GoogleBatc return this } + static String launchCommand( String workDir ) { + "trap \"{ cp ${TaskRun.CMD_LOG} ${workDir}/${TaskRun.CMD_LOG}; }\" ERR; /bin/bash ${workDir}/${TaskRun.CMD_RUN} 2>&1 | tee ${TaskRun.CMD_LOG}" + } + + static String containerMountPath(CloudStoragePath path) { + return "$MOUNT_ROOT/${path.bucket()}${path}" + } } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy index 2782a89752..fdaee0a74f 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/GoogleBatchTaskHandler.groovy @@ -42,6 +42,7 @@ import nextflow.executor.BashWrapperBuilder import nextflow.executor.res.DiskResource import nextflow.fusion.FusionAwareTask import nextflow.fusion.FusionScriptLauncher +import nextflow.processor.TaskArrayRun import nextflow.processor.TaskConfig import nextflow.processor.TaskHandler import nextflow.processor.TaskProcessor @@ -67,20 +68,27 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { private BatchClient client + private BashWrapperBuilder launcher + /** * Job Id assigned by Nextflow */ private String jobId + /** + * Task id assigned by Google Batch service + */ + private String taskId + /** * Job unique id assigned by Google Batch service */ private String uid /** - * Job state assigned by Google Batch service + * Task state assigned by Google Batch service */ - private String jobState + private String taskState private volatile CloudMachineInfo machineInfo @@ -143,24 +151,41 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { } @Override - void submit() { - /* - * create the task runner script - */ - final launcher = createTaskWrapper() + void prepareLauncher() { + launcher = createTaskWrapper() launcher.build() + } + @Override + void submit() { /* * create submit request */ final req = newSubmitRequest(task, spec0(launcher)) log.trace "[GOOGLE BATCH] new job request > $req" final resp = client.submitJob(jobId, req) - this.uid = resp.getUid() - this.status = TaskStatus.SUBMITTED + final uid = resp.getUid() + updateStatus(jobId, '0', uid) log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` submitted > job=$jobId; uid=$uid; work-dir=${task.getWorkDirStr()}" } + private void updateStatus(String jobId, String taskId, String uid) { + if( task instanceof TaskArrayRun ) { + // update status for children + for( int i=0; i= 1_000) { - final status = client.getJobStatus(jobId) + if( !taskState || delta >= 1_000) { + final status = client.getTaskStatus(jobId, taskId) final newState = status?.state as String if( newState ) { - log.trace "[GOOGLE BATCH] Get job=$jobId 
state=$newState" - jobState = newState + log.trace "[GOOGLE BATCH] Get job=$jobId task=$taskId state=$newState" + taskState = newState timestamp = now } - if( newState == 'SCHEDULED' ) { + if( newState == 'PENDING' ) { final eventsCount = status.getStatusEventsCount() final lastEvent = eventsCount > 0 ? status.getStatusEvents(eventsCount - 1) : null if( lastEvent?.getDescription()?.contains('CODE_GCE_QUOTA_EXCEEDED') ) log.warn1 "Batch job cannot be run: ${lastEvent.getDescription()}" } } - return jobState + return taskState } - static private List RUNNING_AND_TERMINATED = ['RUNNING', 'SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS'] + static private final List RUNNING_OR_COMPLETED = ['RUNNING', 'SUCCEEDED', 'FAILED'] - static private List TERMINATED = ['SUCCEEDED', 'FAILED', 'DELETION_IN_PROGRESS'] + static private final List COMPLETED = ['SUCCEEDED', 'FAILED'] @Override boolean checkIfRunning() { if(isSubmitted()) { // include `terminated` state to allow the handler status to progress - if (getJobState() in RUNNING_AND_TERMINATED) { + if( getTaskState() in RUNNING_OR_COMPLETED ) { status = TaskStatus.RUNNING return true } @@ -441,16 +476,16 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { @Override boolean checkIfCompleted() { - final state = getJobState() - if( state in TERMINATED ) { - log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - terminated job=$jobId; state=$state" + final state = getTaskState() + if( state in COMPLETED ) { + log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - terminated job=$jobId; task=$taskId; state=$state" // finalize the task task.exitStatus = getJobExitCode() if( task.exitStatus == null ) task.exitStatus = readExitFile() if( state == 'FAILED' ) { - task.stdout = executor.logging.stdout(uid) ?: outputFile - task.stderr = executor.logging.stderr(uid) ?: errorFile + task.stdout = executor.logging.stdout(uid, taskId) ?: outputFile + task.stderr = executor.logging.stderr(uid, taskId) ?: errorFile } else { task.stdout = outputFile @@ -465,7 +500,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { protected Integer getJobExitCode() { try { - final status = client.getJobStatus(jobId) + final status = client.getTaskStatus(jobId, taskId) final eventsCount = status.getStatusEventsCount() final lastEvent = eventsCount > 0 ? 
status.getStatusEvents(eventsCount - 1) : null log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - last event: ${lastEvent}" @@ -496,7 +531,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { void kill() { if( isActive() ) { log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId" - client.deleteJob(jobId) + if( executor.shouldDeleteJob(jobId) ) + client.deleteJob(jobId) } else { log.debug "[GOOGLE BATCH] Process `${task.lazyName()}` - invalid delete action" @@ -510,9 +546,8 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask { @Override TraceRecord getTraceRecord() { def result = super.getTraceRecord() - if( jobId && uid ) { - result.put('native_id', "$jobId/$uid") - } + if( jobId && uid ) + result.put('native_id', "$jobId/$taskId/$uid") result.machineInfo = getMachineInfo() return result } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy index 65cc354e82..2d9137a5f2 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/client/BatchClient.groovy @@ -28,9 +28,11 @@ import com.google.cloud.batch.v1.BatchServiceClient import com.google.cloud.batch.v1.BatchServiceSettings import com.google.cloud.batch.v1.Job import com.google.cloud.batch.v1.JobName -import com.google.cloud.batch.v1.JobStatus import com.google.cloud.batch.v1.LocationName +import com.google.cloud.batch.v1.Task import com.google.cloud.batch.v1.TaskGroupName +import com.google.cloud.batch.v1.TaskName +import com.google.cloud.batch.v1.TaskStatus import dev.failsafe.Failsafe import dev.failsafe.RetryPolicy import dev.failsafe.event.EventListener @@ -101,23 +103,27 @@ class BatchClient { return apply(()-> batchServiceClient.getJob(name)) } - Iterable listTasks(String jobId) { + Iterable listTasks(String jobId) { final parent = TaskGroupName.of(projectId, location, jobId, 'group0') return apply(()-> batchServiceClient.listTasks(parent).iterateAll()) } + Task describeTask(String jobId, String taskId) { + final name = TaskName.of(projectId, location, jobId, 'group0', taskId) + return batchServiceClient.getTask(name) + } + void deleteJob(String jobId) { final name = JobName.of(projectId, location, jobId).toString() apply(()-> batchServiceClient.deleteJobAsync(name)) } - JobStatus getJobStatus(String jobId) { - final job = describeJob(jobId) - return job.getStatus() + TaskStatus getTaskStatus(String jobId, String taskId) { + return describeTask(jobId, taskId).getStatus() } - String getJobState(String jobId) { - final status = getJobStatus(jobId) + String getTaskState(String jobId, String taskId) { + final status = getTaskStatus(jobId, taskId) return status ? 
status.getState().toString() : null } diff --git a/plugins/nf-google/src/main/nextflow/cloud/google/batch/logging/BatchLogging.groovy b/plugins/nf-google/src/main/nextflow/cloud/google/batch/logging/BatchLogging.groovy index da394dd39d..04091f8b51 100644 --- a/plugins/nf-google/src/main/nextflow/cloud/google/batch/logging/BatchLogging.groovy +++ b/plugins/nf-google/src/main/nextflow/cloud/google/batch/logging/BatchLogging.groovy @@ -47,30 +47,29 @@ class BatchLogging implements Closeable { this.opts = LoggingOptions .newBuilder() .setCredentials(creds) .setProjectId(this.projectId) .build() } - String stdout(String jobId) { - return safeLogs(jobId,0) + String stdout(String uid, String taskId) { + return safeLogs(uid, taskId, 0) } - String stderr(String jobId) { - return safeLogs(jobId,1) + String stderr(String uid, String taskId) { + return safeLogs(uid, taskId, 1) } - protected String safeLogs(String jobId, int index) { + protected String safeLogs(String uid, String taskId, int index) { try { - return fetchLogs(jobId)[index] + return fetchLogs(uid, taskId)[index] } catch (Exception e) { - log.warn("Cannot read logs for Batch job '$jobId' - cause: ${e.message}", e) + log.warn("Cannot read logs for Batch job '$uid/$taskId' - cause: ${e.message}", e) return null } } @Memoized(maxCacheSize = 1000) - @PackageScope List fetchLogs(String uid) { + @PackageScope List fetchLogs(String uid, String taskId) { final stdout = new StringBuilder() final stderr = new StringBuilder() - // use logging here - final filter = "resource.type=generic_task OR resource.type=\"batch.googleapis.com/Job\" AND logName=\"projects/${projectId}/logs/batch_task_logs\" AND labels.job_uid=$uid" + final filter = "resource.type=generic_task OR resource.type=\"batch.googleapis.com/Job\" AND logName=\"projects/${projectId}/logs/batch_task_logs\" AND labels.job_uid=$uid AND labels.task_id=$uid-group0-$taskId" final entries = loggingService().listLogEntries( Logging.EntryListOption.filter(filter), Logging.EntryListOption.pageSize(1000) ) diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchExecutorTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchExecutorTest.groovy index f1ab831680..ed05765ff8 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchExecutorTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchExecutorTest.groovy @@ -7,11 +7,15 @@ package nextflow.cloud.google.batch +import java.nio.file.Path + +import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem import nextflow.Session import nextflow.SysEnv +import nextflow.processor.TaskHandler +import nextflow.processor.TaskRun import spock.lang.Specification import spock.lang.Unroll - /** * * @author Paolo Di Tommaso @@ -62,4 +66,85 @@ class GoogleBatchExecutorTest extends Specification { [NXF_CLOUDINFO_ENABLED:'true'] | true [NXF_CLOUDINFO_ENABLED:'false'] | false } + + def 'should get array index variable and start' () { + given: + def executor = Spy(GoogleBatchExecutor) + expect: + executor.getArrayIndexName() == 'BATCH_TASK_INDEX' + executor.getArrayIndexStart() == 0 + } + + @Unroll + def 'should get array task id' () { + given: + def executor = Spy(GoogleBatchExecutor) + expect: + executor.getArrayTaskId(JOB_ID, TASK_INDEX) == EXPECTED + + where: + JOB_ID | TASK_INDEX | EXPECTED + 'foo' | 1 | '1' + 'bar' | 2 | '2' + } + + protected Path gs(String str) { + def b = str.tokenize('/')[0] + def p = str.tokenize('/')[1..-1].join('/') + 
+
+    @Unroll
+    def 'should get array work dir' () {
+        given:
+        def executor = Spy(GoogleBatchExecutor) {
+            isFusionEnabled()>>FUSION
+            isWorkDirDefaultFS()>>DEFAULT_FS
+        }
+        and:
+        def handler = Mock(TaskHandler) {
+            getTask() >> Mock(TaskRun) { getWorkDir() >> WORK_DIR }
+        }
+        expect:
+        executor.getArrayWorkDir(handler) == EXPECTED
+
+        where:
+        FUSION | DEFAULT_FS | WORK_DIR             | EXPECTED
+        false  | false      | gs('/foo/work/dir')  | '/mnt/disks/foo/work/dir'
+        true   | false      | gs('/foo/work/dir')  | '/fusion/gs/foo/work/dir'
+        false  | true       | Path.of('/nfs/work') | '/nfs/work'
+    }
+
+    def 'should get array launch command' () {
+        given:
+        def executor = Spy(GoogleBatchExecutor) {
+            isFusionEnabled()>>FUSION
+            isWorkDirDefaultFS()>>DEFAULT_FS
+        }
+        expect:
+        executor.getArrayLaunchCommand(TASK_DIR) == EXPECTED
+
+        where:
+        FUSION | DEFAULT_FS | TASK_DIR            | EXPECTED
+        false  | false      | 'gs://foo/work/dir' | '/bin/bash -o pipefail -c \'trap "{ cp .command.log gs://foo/work/dir/.command.log; }" ERR; /bin/bash gs://foo/work/dir/.command.run 2>&1 | tee .command.log\''
+        true   | false      | '/fusion/work/dir'  | 'bash /fusion/work/dir/.command.run'
+        false  | true       | '/nfs/work/dir'     | 'bash /nfs/work/dir/.command.run 2>&1 > /nfs/work/dir/.command.log'
+    }
+
+    def 'should validate shouldDeleteJob method' () {
+        given:
+        def executor = Spy(GoogleBatchExecutor)
+
+        expect:
+        executor.shouldDeleteJob('job-1')
+        executor.shouldDeleteJob('job-2')
+        executor.shouldDeleteJob('job-3')
+        and:
+        !executor.shouldDeleteJob('job-1')
+        !executor.shouldDeleteJob('job-1')
+        !executor.shouldDeleteJob('job-2')
+        !executor.shouldDeleteJob('job-2')
+        !executor.shouldDeleteJob('job-3')
+        !executor.shouldDeleteJob('job-3')
+    }
 }
diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
index 0f9b4ca851..005a91d4ca 100644
--- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
+++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/GoogleBatchTaskHandlerTest.groovy
@@ -20,8 +20,8 @@ package nextflow.cloud.google.batch
 import java.nio.file.Path
 
 import com.google.cloud.batch.v1.GCS
-import com.google.cloud.batch.v1.JobStatus
 import com.google.cloud.batch.v1.StatusEvent
+import com.google.cloud.batch.v1.TaskStatus
 import com.google.cloud.batch.v1.Volume
 import com.google.cloud.storage.contrib.nio.CloudStorageFileSystem
 import nextflow.Session
@@ -350,6 +350,7 @@
         def handler = Spy(GoogleBatchTaskHandler)
         handler.task = task
         handler.@jobId = 'xyz-123'
+        handler.@taskId = '0'
         handler.@uid = '789'
 
         when:
@@ -357,7 +358,7 @@
         then:
         handler.isCompleted() >> false
         and:
-        trace.native_id == 'xyz-123/789'
+        trace.native_id == 'xyz-123/0/789'
         trace.executorName == 'google-batch'
     }
 
@@ -454,8 +455,8 @@
     }
 
-    JobStatus makeJobStatus(String desc) {
-        JobStatus.newBuilder()
+    TaskStatus makeTaskStatus(String desc) {
+        TaskStatus.newBuilder()
             .addStatusEvents(
                 StatusEvent.newBuilder()
                     .setDescription(desc)
@@ -466,16 +467,17 @@
     def 'should detect spot failures from status event'() {
         given:
         def jobId = 'job-id'
+        def taskId = 'task-id'
         def client = Mock(BatchClient)
         def task = Mock(TaskRun) {
             lazyName() >> 'foo (1)'
} - def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, client: client, task: task)) + def handler = Spy(new GoogleBatchTaskHandler(jobId: jobId, taskId: taskId, client: client, task: task)) when: - client.getJobStatus(jobId) >>> [ - makeJobStatus('Task failed due to Spot VM preemption with exit code 50001.'), - makeJobStatus('Task succeeded') + client.getTaskStatus(jobId, taskId) >>> [ + makeTaskStatus('Task failed due to Spot VM preemption with exit code 50001.'), + makeTaskStatus('Task succeeded') ] then: handler.getJobExitCode() == 50001 @@ -537,4 +539,42 @@ class GoogleBatchTaskHandlerTest extends Specification { cleanup: SysEnv.pop() } + + def 'should kill a job' () { + given: + def client = Mock(BatchClient) + def executor = Mock(GoogleBatchExecutor) + def task = Mock(TaskRun) + def handler = Spy(GoogleBatchTaskHandler) + handler.@executor = executor + handler.@client = client + handler.task = task + + when: + handler.@jobId = 'job1' + handler.kill() + then: + handler.isActive() >> false + 0 * executor.shouldDeleteJob('job1') >> true + and: + 0 * client.deleteJob('job1') >> null + + when: + handler.@jobId = 'job1' + handler.kill() + then: + handler.isActive() >> true + 1 * executor.shouldDeleteJob('job1') >> true + and: + 1 * client.deleteJob('job1') >> null + + when: + handler.@jobId = 'job1' + handler.kill() + then: + handler.isActive() >> true + 1 * executor.shouldDeleteJob('job1') >> false + and: + 0 * client.deleteJob('job1') >> null + } } diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/batch/logging/BatchLoggingTest.groovy b/plugins/nf-google/src/test/nextflow/cloud/google/batch/logging/BatchLoggingTest.groovy index de496f0e90..625a5336a0 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/batch/logging/BatchLoggingTest.groovy +++ b/plugins/nf-google/src/test/nextflow/cloud/google/batch/logging/BatchLoggingTest.groovy @@ -130,16 +130,19 @@ class BatchLoggingTest extends Specification { when: def state=null do { - state = batchClient.getJobState(jobId) - log.debug "Test job state=$state" + if( batchClient.listTasks(jobId).iterator().hasNext() ) + state = batchClient.getTaskState(jobId, '0') + else + state = 'PENDING' + log.debug "Test task state=$state" sleep 10_000 } while( state !in ['SUCCEEDED', 'FAILED'] ) then: state in ['SUCCEEDED', 'FAILED'] when: - def stdout = logClient.stdout(uid) - def stderr = logClient.stderr(uid) + def stdout = logClient.stdout(uid, '0') + def stderr = logClient.stderr(uid, '0') log.debug "STDOUT: $stdout" log.debug "STDERR: $stderr" then: diff --git a/plugins/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt b/plugins/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt index ab322a6ef6..70a68452aa 100644 --- a/plugins/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt +++ b/plugins/nf-google/src/test/nextflow/cloud/google/lifesciences/bash-wrapper-gcp.txt @@ -1,5 +1,7 @@ #!/bin/bash -# NEXTFLOW TASK: Hello 1 +### --- +### name: 'Hello 1' +### ... 
 set -e
 set -u
 NXF_DEBUG=${NXF_DEBUG:=0}; [[ $NXF_DEBUG > 1 ]] && set -x
diff --git a/validation/awsbatch.sh b/validation/awsbatch.sh
index 6b0298756b..d58727e7e8 100644
--- a/validation/awsbatch.sh
+++ b/validation/awsbatch.sh
@@ -60,3 +60,17 @@ $NXF_CMD run nextflow-io/rnaseq-nf \
     -profile batch \
     -plugins nf-cloudcache,nf-wave \
     -c awsfargate.config
+
+## Test use of job arrays
+NXF_CLOUDCACHE_PATH=s3://nextflow-ci/cache \
+$NXF_CMD run nextflow-io/hello \
+    -process.array 10 \
+    -plugins nf-cloudcache \
+    -c awsbatch.config
+
+## Test use of job arrays with Fusion
+$NXF_CMD run nextflow-io/hello \
+    -process.array 10 \
+    -with-wave \
+    -with-fusion \
+    -c awsbatch.config
\ No newline at end of file
diff --git a/validation/google.sh b/validation/google.sh
index d97fedd4c1..8dcda8ab99 100644
--- a/validation/google.sh
+++ b/validation/google.sh
@@ -72,3 +72,16 @@ $NXF_CMD -C ./google.config \
     -resume
 [[ `grep -c 'Using Nextflow cache factory: nextflow.cache.CloudCacheFactory' .nextflow.log` == 1 ]] || false
 [[ `grep -c 'Cached process > ' .nextflow.log` == 4 ]] || false
+
+## Test job arrays with Fusion
+$NXF_CMD -C ./google.config \
+    run nextflow-io/hello \
+    -process.array 10 \
+    -with-wave \
+    -with-fusion
+
+## Test job arrays
+$NXF_CMD -C ./google.config \
+    run nextflow-io/hello \
+    -process.array 10
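The validation runs above enable job arrays with the `-process.array 10` command line option. The same setting can be expressed in a configuration file; a minimal sketch, assuming a Google Batch setup (the project and location values are placeholders):

```groovy
// nextflow.config -- illustrative equivalent of passing `-process.array 10`
process {
    executor = 'google-batch'
    array    = 10               // group tasks into job arrays of up to 10
}

google {
    project  = 'my-project'     // placeholder project id
    location = 'us-central1'    // placeholder region
}
```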