Skip to content

Commit

Permalink
Add cpu-shares and memory limits to Azure Batch tasks (#5799)
Browse files Browse the repository at this point in the history
The Azure Batch task did not include the --cpu-shares or --memory options of the docker run command, meaning resource allocation was only controlled by slots on the node. This PR introduces the additional options to the container run command, including --cpu-shares and --memory. The behaviour will now be more similar to AWS Batch which includes a soft limit on CPUs and hard limit on memory. It also refactors the container options to use a StringBuilder class instead of string concatenation.


Signed-off-by: adamrtalbot <12817534+adamrtalbot@users.noreply.github.com>
Signed-off-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
Co-authored-by: Paolo Di Tommaso <paolo.ditommaso@gmail.com>
  • Loading branch information
adamrtalbot and pditommaso authored Feb 19, 2025
1 parent 976e002 commit f9c0cbf
Show file tree
Hide file tree
Showing 2 changed files with 67 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -459,17 +459,28 @@ class AzBatchService implements Closeable {
final pool = getPoolSpec(poolId)
if( !pool )
throw new IllegalStateException("Missing Azure Batch pool spec with id: $poolId")

// container settings
// mount host certificates otherwise `azcopy` fails
def opts = "-v /etc/ssl/certs:/etc/ssl/certs:ro -v /etc/pki:/etc/pki:ro "
// shared volume mounts
String opts = ""
// Add CPU and memory constraints if specified
if( task.config.getCpus() )
opts += "--cpu-shares ${task.config.getCpus() * 1024} "
if( task.config.getMemory() )
opts += "--memory ${task.config.getMemory().toMega()}m "

// Mount host certificates for azcopy
opts += "-v /etc/ssl/certs:/etc/ssl/certs:ro -v /etc/pki:/etc/pki:ro "

// Add any shared volume mounts
final shares = getShareVolumeMounts(pool)
if( shares )
opts += "${shares.join(' ')} "
// custom container settings
opts += shares.join(' ') + ' '

// Add custom container options
if( task.config.getContainerOptions() )
opts += "${task.config.getContainerOptions()} "
// fusion environment settings
opts += task.config.getContainerOptions() + ' '

// Handle Fusion settings
final fusionEnabled = FusionHelper.isFusionEnabled((Session)Global.session)
final launcher = fusionEnabled ? FusionScriptLauncher.create(task.toTaskBean(), 'az') : null
if( fusionEnabled ) {
Expand All @@ -478,9 +489,11 @@ class AzBatchService implements Closeable {
opts += "-e $it.key=$it.value "
}
}
// config overall container settings

// Create container settings
final containerOpts = new BatchTaskContainerSettings(container)
.setContainerRunOptions(opts)

// submit command line
final String cmd = fusionEnabled
? launcher.fusionSubmitCli(task).join(' ')
Expand All @@ -493,16 +506,13 @@ class AzBatchService implements Closeable {
constraints.setMaxWallClockTime( Duration.of(task.config.getTime().toMillis(), ChronoUnit.MILLIS) )

log.trace "[AZURE BATCH] Submitting task: $taskId, cpus=${task.config.getCpus()}, mem=${task.config.getMemory()?:'-'}, slots: $slots"

return new BatchTaskCreateContent(taskId, cmd)
.setUserIdentity(userIdentity(pool.opts.privileged, pool.opts.runAs, AutoUserScope.TASK))
.setContainerSettings(containerOpts)
.setResourceFiles(resourceFileUrls(task, sas))
.setOutputFiles(outputFileUrls(task, sas))
.setRequiredSlots(slots)
.setConstraints(constraints)


}

AzTaskKey runTask(String poolId, String jobId, TaskRun task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -633,6 +633,52 @@ class AzBatchServiceTest extends Specification {
result.containerSettings.containerRunOptions == '-v /etc/ssl/certs:/etc/ssl/certs:ro -v /etc/pki:/etc/pki:ro '
}

def 'should create task for submit with cpu and memory' () {
given:
def POOL_ID = 'my-pool'
def SAS = '123'

def CONFIG = [storage: [sasToken: SAS]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
AzBatchService azure = Spy(new AzBatchService(exec))
def session = Mock(Session) {
getConfig() >>[fusion:[enabled:false]]
statsEnabled >> true
}
Global.session = session
and:
def TASK = Mock(TaskRun) {
getHash() >> HashCode.fromInt(2)
getContainer() >> 'ubuntu:latest'
getConfig() >> Mock(TaskConfig) {
getTime() >> Duration.of('24 h')
getCpus() >> 4
getMemory() >> MemoryUnit.of('8 GB')
}

}
and:
def SPEC = new AzVmPoolSpec(poolId: POOL_ID, vmType: Mock(AzVmType), opts: new AzPoolOpts([:]))

when:
def result = azure.createTask(POOL_ID, 'salmon', TASK)
then:
1 * azure.getPoolSpec(POOL_ID) >> SPEC
1 * azure.computeSlots(TASK, SPEC) >> 4
1 * azure.resourceFileUrls(TASK, SAS) >> []
1 * azure.outputFileUrls(TASK, SAS) >> []
and:
result.id == 'nf-02000000'
result.requiredSlots == 4
and:
result.commandLine == "sh -c 'bash .command.run 2>&1 | tee .command.log'"
and:
result.containerSettings.imageName == 'ubuntu:latest'
result.containerSettings.containerRunOptions == '--cpu-shares 4096 --memory 8192m -v /etc/ssl/certs:/etc/ssl/certs:ro -v /etc/pki:/etc/pki:ro '
and:
Duration.of(result.constraints.maxWallClockTime.toMillis()) == TASK.config.time
}

def 'should create task for submit with extra options' () {
given:
def POOL_ID = 'my-pool'
Expand Down

0 comments on commit f9c0cbf

Please sign in to comment.