Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job arrays #3892

Merged
merged 113 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
Show all changes
113 commits
Select commit Hold shift + click to select a range
41ac662
Add initial array executor
bentsherman Apr 21, 2023
a0900ae
Add support for SLURM array jobs
bentsherman Apr 21, 2023
b5bad00
Fix failing test
bentsherman Apr 21, 2023
713e33e
Document array executor
bentsherman Apr 21, 2023
76de0d4
Use concurrent queues in array executor, add fallback for leftover tasks
bentsherman Apr 21, 2023
1ef005c
Move batching logic to array task polling monitor
bentsherman Apr 21, 2023
27728eb
Cache status checks in array task handler
bentsherman Apr 21, 2023
343b331
Finalize support for SLURM array jobs
bentsherman Apr 24, 2023
a70e6b4
Merge branch 'master' into 1477-job-array-executor [ci skip]
bentsherman Apr 25, 2023
e5ca280
Refactor array executor as process directive
bentsherman Apr 25, 2023
ed5089d
minor edits
bentsherman Apr 25, 2023
5b36129
Support array jobs for LSF, PBS, SGE
bentsherman Apr 25, 2023
d162948
Replace synchronized with atomics and locks
bentsherman Apr 26, 2023
3e38ec0
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 30, 2023
99e92d5
Add array job support to AWS Batch
bentsherman Apr 30, 2023
33502e7
Merge branch '1477-job-array-executor' of github.com:nextflow-io/next…
bentsherman Apr 30, 2023
75135d7
Fix bug in awsbatch + fusion + array job
bentsherman Apr 30, 2023
2ee42ae
Support grid + fusion + array jobs
bentsherman Apr 30, 2023
3d84594
Refactor ArrayTask* to TaskArray*
bentsherman May 1, 2023
fc78470
Reduce code duplication
bentsherman May 1, 2023
45a1c35
minor edits
bentsherman May 1, 2023
16af92f
Fix failing tests
bentsherman May 1, 2023
98cfb0e
Reduce code duplication
bentsherman May 2, 2023
f02de40
Reduce code duplication
bentsherman May 2, 2023
eac8099
Merge branch 'master' into 1477-job-array-executor
abhi18av May 2, 2023
603913f
minor edits
bentsherman May 2, 2023
fe3a3d7
minor edits
bentsherman May 2, 2023
aaaa35d
Use TaskRun subclass to submit array job in a generic manner
bentsherman May 2, 2023
6945450
Minor changes
bentsherman May 3, 2023
f95c7d0
Fix bugs in array submit script
bentsherman May 3, 2023
44f6825
Fix Fusion support
bentsherman May 3, 2023
12612a3
Replace array submitter with array handling logic in the task polling…
bentsherman May 3, 2023
b9dafe0
Fix task reporting in log observer
bentsherman May 3, 2023
5a9f6de
Merge branch 'master' into 1477-job-array-executor
abhi18av May 7, 2023
ffc6d31
Fix missing method error
bentsherman May 3, 2023
7ec397f
Add support for AWS SSE env variables
pditommaso May 24, 2023
324681b
Add support for google batch
bentsherman May 26, 2023
eb0750f
Merge branch 'master' into 1477-job-array-executor
bentsherman May 26, 2023
d3d0377
Fix static compilation errors
bentsherman May 26, 2023
acea09f
Delete entire AWS Batch array job on workflow termination
bentsherman May 26, 2023
91b61a5
Merge branch 'master' into 1477-job-array-executor
bentsherman May 29, 2023
d78daa8
Fix failing test
bentsherman May 29, 2023
75dc404
Fix issues with google batch array jobs
bentsherman May 31, 2023
7c7a7b3
Rename getSubmitCommand() to getLaunchCommand()
bentsherman May 31, 2023
52f3195
Fix SLURM+Fusion+array jobs, cleanup
bentsherman May 31, 2023
c5e8b76
Remove unrelated changes
bentsherman May 31, 2023
14b2455
Submit retried tasks directly
bentsherman May 31, 2023
0de3c84
Add unit tests
bentsherman Jun 1, 2023
55d7941
Update docs [ci fast]
bentsherman Jun 13, 2023
eeeb12d
Fix issues with LSF array jobs
bentsherman Jun 13, 2023
fd10361
Add array index name to env whitelist
bentsherman Jun 14, 2023
782dea2
Disable non-native container run for task array
bentsherman Jun 15, 2023
eb45069
Fix incorrect index offset in array job script
bentsherman Jun 21, 2023
50c300c
Change SGE array index start to 1
bentsherman Jun 22, 2023
c206a5c
minor edits
bentsherman Jun 23, 2023
d4ab783
Change SGE array index start to 1 (for real this time)
bentsherman Jul 13, 2023
786a0b3
Use Bolts.withLock
bentsherman Jul 27, 2023
68ea23f
Add array jobs to CRG executor
bentsherman Sep 15, 2023
28dbe47
Add array jobs to CRG executor
bentsherman Sep 15, 2023
6c7494a
Add array job to SGE parseJobId
bentsherman Sep 18, 2023
dfca759
Merge branch 'master' into 1477-job-array-executor
bentsherman Jan 24, 2024
1929773
Refactor "array job" -> "job array"
bentsherman Jan 24, 2024
fcdd6b7
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 21, 2024
50c8f6f
Fix intellij compiler errors [ci fast]
pditommaso Apr 21, 2024
c5a12ec
Apply suggestions from review
bentsherman Apr 22, 2024
9897a6c
Fix failing tests
bentsherman Apr 23, 2024
c15e5ec
Stability improvements
bentsherman Apr 23, 2024
ba176e0
Improve construction of job array
bentsherman Apr 24, 2024
584c961
Add process directives to wrapper script for Fusion
bentsherman Apr 24, 2024
7d8aa5c
Merge branch 'master' into 1477-job-array-executor
pditommaso Apr 26, 2024
49f7fbd
Prefer try/finally idiom to avoid closure [ci fast]
pditommaso Apr 26, 2024
99a1ec3
Apply suggestions from review
bentsherman Apr 26, 2024
bf04e3f
Fix bug with job array context
bentsherman Apr 29, 2024
67f76a1
Merge branch 'master' into 1477-job-array-executor
pditommaso May 1, 2024
16dfb25
Add integration tests
pditommaso May 1, 2024
0e88bf7
Restore private method access (#4961) [ci fast]
pditommaso May 1, 2024
11aeaae
Minor changes
pditommaso May 1, 2024
519c93a
Add aws batch integration test
pditommaso May 1, 2024
ade72e2
Revert re-use of NXF_CHDIR in job array script
bentsherman May 1, 2024
8c13ec3
Add unit tests for grid executors
bentsherman May 1, 2024
b7e829d
Add unit test for crg executor
bentsherman May 1, 2024
6f6e5a8
Fix TaskArrayRun access to private methods
pditommaso May 1, 2024
4bb0e98
Fix typ
pditommaso May 2, 2024
beddbbd
Add integration tests
pditommaso May 2, 2024
1713167
Use idiomatic name for task array dir var
pditommaso May 2, 2024
5137589
Merge branch 'master' into 1477-job-array-executor
pditommaso May 2, 2024
bf859bd
Use TaskArrayRun as return type
pditommaso May 2, 2024
a4518a3
Promote status update methods private
pditommaso May 2, 2024
56eb754
Minor change
pditommaso May 2, 2024
8afc6a9
Refactor task name in task meta comment
pditommaso May 2, 2024
b42ccbb
Force new test
pditommaso May 3, 2024
7e411fd
Disable job log when submitting job arrays to grid schedulers
bentsherman May 3, 2024
653064f
Fix collectFile saving to GCS with sort: false (#4965)
bentsherman May 2, 2024
ea1cc2c
Fix script error text alignment (#4681)
mahesh-panchal May 2, 2024
7367687
Use for instead eachLine in error formatting [ci fast]
pditommaso May 3, 2024
be536f8
Update aws.md to include Cluster access (#4951) [ci skip]
Kartstig May 3, 2024
d899ebc
Add Wave and Fusion info to workflow metadata (#4945)
marcodelapierre May 3, 2024
11d4a87
Strip auth secret from logs
pditommaso May 4, 2024
a9d5d86
[ci skip] empty
pditommaso May 5, 2024
174bdfb
Job array refactor (#4973)
pditommaso May 6, 2024
12656bc
Merge branch 'master' into 1477-job-array-executor
bentsherman May 6, 2024
a77cfd5
Move misplaced test
bentsherman May 3, 2024
4e41436
Strengthen aws batch deletion logi
pditommaso May 7, 2024
e8b701d
Strengthen google batch deletion logic
pditommaso May 7, 2024
3291f6c
Merge branch 'master' into 1477-job-array-executor
pditommaso May 7, 2024
b7ce0a1
minor edits
bentsherman May 7, 2024
3e29adc
Update google batch logging to select task logs
bentsherman May 7, 2024
247b721
Fix race condition in LogsCheckpoint
bentsherman May 7, 2024
e086674
Revert "Fix race condition in LogsCheckpoint"
pditommaso May 8, 2024
cbbbc46
Resolve conflicts
pditommaso May 8, 2024
6b8bb49
Restore grid handler names [ci fast]
pditommaso May 9, 2024
912f90d
Revert submit naming
pditommaso May 9, 2024
4b45aa0
Revert volatile change
pditommaso May 9, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 58 additions & 0 deletions docs/executor.rst
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,64 @@ In other words, you can write your pipeline script once and have it running on y
or the cloud — simply change the executor definition in the Nextflow configuration file.


.. _array-executor:

Array
=====

.. note:: This feature requires Nextflow version ``23.05.0-edge`` or later.

.. warning:: This feature is experimental and may change in a future release.

Many execution platforms support "array jobs", that is, a collection of jobs with the same resource requirements and script
parameterized by an index. An array job incurs significantly less scheduling overhead compared to submitting each task separately,
and they are a best practice in HPC environments. Nextflow supports array jobs through the Array executor.

The Array executor takes two config options: an array size (default: ``100``) and a "target" executor (default: ``'local'``).
These options can be specified in the Nextflow configuration as follows::

process {
executor = 'array'
}

executor {
$array {
arraySize = 100
target = 'slurm'
}
}

The target executor can be any Nextflow executor that supports array jobs, which currently includes the following:

* :ref:`local-executor`
* :ref:`slurm-executor`

The Array executor submits tasks in batches to the target executor, as soon as a full batch is ready. Batches are separated
by process. Any "leftover" tasks are submitted as a partial batch at the end. If any tasks in an array job fail and can be retried,
they will be retried in another array job without interfering with the tasks that succeeded. Because the order in which tasks are
executed varies across workflow runs, so too will the grouping of tasks into array jobs.

Aside from the batching, tasks in an array job are executed in the same way, i.e. each task is executed in its own work directory with
its own script. However, certain configuration properties must be the same for all tasks created by a process.

The following process directives msut be the same for all tasks when using array jobs, because they are specified once for the array job:

* :ref:`process-accelerator`
* :ref:`process-clusterOptions`
* :ref:`process-cpus`
* :ref:`process-disk`
* :ref:`process-machineType`
* :ref:`process-memory`
* :ref:`process-queue`
* :ref:`process-resourcelabels`
* :ref:`process-time`

For cloud-based executors like AWS Batch, the following additional directives must be uniform:

* :ref:`process-container`
* :ref:`process-containerOptions`


.. _awsbatch-executor:

AWS Batch
Expand Down
14 changes: 14 additions & 0 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,9 @@ class Session implements ISession {
return result
}

void registerObserver(TraceObserver observer) {
observers << observer
}

/*
* intercepts interruption signal i.e. CTRL+C
Expand Down Expand Up @@ -935,6 +938,17 @@ class Session implements ISession {
}
}

void notifyProcessClose(String process) {
observers.each { observer ->
try {
observer.onProcessClose(process)
}
catch( Exception e ) {
log.debug(e.getMessage(), e)
}
}
}

void notifyProcessTerminate(TaskProcessor process) {
for( int i=0; i<observers.size(); i++ ) {
final observer = observers.get(i)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import java.nio.file.Path
import groovy.transform.CompileStatic
import groovy.transform.PackageScope
import groovy.util.logging.Slf4j
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskPollingMonitor
import nextflow.processor.TaskProcessor
Expand All @@ -36,7 +37,7 @@ import org.apache.commons.lang.StringUtils
*/
@Slf4j
@CompileStatic
abstract class AbstractGridExecutor extends Executor {
abstract class AbstractGridExecutor extends Executor implements ArrayTaskAware {

protected Duration queueInterval

Expand Down Expand Up @@ -65,7 +66,7 @@ abstract class AbstractGridExecutor extends Executor {
/*
* Prepare and launch the task in the underlying execution platform
*/
GridTaskHandler createTaskHandler(TaskRun task) {
TaskHandler createTaskHandler(TaskRun task) {
assert task
assert task.workDir

Expand Down Expand Up @@ -407,5 +408,43 @@ abstract class AbstractGridExecutor extends Executor {
// Instead, it is the command wrapper script that is launched run within a container process.
return isFusionEnabled()
}

@Override
ArrayTaskHandler createArrayTaskHandler(List<TaskRun> array) {
final handlers = array.collect { task -> createTaskHandler(task) }
new ArrayGridTaskHandler(handlers, this)
}

String createArrayTaskWrapper(ArrayGridTaskHandler handler) {
final array = handler.array
final task = array.first().getTask()

final arrayHeader = getArrayDirective(array.size())
final taskHeaders = getHeaders(task)
final files = array
.collect { h -> ((GridTaskHandler)h).wrapperFile }
.join(' ')

final builder = new StringBuilder()
<< '#!/bin/bash\n'
<< "${headerToken} ${arrayHeader}\n"
<< taskHeaders
<< "declare -a array=( ${files} )\n"
<< "bash \${array[\$${arrayIndexName}]}\n"

return builder.toString()
}

protected String getArrayDirective(int arraySize) {
throw new UnsupportedOperationException("Executor '${name}' does not support array jobs")
}

protected String getArrayIndexName() {
throw new UnsupportedOperationException("Executor '${name}' does not support array jobs")
}

protected List<String> getArraySubmitCommandLine() {
throw new UnsupportedOperationException("Executor '${name}' does not support array jobs")
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/*
* Copyright 2013-2023, Seqera Labs
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package nextflow.executor

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.ConcurrentLinkedQueue

import groovy.transform.CompileStatic
import groovy.util.logging.Slf4j
import nextflow.processor.ArrayTaskPollingMonitor
import nextflow.processor.TaskHandler
import nextflow.processor.TaskMonitor
import nextflow.processor.TaskProcessor
import nextflow.processor.TaskRun
import nextflow.trace.TraceObserver
import nextflow.util.Duration
import org.codehaus.groovy.runtime.typehandling.GroovyCastException
/**
* Executor that submits tasks in batches to a target executor
* that supports array jobs.
*
* @author Ben Sherman <bentshermann@gmail.com>
*/
@Slf4j
@CompileStatic
class ArrayExecutor extends Executor implements TraceObserver {

private ArrayTaskAware target

private Integer arraySize

private Map<String,Queue<TaskRun>> queues = new ConcurrentHashMap<>()

private Map<String,Boolean> closed = new ConcurrentHashMap<>()

/**
* Initialize the executor class
*/
@Override
protected void register() {
super.register()

session.registerObserver(this)

final targetName = session.getExecConfigProp('array', 'target', 'local') as String
try {
target = (ArrayTaskAware)session.executorFactory.getExecutor(targetName, session)
}
catch( GroovyCastException e ) {
throw new IllegalArgumentException("Executor '${targetName}' does not support array jobs")
}

arraySize = session.getExecConfigProp('array', 'arraySize', 100) as Integer

log.debug "Creating 'array' executor > target executor: '${targetName}', array size: ${arraySize}"
}

@Override
TaskMonitor createTaskMonitor() {
return ArrayTaskPollingMonitor.create(session, name, 100, Duration.of('5 sec'))
}

/**
* Add submitted tasks to the queue, and schedule an array job when
* the queue reaches the desired size.
*
* @param task
*/
@Override
synchronized void submit( TaskRun task ) {
log.trace "Scheduling process: ${task}"

if( session.isTerminated() )
new IllegalStateException("Session terminated - Cannot add process to execution array: ${task}")

final process = task.processor.name

// submit task directly if process has already closed
if( closed[process] ) {
((Executor)target).submit(task)
return
}

// initialize process queue
if( process !in queues )
queues[process] = new ConcurrentLinkedQueue<>()

// add task to the process queue
final queue = queues[process]
queue.add(task)

// schedule array job when a batch is ready
if( queue.size() >= arraySize ) {
log.debug "[ARRAY] Submitting array job for process '${process}'"
submit0(queue, arraySize)
}
}

synchronized private void submit0( Queue<TaskRun> queue, int size ) {
def array = new ArrayList<TaskRun>()
def iter = queue.iterator()

for( int i : 1..size ) {
array << iter.next()
iter.remove()
}

monitor.schedule(target.createArrayTaskHandler(array))
}

@Override
TaskHandler createTaskHandler(TaskRun task) {
throw new UnsupportedOperationException()
}

/**
* Submit any remaining tasks as a partial batch when a process is closed.
*
* @param process
*/
@Override
void onProcessClose(String process) {
final queue = queues[process]

if( queue != null && queue.size() > 0 ) {
log.debug "[ARRAY] Submitting remainder array job for process '${process}'"
submit0(queue, queue.size())
}

closed[process] = true
}

}
Loading