Add support for Job arrays (#3892)
A job array is a capability provided by some batch schedulers that allows multiple copies of the same job to be spawned in an efficient manner.

Nextflow exposes this capability through the process directive `array <value>`, which determines the (maximum) number of jobs in the array. For example:

```groovy
process foo {
  array 10
  '''
  your_task
  '''
}
```

or in the Nextflow config file:


```groovy
process.array = 10
```

Currently this feature is supported by the following executors:

* Slurm
* SGE
* PBS
* PBS Pro
* LSF
* AWS Batch
* Google Batch
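As with any process directive, `array` can also be scoped to individual processes in the configuration file using a process selector; a minimal sketch (the process name `foo` is hypothetical):

```groovy
// nextflow.config -- enable job arrays only for the process named 'foo'
process {
    withName: foo {
        array = 100
    }
}
```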


Signed-off-by: Ben Sherman <bentshermann@gmail.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Signed-off-by: Mahesh Binzer-Panchal <mahesh.binzer-panchal@nbis.se>
Signed-off-by: Herman Singh <herman@massmatrix.bio>
Signed-off-by: Dr Marco Claudio De La Pierre <marco.delapierre@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Abhinav Sharma <abhi18av@users.noreply.github.com>
Co-authored-by: Mahesh Binzer-Panchal <mahesh.binzer-panchal@nbis.se>
Co-authored-by: Herman Singh <kartstig@gmail.com>
Co-authored-by: Dr Marco Claudio De La Pierre <marco.delapierre@gmail.com>
6 people committed May 9, 2024
1 parent 8a9c858 commit f760542
Showing 57 changed files with 2,068 additions and 280 deletions.
1 change: 1 addition & 0 deletions .github/workflows/build.yml
@@ -111,6 +111,7 @@ jobs:
if: ${{ !contains(github.event.head_commit.message, '[ci fast]') && needs.build.outputs.any_changed == 'true' }}
needs: build
runs-on: ubuntu-latest
timeout-minutes: 90
strategy:
fail-fast: false
matrix:
59 changes: 59 additions & 0 deletions docs/process.md
@@ -1331,6 +1331,65 @@ Allowed values for the `arch` directive are as follows, grouped by equivalent fa

Examples of values for the architecture `target` option are `cascadelake`, `icelake`, `zen2` and `zen3`. See the Spack documentation for the full and up-to-date [list of meaningful targets](https://spack.readthedocs.io/en/latest/basic_usage.html#support-for-specific-microarchitectures).

(process-array)=

## array

:::{versionadded} 24.04.0
:::

:::{warning} *Experimental: may change in a future release.*
:::

The `array` directive allows you to submit tasks as *job arrays* for executors that support it.

A job array is a collection of jobs with the same resource requirements and the same script (parameterized by an index). Job arrays incur significantly less scheduling overhead compared to individual jobs, and as a result they are preferred by HPC schedulers where possible.

The directive should be specified with a given array size, along with an executor that supports job arrays. For example:

```groovy
process cpu_task {
executor 'slurm'
array 100
'''
your_command --here
'''
}
```

Nextflow currently supports job arrays for the following executors:

- {ref}`awsbatch-executor`
- {ref}`google-batch-executor`
- {ref}`lsf-executor`
- {ref}`pbs-executor`
- {ref}`pbspro-executor`
- {ref}`sge-executor`
- {ref}`slurm-executor`

A process using job arrays will collect tasks and submit each batch as a job array when it is ready. Any "leftover" tasks will be submitted as a partial job array.

Once a job array is submitted, each "child" task is executed as an independent job. Any tasks that fail (and can be retried) will be retried without interfering with the tasks that succeeded. Retried tasks are submitted individually rather than through a job array, in order to allow for the use of [dynamic resources](#dynamic-computing-resources).
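For instance, combining job arrays with a retry strategy and dynamic resources might look like the following sketch (the process name and command are hypothetical): the initial tasks are submitted as arrays of 50, while any retried task is resubmitted individually with increased memory:

```groovy
process align {
    executor 'slurm'
    array 50
    errorStrategy 'retry'
    maxRetries 2
    // uniform at first attempt; the increased value only applies
    // to retried tasks, which are submitted individually
    memory { 4.GB * task.attempt }

    '''
    your_command --here
    '''
}
```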

The following directives must be uniform across all tasks in a process that uses job arrays, because these directives are specified once for the entire job array:

- {ref}`process-accelerator`
- {ref}`process-clusterOptions`
- {ref}`process-cpus`
- {ref}`process-disk`
- {ref}`process-machineType`
- {ref}`process-memory`
- {ref}`process-queue`
- {ref}`process-resourcelabels`
- {ref}`process-resourcelimits`
- {ref}`process-time`

For cloud-based executors like AWS Batch, or when using Fusion with any executor, the following additional directives must be uniform:

- {ref}`process-container`
- {ref}`process-containerOptions`

(process-beforescript)=

### beforeScript
@@ -31,6 +31,7 @@ import nextflow.container.ContainerBuilder
import nextflow.container.DockerBuilder
import nextflow.container.SingularityBuilder
import nextflow.exception.ProcessException
import nextflow.extension.FilesEx
import nextflow.file.FileHelper
import nextflow.processor.TaskBean
import nextflow.processor.TaskProcessor
@@ -307,6 +308,7 @@ class BashWrapperBuilder {

final binding = new HashMap<String,String>(20)
binding.header_script = headerScript
binding.task_metadata = getTaskMetadata()
binding.task_name = name
binding.helpers_script = getHelpersScript()

@@ -461,6 +463,31 @@
}
}

protected String getTaskMetadata() {
final lines = new StringBuilder()
lines << '### ---\n'
lines << "### name: '${bean.name}'\n"
if( bean.arrayIndexName ) {
lines << '### array:\n'
lines << "### index-name: ${bean.arrayIndexName}\n"
lines << "### index-start: ${bean.arrayIndexStart}\n"
lines << "### work-dirs:\n"
for( Path it : bean.arrayWorkDirs )
lines << "### - ${Escape.path(FilesEx.toUriString(it))}\n"
}

if( containerConfig?.isEnabled() )
lines << "### container: '${bean.containerImage}'\n"

if( outputFiles.size() > 0 ) {
lines << '### outputs:\n'
for( final output : bean.outputFiles )
lines << "### - '${output}'\n"
}

lines << '### ...\n'
}

protected String getHelpersScript() {
def result = new StringBuilder()

@@ -18,6 +18,7 @@ package nextflow.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskRun
/**
* An executor specialised for CRG cluster
@@ -41,9 +42,18 @@ class CrgExecutor extends SgeExecutor {
task.config.penv = 'smp'
}

if( task instanceof TaskArrayRun ) {
final arraySize = task.getArraySize()
result << '-t' << "1-${arraySize}".toString()
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'y'

if( task !instanceof TaskArrayRun ) {
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'y'
}

result << '-terse' << '' // note: directive need to be returned as pairs

/*
@@ -81,7 +81,7 @@ abstract class Executor {
*
* @param task A {@code TaskRun} instance
*/
final void submit( TaskRun task ) {
void submit( TaskRun task ) {
log.trace "Scheduling process: ${task}"

if( session.isTerminated() ) {
@@ -38,6 +38,7 @@ import nextflow.exception.ProcessNonZeroExitStatusException
import nextflow.file.FileHelper
import nextflow.fusion.FusionAwareTask
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskHandler
import nextflow.processor.TaskRun
import nextflow.trace.TraceRecord
@@ -100,6 +101,12 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
this.sanityCheckInterval = duration
}

@Override
void prepareLauncher() {
// -- create the wrapper script
createTaskWrapper(task).build()
}

protected ProcessBuilder createProcessBuilder() {

// -- log the qsub command
@@ -254,17 +261,15 @@
void submit() {
ProcessBuilder builder = null
try {
// -- create the wrapper script
createTaskWrapper(task).build()
// -- start the execution and notify the event to the monitor
builder = createProcessBuilder()
// -- forward the job launcher script to the command stdin if required
final stdinScript = executor.pipeLauncherScript() ? stdinLauncherScript() : null
// -- execute with a re-triable strategy
final result = safeExecute( () -> processStart(builder, stdinScript) )
// -- save the JobId in the
this.jobId = executor.parseJobId(result)
this.status = SUBMITTED
// -- save the job id
final jobId = (String)executor.parseJobId(result)
updateStatus(jobId)
log.debug "[${executor.name.toUpperCase()}] submitted process ${task.name} > jobId: $jobId; workDir: ${task.workDir}"

}
@@ -281,9 +286,21 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
status = COMPLETED
throw new ProcessFailedException("Error submitting process '${task.name}' for execution", e )
}

}

private void updateStatus(String jobId) {
if( task instanceof TaskArrayRun ) {
for( int i=0; i<task.children.size(); i++ ) {
final handler = task.children[i] as GridTaskHandler
final arrayTaskId = ((TaskArrayExecutor)executor).getArrayTaskId(jobId, i)
handler.updateStatus(arrayTaskId)
}
}
else {
this.jobId = jobId
this.status = SUBMITTED
}
}

private long startedMillis

@@ -23,6 +23,7 @@ import java.util.regex.Pattern
import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.fusion.FusionHelper
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskRun
/**
* Processor for LSF resource manager
@@ -36,7 +37,7 @@ import nextflow.processor.TaskRun
*/
@Slf4j
@CompileStatic
class LsfExecutor extends AbstractGridExecutor {
class LsfExecutor extends AbstractGridExecutor implements TaskArrayExecutor {

static private Pattern KEY_REGEX = ~/^[A-Z_0-9]+=.*/

@@ -68,7 +69,9 @@ class LsfExecutor extends AbstractGridExecutor {
*/
protected List<String> getDirectives(TaskRun task, List<String> result) {

result << '-o' << task.workDir.resolve(TaskRun.CMD_LOG).toString()
if( task !instanceof TaskArrayRun ) {
result << '-o' << task.workDir.resolve(TaskRun.CMD_LOG).toString()
}

// add other parameters (if any)
if( task.config.queue ) {
@@ -104,7 +107,13 @@
}

// -- the job name
result << '-J' << getJobNameFor(task)
if( task instanceof TaskArrayRun ) {
final arraySize = task.getArraySize()
result << '-J' << "${getJobNameFor(task)}[1-${arraySize}]".toString()
}
else {
result << '-J' << getJobNameFor(task)
}

// -- at the end append the command script wrapped file name
result.addAll( task.config.getClusterOptionsAsList() )
@@ -317,4 +326,21 @@
boolean isFusionEnabled() {
return FusionHelper.isFusionEnabled(session)
}

@Override
String getArrayIndexName() {
return 'LSB_JOBINDEX'
}

@Override
int getArrayIndexStart() {
return 1
}

@Override
String getArrayTaskId(String jobId, int index) {
assert jobId, "Missing 'jobId' argument"
return "${jobId}[${index + 1}]"
}

}
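LSF array indices are 1-based, so child task `i` of an array job maps to LSF job id `<jobId>[i+1]`. A standalone sketch of the mapping implemented by `getArrayTaskId` above (job id values are illustrative):

```groovy
// mirrors LsfExecutor.getArrayTaskId: shift the 0-based child index to LSF's 1-based indexing
def arrayTaskId(String jobId, int index) {
    "${jobId}[${index + 1}]".toString()
}

assert arrayTaskId('12345', 0) == '12345[1]'
assert arrayTaskId('12345', 9) == '12345[10]'
```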
@@ -21,6 +21,7 @@ import java.util.regex.Pattern

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskRun
/**
* Implements a executor for PBS/Torque cluster
@@ -29,7 +30,7 @@ import nextflow.processor.TaskRun
*/
@Slf4j
@CompileStatic
class PbsExecutor extends AbstractGridExecutor {
class PbsExecutor extends AbstractGridExecutor implements TaskArrayExecutor {

private static Pattern OPTS_REGEX = ~/(?:^|\s)-l.+/

@@ -43,9 +44,17 @@ class PbsExecutor extends AbstractGridExecutor {
protected List<String> getDirectives( TaskRun task, List<String> result ) {
assert result !=null

if( task instanceof TaskArrayRun ) {
final arraySize = task.getArraySize()
result << '-J' << "0-${arraySize - 1}".toString()
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'oe'

if( task !instanceof TaskArrayRun ) {
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'oe'
}

// the requested queue name
if( task.config.queue ) {
@@ -180,4 +189,21 @@
static protected boolean matchOptions(String value) {
value ? OPTS_REGEX.matcher(value).find() : null
}

@Override
String getArrayIndexName() {
return 'PBS_ARRAY_INDEX'
}

@Override
int getArrayIndexStart() {
return 0
}

@Override
String getArrayTaskId(String jobId, int index) {
assert jobId, "Missing 'jobId' argument"
return jobId.replace('[]', "[$index]")
}

}
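For PBS, the scheduler reports an array job id containing an empty bracket pair (e.g. `1234[].server`), and `getArrayTaskId` above substitutes the 0-based child index into the brackets. A standalone sketch of that mapping (the job id value is illustrative):

```groovy
// mirrors PbsExecutor.getArrayTaskId: insert the child index into the '[]' placeholder
def arrayTaskId(String jobId, int index) {
    jobId.replace('[]', "[$index]")
}

assert arrayTaskId('1234[].server', 5) == '1234[5].server'
```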
@@ -18,6 +18,7 @@ package nextflow.executor

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.TaskArrayRun
import nextflow.processor.TaskRun
/**
* Implements a executor for PBSPro cluster executor
@@ -44,7 +45,12 @@ class PbsProExecutor extends PbsExecutor {
@Override
protected List<String> getDirectives(TaskRun task, List<String> result ) {
assert result !=null


if( task instanceof TaskArrayRun ) {
final arraySize = task.getArraySize()
result << '-J' << "0-${arraySize - 1}".toString()
}

// when multiple competing directives are provided, only the first one will take effect
// therefore clusterOptions is added as first to give priority over other options as expected
// by the clusterOptions semantics -- see https://github.com/nextflow-io/nextflow/pull/2036
@@ -53,8 +59,11 @@
}

result << '-N' << getJobNameFor(task)
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'oe'

if( task !instanceof TaskArrayRun ) {
result << '-o' << quote(task.workDir.resolve(TaskRun.CMD_LOG))
result << '-j' << 'oe'
}

// the requested queue name
if( task.config.queue ) {
