diff --git a/docs/reference/config.md b/docs/reference/config.md index 0efb1ce6ae..80b39c355b 100644 --- a/docs/reference/config.md +++ b/docs/reference/config.md @@ -395,6 +395,11 @@ The following settings are available: `azure.batch.pools..vmType` : Specify the virtual machine type used by the pool identified with ``. +`azure.batch.requireContainer` +: :::{versionadded} 25.01.0-edge + ::: +: Require the use of a container when running tasks (default: `true`). Note this requires an existing Azure Batch pool configured to not use a container image. Running a process without a container when the node pool requires one will raise an error on Azure Batch + `azure.managedIdentity.clientId` : Specify the client ID for an Azure [managed identity](https://learn.microsoft.com/en-us/entra/identity/managed-identities-azure-resources/overview). See {ref}`azure-managed-identities` for more details. diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy index cd3e4b37f5..66ae83723f 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchService.groovy @@ -392,8 +392,8 @@ class AzBatchService implements Closeable { throw new IllegalArgumentException("Missing Azure Blob storage SAS token") final container = task.getContainer() - if( !container ) - throw new IllegalArgumentException("Missing container image for process: $task.name") + if( !container && config.batch().requireContainer ) + throw new IllegalArgumentException("Missing container image for process: $task.name\nYou can disable this behaviour setting `azure.batch.requireContainer=false` in the nextflow config file") final taskId = "nf-${task.hash.toString()}" // get the pool config final pool = getPoolSpec(poolId) @@ -419,8 +419,11 @@ class AzBatchService implements Closeable { } } // config overall container settings - final containerOpts = new BatchTaskContainerSettings(container) + BatchTaskContainerSettings containerOpts = null + if (container) { + containerOpts = new BatchTaskContainerSettings(container) .setContainerRunOptions(opts) + } // submit command line final String cmd = fusionEnabled ? launcher.fusionSubmitCli(task).join(' ') @@ -434,15 +437,18 @@ class AzBatchService implements Closeable { log.trace "[AZURE BATCH] Submitting task: $taskId, cpus=${task.config.getCpus()}, mem=${task.config.getMemory()?:'-'}, slots: $slots" - return new BatchTaskCreateContent(taskId, cmd) + final batchTask = new BatchTaskCreateContent(taskId, cmd) .setUserIdentity(userIdentity(pool.opts.privileged, pool.opts.runAs, AutoUserScope.TASK)) - .setContainerSettings(containerOpts) .setResourceFiles(resourceFileUrls(task, sas)) .setOutputFiles(outputFileUrls(task, sas)) .setRequiredSlots(slots) .setConstraints(constraints) - + if (containerOpts) { + batchTask.setContainerSettings(containerOpts) + } + + return batchTask } AzTaskKey runTask(String poolId, String jobId, TaskRun task) { diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy index 581bed4ec7..06448b9fd2 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/batch/AzBatchTaskHandler.groovy @@ -71,8 +71,9 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask { } void validateConfiguration() { - if (!task.container) { - throw new ProcessUnrecoverableException("No container image specified for process $task.name -- Either specify the container to use in the process definition or with 'process.container' value in your config") + println "AZURE_BATCH_CONFIG: ${executor.config.batch()}" + if (!task.container && executor.config.batch().requireContainer ) { + throw new ProcessUnrecoverableException("No container image specified for process $task.name -- Either specify the container to use in the process definition or with 'process.container' value in your config. You can disable this behaviour setting `azure.batch.requireContainer=false` in the nextflow config file") } } diff --git a/plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy b/plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy index 358c70413d..cd11f88161 100644 --- a/plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy +++ b/plugins/nf-azure/src/main/nextflow/cloud/azure/config/AzBatchOpts.groovy @@ -54,6 +54,7 @@ class AzBatchOpts implements CloudTransferOptions { Boolean deleteJobsOnCompletion Boolean deletePoolsOnCompletion Boolean deleteTasksOnCompletion + Boolean requireContainer CopyToolInstallMode copyToolInstallMode Map pools @@ -67,6 +68,7 @@ class AzBatchOpts implements CloudTransferOptions { location = config.location autoPoolMode = config.autoPoolMode allowPoolCreation = config.allowPoolCreation + requireContainer = config.requireContainer == null ? true : config.requireContainer terminateJobsOnCompletion = config.terminateJobsOnCompletion != Boolean.FALSE deleteJobsOnCompletion = config.deleteJobsOnCompletion deletePoolsOnCompletion = config.deletePoolsOnCompletion diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy index eceeb644b2..45b58011bb 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchServiceTest.groovy @@ -739,4 +739,80 @@ class AzBatchServiceTest extends Specification { [managedIdentity: [clientId: 'client-123']] | 'client-123' } + def 'should create task for submit without container' () { + given: + Global.session = Mock(Session) { getConfig()>>[:] } + and: + def POOL_ID = 'my-pool' + def SAS = '123' + def CONFIG = [storage: [sasToken: SAS], batch: [requireContainer: false]] + def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(CONFIG) } + AzBatchService azure = Spy(new AzBatchService(exec)) + and: + def TASK = Mock(TaskRun) { + getHash() >> HashCode.fromInt(1) + getContainer() >> null + getConfig() >> Mock(TaskConfig) + } + and: + def SPEC = new AzVmPoolSpec(poolId: POOL_ID, vmType: Mock(AzVmType), opts: new AzPoolOpts([:])) + + when: + def result = azure.createTask(POOL_ID, 'salmon', TASK) + then: + 1 * azure.getPoolSpec(POOL_ID) >> SPEC + 1 * azure.computeSlots(TASK, SPEC) >> 4 + 1 * azure.resourceFileUrls(TASK, SAS) >> [] + 1 * azure.outputFileUrls(TASK, SAS) >> [] + and: + result.id == 'nf-01000000' + result.requiredSlots == 4 + and: + result.commandLine == "sh -c 'bash .command.run 2>&1 | tee .command.log'" + and: + result.containerSettings == null + } + + def 'should create task for submit with container and fusion' () { + given: + def SAS = '1234567890' * 10 + def AZURE = [storage: [sasToken: SAS, accountName: 'my-account']] + Global.session = Mock(Session) { getConfig()>>[fusion:[enabled:true], azure: AZURE] } + def WORKDIR = FileSystemPathFactory.parse('az://foo/work/dir') + and: + def POOL_ID = 'my-pool' + def exec = Mock(AzBatchExecutor) {getConfig() >> new AzConfig(AZURE) } + AzBatchService azure = Spy(new AzBatchService(exec)) + and: + def TASK = Mock(TaskRun) { + getHash() >> HashCode.fromInt(1) + getContainer() >> 'ubuntu:latest' + getConfig() >> Mock(TaskConfig) + getWorkDir() >> WORKDIR + toTaskBean() >> Mock(TaskBean) { + getWorkDir() >> WORKDIR + getInputFiles() >> [:] + } + } + and: + def SPEC = new AzVmPoolSpec(poolId: POOL_ID, vmType: Mock(AzVmType), opts: new AzPoolOpts([:])) + + when: + def result = azure.createTask(POOL_ID, 'salmon', TASK) + then: + 1 * azure.getPoolSpec(POOL_ID) >> SPEC + 1 * azure.computeSlots(TASK, SPEC) >> 1 + 1 * azure.resourceFileUrls(TASK, SAS) >> [] + 1 * azure.outputFileUrls(TASK, SAS) >> [] + and: + result.id == 'nf-01000000' + result.requiredSlots == 1 + and: + result.commandLine == "/usr/bin/fusion bash /fusion/az/foo/work/dir/.command.run" + and: + result.containerSettings.imageName == 'ubuntu:latest' + result.containerSettings.containerRunOptions.contains('--privileged') + result.containerSettings.containerRunOptions.contains('-e FUSION_WORK=/fusion/az/foo/work/dir') + } + } diff --git a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy index 6d0b40d4c4..fbecf1b8be 100644 --- a/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy +++ b/plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchTaskHandlerTest.groovy @@ -1,5 +1,6 @@ package nextflow.cloud.azure.batch +import nextflow.cloud.azure.config.AzConfig import nextflow.cloud.types.CloudMachineInfo import nextflow.cloud.types.PriceModel import nextflow.exception.ProcessUnrecoverableException @@ -20,20 +21,52 @@ class AzBatchTaskHandlerTest extends Specification { def 'should validate config' () { when: - def task = Mock(TaskRun) { getName() >> 'foo'; } + def task = Mock(TaskRun) { + getName() >> 'foo' + getProcessor() >> Mock(TaskProcessor) { + getExecutor() >> Mock(Executor) { getName() >> 'azurebatch' } + } + } and: - new AzBatchTaskHandler(task: task) - .validateConfiguration() + new AzBatchTaskHandler(task: task, executor: Mock(AzBatchExecutor) { + getConfig() >> new AzConfig([:]) + }).validateConfiguration() then: def e = thrown(ProcessUnrecoverableException) e.message.startsWith('No container image specified for process foo') - when: - task = Mock(TaskRun) { getName() >> 'foo'; getContainer() >> 'ubuntu' } + task = Mock(TaskRun) { + getName() >> 'foo' + getContainer() >> 'ubuntu' + } + def CONFIG = [batch: [requireContainer: true]] + and: + new AzBatchTaskHandler(task: task, executor: Mock(AzBatchExecutor) { + getConfig() >> new AzConfig(CONFIG) + }).validateConfiguration() + then: + noExceptionThrown() + } + + def 'should ignore missing container if disabled' () { + given: + def task = Mock(TaskRun) + task.getName() >> 'batch-task' + task.getConfig() >> new TaskConfig() + task.getProcessor() >> Mock(TaskProcessor) { + getExecutor() >> Mock(Executor) { getName() >> 'azurebatch' } + } and: - new AzBatchTaskHandler(task: task) - .validateConfiguration() + def handler = Spy(AzBatchTaskHandler) + handler.task = task + handler.executor = Mock(AzBatchExecutor) { + getConfig() >> new AzConfig([batch: [requireContainer: false]]) + } + + when: + handler.validateConfiguration() + then: noExceptionThrown() }