Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support not using a container in Azure Batch #5536

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
5 changes: 5 additions & 0 deletions docs/reference/config.md
Original file line number Diff line number Diff line change
Expand Up @@ -395,6 +395,11 @@ The following settings are available:
`azure.batch.pools.<name>.vmType`
: Specify the virtual machine type used by the pool identified with `<name>`.

`azure.batch.requireContainer`
: :::{versionadded} 25.01.0-edge
:::
: Require the use of a container when running tasks (default: `true`). Setting this option to `false` requires an existing Azure Batch pool that is not configured to use a container image. Running a process without a container on a node pool that requires one will raise an error in Azure Batch.

`azure.managedIdentity.clientId`
: Specify the client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). See {ref}`azure-managed-identities` for more details.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,8 +392,8 @@ class AzBatchService implements Closeable {
throw new IllegalArgumentException("Missing Azure Blob storage SAS token")

final container = task.getContainer()
if( !container )
throw new IllegalArgumentException("Missing container image for process: $task.name")
if( !container && config.batch().requireContainer )
throw new IllegalArgumentException("Missing container image for process: $task.name\nYou can disable this behaviour setting `azure.batch.requireContainer=false` in the nextflow config file")
final taskId = "nf-${task.hash.toString()}"
// get the pool config
final pool = getPoolSpec(poolId)
Expand All @@ -419,8 +419,11 @@ class AzBatchService implements Closeable {
}
}
// config overall container settings
final containerOpts = new BatchTaskContainerSettings(container)
BatchTaskContainerSettings containerOpts = null
if (container) {
containerOpts = new BatchTaskContainerSettings(container)
.setContainerRunOptions(opts)
}
// submit command line
final String cmd = fusionEnabled
? launcher.fusionSubmitCli(task).join(' ')
Expand All @@ -434,15 +437,18 @@ class AzBatchService implements Closeable {

log.trace "[AZURE BATCH] Submitting task: $taskId, cpus=${task.config.getCpus()}, mem=${task.config.getMemory()?:'-'}, slots: $slots"

return new BatchTaskCreateContent(taskId, cmd)
final batchTask = new BatchTaskCreateContent(taskId, cmd)
.setUserIdentity(userIdentity(pool.opts.privileged, pool.opts.runAs, AutoUserScope.TASK))
.setContainerSettings(containerOpts)
.setResourceFiles(resourceFileUrls(task, sas))
.setOutputFiles(outputFileUrls(task, sas))
.setRequiredSlots(slots)
.setConstraints(constraints)


if (containerOpts) {
batchTask.setContainerSettings(containerOpts)
}

return batchTask
}

AzTaskKey runTask(String poolId, String jobId, TaskRun task) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,9 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

void validateConfiguration() {
if (!task.container) {
throw new ProcessUnrecoverableException("No container image specified for process $task.name -- Either specify the container to use in the process definition or with 'process.container' value in your config")
println "AZURE_BATCH_CONFIG: ${executor.config.batch()}"
if (!task.container && executor.config.batch().requireContainer ) {
throw new ProcessUnrecoverableException("No container image specified for process $task.name -- Either specify the container to use in the process definition or with 'process.container' value in your config. You can disable this behaviour setting `azure.batch.requireContainer=false` in the nextflow config file")
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class AzBatchOpts implements CloudTransferOptions {
Boolean deleteJobsOnCompletion
Boolean deletePoolsOnCompletion
Boolean deleteTasksOnCompletion
Boolean requireContainer
CopyToolInstallMode copyToolInstallMode

Map<String,AzPoolOpts> pools
Expand All @@ -67,6 +68,7 @@ class AzBatchOpts implements CloudTransferOptions {
location = config.location
autoPoolMode = config.autoPoolMode
allowPoolCreation = config.allowPoolCreation
requireContainer = config.requireContainer == null ? true : config.requireContainer
terminateJobsOnCompletion = config.terminateJobsOnCompletion != Boolean.FALSE
deleteJobsOnCompletion = config.deleteJobsOnCompletion
deletePoolsOnCompletion = config.deletePoolsOnCompletion
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -739,4 +739,80 @@ class AzBatchServiceTest extends Specification {
[managedIdentity: [clientId: 'client-123']] | 'client-123'
}

// Checks that createTask() returns a task with no container settings when the
// task declares no container image and `requireContainer` is set to false.
def 'should create task for submit without container' () {
given:
// empty session config (no fusion settings are provided here)
Global.session = Mock(Session) { getConfig()>>[:] }
and:
def POOL_ID = 'my-pool'
def SAS = '123'
// requireContainer: false is the setting under test
def CONFIG = [storage: [sasToken: SAS], batch: [requireContainer: false]]
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) }
// Spy so that the helper methods invoked by createTask() can be stubbed in the then: block
AzBatchService azure = Spy(new AzBatchService(exec))
and:
// task without a container image
def TASK = Mock(TaskRun) {
getHash() >> HashCode.fromInt(1)
getContainer() >> null
getConfig() >> Mock(TaskConfig)
}
and:
def SPEC = new AzVmPoolSpec(poolId: POOL_ID, vmType: Mock(AzVmType), opts: new AzPoolOpts([:]))

when:
def result = azure.createTask(POOL_ID, 'salmon', TASK)
then:
// each helper is expected to be called exactly once and returns a canned value
1 * azure.getPoolSpec(POOL_ID) >> SPEC
1 * azure.computeSlots(TASK, SPEC) >> 4
1 * azure.resourceFileUrls(TASK, SAS) >> []
1 * azure.outputFileUrls(TASK, SAS) >> []
and:
// task id is the 'nf-' prefix followed by the task hash
result.id == 'nf-01000000'
// required slots come from the stubbed computeSlots() value
result.requiredSlots == 4
and:
result.commandLine == "sh -c 'bash .command.run 2>&1 | tee .command.log'"
and:
// no container image => no container settings on the submitted task
result.containerSettings == null
}

// Checks that createTask() with fusion enabled and a container image produces
// the fusion launch command and container settings carrying the fusion run options.
def 'should create task for submit with container and fusion' () {
given:
// the string '1234567890' repeated 10 times, used as a dummy SAS token
def SAS = '1234567890' * 10
def AZURE = [storage: [sasToken: SAS, accountName: 'my-account']]
// fusion is enabled through the session config
Global.session = Mock(Session) { getConfig()>>[fusion:[enabled:true], azure: AZURE] }
def WORKDIR = FileSystemPathFactory.parse('az://foo/work/dir')
and:
def POOL_ID = 'my-pool'
def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(AZURE) }
// Spy so that the helper methods invoked by createTask() can be stubbed in the then: block
AzBatchService azure = Spy(new AzBatchService(exec))
and:
// task with a container image and an az:// work directory
def TASK = Mock(TaskRun) {
getHash() >> HashCode.fromInt(1)
getContainer() >> 'ubuntu:latest'
getConfig() >> Mock(TaskConfig)
getWorkDir() >> WORKDIR
toTaskBean() >> Mock(TaskBean) {
getWorkDir() >> WORKDIR
getInputFiles() >> [:]
}
}
and:
def SPEC = new AzVmPoolSpec(poolId: POOL_ID, vmType: Mock(AzVmType), opts: new AzPoolOpts([:]))

when:
def result = azure.createTask(POOL_ID, 'salmon', TASK)
then:
// each helper is expected to be called exactly once and returns a canned value
1 * azure.getPoolSpec(POOL_ID) >> SPEC
1 * azure.computeSlots(TASK, SPEC) >> 1
1 * azure.resourceFileUrls(TASK, SAS) >> []
1 * azure.outputFileUrls(TASK, SAS) >> []
and:
// task id is the 'nf-' prefix followed by the task hash
result.id == 'nf-01000000'
result.requiredSlots == 1
and:
// fusion command line referencing the work dir under the /fusion/az mount path
result.commandLine == "/usr/bin/fusion bash /fusion/az/foo/work/dir/.command.run"
and:
// the container image is propagated and the run options include the fusion settings
result.containerSettings.imageName == 'ubuntu:latest'
result.containerSettings.containerRunOptions.contains('--privileged')
result.containerSettings.containerRunOptions.contains('-e FUSION_WORK=/fusion/az/foo/work/dir')
}

}
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package nextflow.cloud.azure.batch

import nextflow.cloud.azure.config.AzConfig
import nextflow.cloud.types.CloudMachineInfo
import nextflow.cloud.types.PriceModel
import nextflow.exception.ProcessUnrecoverableException
Expand All @@ -20,20 +21,52 @@ class AzBatchTaskHandlerTest extends Specification {

def 'should validate config' () {
when:
def task = Mock(TaskRun) { getName() >> 'foo'; }
def task = Mock(TaskRun) {
getName() >> 'foo'
getProcessor() >> Mock(TaskProcessor) {
getExecutor() >> Mock(Executor) { getName() >> 'azurebatch' }
}
}
and:
new AzBatchTaskHandler(task: task)
.validateConfiguration()
new AzBatchTaskHandler(task: task, executor: Mock(AzBatchExecutor) {
getConfig() >> new AzConfig([:])
}).validateConfiguration()
then:
def e = thrown(ProcessUnrecoverableException)
e.message.startsWith('No container image specified for process foo')


when:
task = Mock(TaskRun) { getName() >> 'foo'; getContainer() >> 'ubuntu' }
task = Mock(TaskRun) {
getName() >> 'foo'
getContainer() >> 'ubuntu'
}
def CONFIG = [batch: [requireContainer: true]]
and:
new AzBatchTaskHandler(task: task, executor: Mock(AzBatchExecutor) {
getConfig() >> new AzConfig(CONFIG)
}).validateConfiguration()
then:
noExceptionThrown()
}

def 'should ignore missing container if disabled' () {
given:
def task = Mock(TaskRun)
task.getName() >> 'batch-task'
task.getConfig() >> new TaskConfig()
task.getProcessor() >> Mock(TaskProcessor) {
getExecutor() >> Mock(Executor) { getName() >> 'azurebatch' }
}
and:
new AzBatchTaskHandler(task: task)
.validateConfiguration()
def handler = Spy(AzBatchTaskHandler)
handler.task = task
handler.executor = Mock(AzBatchExecutor) {
getConfig() >> new AzConfig([batch: [requireContainer: false]])
}

when:
handler.validateConfiguration()

then:
noExceptionThrown()
}
Expand Down
Loading