Support disk directive in Azure Batch #5120

Open

adamrtalbot wants to merge 27 commits into `master` from `4920_azure_tasks_support_disk_directive`

Commits
33b3669
Azure uses disk directive when calculating slots on virtual machines
adamrtalbot Jun 25, 2024
f80bc55
Correct tests
adamrtalbot Jul 2, 2024
a2721aa
Add support for process.disk directive in Azure autoPools feature
adamrtalbot Jul 2, 2024
00b0ef0
Correct Azure Batch computeScore tests
adamrtalbot Jul 3, 2024
3b44a37
Read correct attribute from Azure VM
adamrtalbot Jul 8, 2024
c7e3a63
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Jul 8, 2024
52a5b86
fixup
adamrtalbot Jul 8, 2024
a5b4a01
Update plugins/nf-azure/src/test/nextflow/cloud/azure/batch/AzBatchSe…
adamrtalbot Jul 8, 2024
cb1620c
Use toGiga()
adamrtalbot Jul 9, 2024
dfbfb06
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 2, 2024
c93dc1a
Add disk directive support for Azure Batch tasks
adamrtalbot Dec 2, 2024
0a0fbea
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 2, 2024
89d319e
test fixup
adamrtalbot Dec 3, 2024
fc61f87
test fixup
adamrtalbot Dec 3, 2024
9db1ef7
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 3, 2024
7172d80
Remove debug statement
adamrtalbot Dec 3, 2024
91de04f
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 5, 2024
96e142e
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 9, 2024
5392e16
Update docs to reflect slots
adamrtalbot Dec 9, 2024
5b1568c
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 9, 2024
acd4bed
Update docs/azure.md
adamrtalbot Dec 9, 2024
8920652
Add regex to docs
adamrtalbot Dec 9, 2024
e864b9e
Handle resource vs osdisk size
adamrtalbot Dec 9, 2024
3601dfd
fixup
adamrtalbot Dec 9, 2024
fe387c4
Use length of name as a modifier for Azure Batch scores
adamrtalbot Dec 11, 2024
9aa2b9d
remove log.warn
adamrtalbot Dec 11, 2024
fda939a
Merge branch 'master' into 4920_azure_tasks_support_disk_directive
adamrtalbot Dec 11, 2024
17 changes: 16 additions & 1 deletion docs/azure.md
@@ -184,6 +184,9 @@ process EXAMPLE_PROCESS {

Note that when creating tasks that use fewer than 4 CPUs, Nextflow will create a pool of machines with 4 times the number of CPUs required, in order to pack more tasks onto each machine. This means the pipeline spends less time waiting for machines to be created, start up, and join the Azure Batch pool. Similarly, if a process requires fewer than 8 CPUs, Nextflow will use a machine with double the number of CPUs required. To override this behaviour, use an explicit `machineType` directive, e.g. `machineType 'Standard_E2d_v5'` will always use a Standard_E2d_v5 machine.
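This up-sizing heuristic can be sketched as follows. This is an illustrative re-implementation in Python, not Nextflow's actual code; the boundaries follow the `guessBestVm` logic in this PR, which scales requests of 4 CPUs or fewer by 4x and requests of 8 CPUs or fewer by 2x:

```python
def scaled_request(cpus: int, mem_gb: int) -> tuple:
    """Scale a small resource request up so more tasks pack onto one VM.

    Requests of <= 4 CPUs are quadrupled, requests of <= 8 CPUs are
    doubled, and larger requests are left unchanged.
    """
    if cpus <= 4:
        factor = 4
    elif cpus <= 8:
        factor = 2
    else:
        factor = 1
    return cpus * factor, mem_gb * factor

# A 2-CPU / 4 GB request is matched against 8-CPU / 16 GB machines
print(scaled_request(2, 4))  # (8, 16)
```

With this scaling, four 2-CPU tasks can run concurrently on the resulting 8-CPU machine instead of waiting for additional nodes to join the pool.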

:::{note}
You can use regular expressions to avoid certain machine types. For example, `standard_*[^p]_v*` will avoid any machine type with the letter `p` in its name, such as ARM-based machines.
:::
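The wildcard matching can be illustrated with a short sketch. This is a hedged approximation of the `matchType` method in this PR: it assumes `*` expands to `.*`, `?` matches a single character, other regex syntax (such as character classes) passes through unchanged, and matching is case-insensitive and anchored:

```python
import re

def match_type(family: str, vm_type: str) -> bool:
    """Match a VM type name against a family pattern that mixes
    glob-style wildcards with ordinary regex syntax."""
    pattern = family.replace('*', '.*').replace('?', '.')
    # (?i) makes the anchored match case-insensitive
    return re.match(f'(?i)^{pattern}$', vm_type) is not None

print(match_type('standard_d*_v5', 'Standard_D4d_v5'))   # True
print(match_type('standard_e*d_v5', 'Standard_D4d_v5'))  # False
```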

The pool is not removed when the pipeline terminates, unless the configuration setting `deletePoolsOnCompletion = true` is added to your Nextflow configuration file.

Pool specific settings should be provided in the `auto` pool configuration scope. If you wish to specify a single machine size for all processes, you can specify a fixed `vmSize` for the `auto` pool.
@@ -266,6 +269,18 @@ azure {
```
:::

### Task packing on nodes

Each node is given a number of task slots, which is the number of tasks that can run concurrently on that node. The number of task slots is equal to the number of cores of the selected VM (capped at 256). Nextflow assigns each task a number of slots proportional to the share of the node's resources it requests, based on the `cpus`, `memory` and `disk` directives.

For example, consider a `Standard_D4d_v5` machine with 4 cores, 16 GB of memory and a 150 GB local disk, which provides 4 task slots per node. A process with `cpus 2`, `memory 8.GB` or `disk 75.GB` requires 2 task slots, so 2 tasks run concurrently on the node. A process with `cpus 4` or `disk 150.GB` requires all 4 task slots, so only 1 task runs on the node at a time.

A node may become overprovisioned if tasks use more than their share of the total resources. In the example above, a process with `cpus 2` is assigned 2 task slots, so 2 tasks run concurrently on the node. If each task then uses more than 8 GB of memory or 75 GB of disk space, the node may become overprovisioned, in which case performance can degrade or tasks can fail.

:::{warning}
The `cpus` directive determines the number of task slots a task consumes, not the number of physical cores it is allocated.
:::
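The slot arithmetic above can be sketched as follows. This is an illustrative re-implementation of the `computeSlots` logic in this PR, not the code Nextflow runs: each requirement is converted to slots independently, and the largest requirement wins:

```python
import math

def task_slots(cpus, mem_gb, disk_gb, vm_cpus, vm_mem_gb, vm_disk_gb):
    """Slots consumed by one task on a node with one slot per core."""
    cpu_slots = min(cpus, vm_cpus, 256)  # slot counts are capped at 256
    # Each slot corresponds to an equal share of memory and disk
    gb_per_slot_mem = vm_mem_gb / vm_cpus
    gb_per_slot_disk = vm_disk_gb / vm_cpus
    mem_slots = math.ceil(min(mem_gb, vm_mem_gb) / gb_per_slot_mem)
    disk_slots = math.ceil(min(disk_gb, vm_disk_gb) / gb_per_slot_disk)
    return max(cpu_slots, mem_slots, disk_slots)

# Standard_D4d_v5: 4 cores, 16 GB memory, 150 GB local disk -> 4 slots per node
print(task_slots(2, 8, 75, 4, 16, 150))   # 2: two such tasks fit on the node
print(task_slots(4, 2, 150, 4, 16, 150))  # 4: the node runs one task at a time
```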

### Requirements on pre-existing named pools

When Nextflow is configured to use a pool already available in the Batch account, the target pool must satisfy the following requirements:
@@ -327,7 +342,7 @@ When Nextflow creates a pool of compute nodes, it selects:

Together, these settings determine the Operating System and version installed on each node.

By default, Nextflow creates pool nodes based on CentOS 8, but this behavior can be customised in the pool configuration. Below are configurations for image reference/SKU combinations to select two popular systems.
By default, Nextflow creates pool nodes based on Ubuntu 20.04, but this behavior can be customised in the pool configuration. Below are configurations for image reference/SKU combinations to select two popular systems.

- Ubuntu 20.04 (default):

1 change: 1 addition & 0 deletions docs/executor.md
@@ -55,6 +55,7 @@ Resource requests and other job characteristics can be controlled via the follow
- {ref}`process-container`
- {ref}`process-containerOptions`
- {ref}`process-cpus`
- {ref}`process-disk`
- {ref}`process-machineType`
- {ref}`process-memory`
- {ref}`process-queue`
@@ -159,23 +159,23 @@ class AzBatchService implements Closeable {
return listAllVms(location).collect { it.name as String }
}

AzVmType guessBestVm(String location, int cpus, MemoryUnit mem, String family) {
AzVmType guessBestVm(String location, int cpus, MemoryUnit mem, MemoryUnit disk, String family) {
log.debug "[AZURE BATCH] guessing best VM given location=$location; cpus=$cpus; mem=$mem; disk=$disk; family=$family"
if( !family.contains('*') && !family.contains('?') )
return findBestVm(location, cpus, mem, family)
return findBestVm(location, cpus, mem, disk, family)

// heuristic attempt to find a bigger instance that can accommodate more tasks
AzVmType result=null
if( cpus<=4 ) {
result = findBestVm(location, cpus*4, mem!=null ? mem*4 : null, family)
result = findBestVm(location, cpus*4, mem!=null ? mem*4 : null, disk!=null ? disk*4 : null, family)
if( !result )
result = findBestVm(location, cpus*2, mem!=null ? mem*2 : null, family)
result = findBestVm(location, cpus*2, mem!=null ? mem*2 : null, disk!=null ? disk*2 : null, family)
}
else if( cpus <=8 ) {
result = findBestVm(location, cpus*2, mem!=null ? mem*2 : null, family)
result = findBestVm(location, cpus*2, mem!=null ? mem*2 : null, disk!=null ? disk*2 : null, family)
}
if( !result )
result = findBestVm(location, cpus, mem, family)
result = findBestVm(location, cpus, mem, disk, family)
return result
}

@@ -188,21 +188,22 @@ class AzBatchService implements Closeable {
* @param allFamilies Comma separate list of Azure VM machine types, each value can also contain wildcard characters ie. `*` and `?`
* @return The `AzVmType` instance that best accommodate the resource requirement
*/
AzVmType findBestVm(String location, int cpus, MemoryUnit mem, String allFamilies) {
AzVmType findBestVm(String location, int cpus, MemoryUnit mem, MemoryUnit disk, String allFamilies) {
def all = listAllVms(location)
def scores = new TreeMap<Double,String>()
List<Tuple2<Double,String>> scores = []
def list = allFamilies ? allFamilies.tokenize(',') : ['']
for( String family : list ) {
for( Map entry : all ) {
if( !matchType(family, entry.name as String) )
continue
def score = computeScore(cpus, mem, entry)
if( score != null )
scores.put(score, entry.name as String)
def score = computeScore(cpus, mem, disk, entry)
if( score != null ) {
scores << new Tuple2(score, entry.name as String)
}
}
}

return scores ? getVmType(location, scores.firstEntry().value) : null
def sortedScores = scores.sort { it[0] }
return sortedScores ? getVmType(location, sortedScores.first()[1] as String) : null
}

protected boolean matchType(String family, String vmType) {
@@ -216,25 +217,52 @@ class AzBatchService implements Closeable {
return vmType =~ /(?i)^${family}$/
}

protected Double computeScore(int cpus, MemoryUnit mem, Map entry) {
protected Double computeScore(int cpus, MemoryUnit mem, MemoryUnit disk, Map entry) {
def vmCores = entry.numberOfCores as int
double vmMemGb = (entry.memoryInMB as int) /1024
double vmDiskGb = entry.resourceDiskSizeInMB ? (entry.resourceDiskSizeInMB as int) / 1024 : 0.0

if( cpus > vmCores ) {
// If requested CPUs exceed available, disqualify
if( cpus > vmCores )
return null
}

int cpusDelta = cpus-vmCores
double score = cpusDelta * cpusDelta
// If disk is requested but VM has no resource disk, disqualify
if( disk && vmDiskGb == 0.0 )
return null

// Calculate weighted scores
double score = 0.0

// CPU score - heavily weight exact matches
double cpuScore = Math.abs(cpus - vmCores)
score += cpuScore * 10 // Give more weight to CPU match

// Memory score if specified
if( mem && vmMemGb ) {
double memGb = mem.toMega()/1024
if( memGb > vmMemGb )
return null
double memDelta = memGb - vmMemGb
score += memDelta*memDelta
double memScore = Math.abs(memGb - vmMemGb)
score += memScore
}

// Disk score if specified
if( disk && vmDiskGb != 0.0 ) {
double diskGb = disk.toMega()/1024
if( diskGb > vmDiskGb )
return null
double diskScore = Math.abs(diskGb - vmDiskGb) / 100 // Reduce disk impact
score += diskScore
}

return Math.sqrt(score)
if (score == 0.0) return 0.0

// Add a small fraction based on the name length to make scores unique
// and prefer shorter names: VM sizes with shorter names have fewer
// features and are typically less expensive
score += 1 - (1.0 / entry.name.toString().length())

// Round to 3 decimal places
return new BigDecimal(score).setScale(3, RoundingMode.HALF_UP).doubleValue()
}

@Memoized
@@ -246,32 +274,55 @@ class AzBatchService implements Closeable {
new AzVmType(vm)
}

protected int computeSlots(int cpus, MemoryUnit mem, int vmCpus, MemoryUnit vmMem) {
// cpus requested should not exceed max cpus avail
final cpuSlots = Math.min(cpus, vmCpus) as int
protected int computeSlots(int cpus, MemoryUnit mem, MemoryUnit disk, int vmCpus, MemoryUnit vmMem, MemoryUnit vmDisk) {
// cpus requested should not exceed max cpus avail
// Max slots is 256
final cpuSlots = Collections.min([cpus, vmCpus, 256]) as int
if( !mem || !vmMem )
return cpuSlots

// mem requested should not exceed max mem avail
final vmMemGb = vmMem.mega /_1GB as float
final memGb = mem.mega /_1GB as float
final vmMemGb = vmMem.toGiga() as float
final memGb = mem.toGiga() as float
final mem0 = Math.min(memGb, vmMemGb)
return Math.max(cpuSlots, memSlots(mem0, vmMemGb, vmCpus))
final nMemSlots = memSlots(mem0, vmMemGb, vmCpus)

// If disk and vmDisk are not supplied, grab the max of cpuSlots and nMemSlots
if ( !disk || !vmDisk)
return Math.max(cpuSlots, nMemSlots)

// Get slots based on disk usage
final vmDiskGb = vmDisk.toGiga()
final diskGb = disk.toGiga()
final disk0 = Math.min(diskGb, vmDiskGb)
final nDiskSlots = diskSlots(disk0, vmDiskGb, vmCpus)

// Get maximum slots per VM based on CPU, memory, and disk
return Collections.max([cpuSlots, nMemSlots, nDiskSlots])
}

protected int computeSlots(TaskRun task, AzVmPoolSpec pool) {
computeSlots(
task.config.getCpus(),
task.config.getMemory(),
task.config.getDisk(),
pool.vmType.numberOfCores,
pool.vmType.memory )
pool.vmType.memory,
pool.vmType.resourceDiskSize )
}


protected int memSlots(float memGb, float vmMemGb, int vmCpus) {
BigDecimal result = memGb / (vmMemGb / vmCpus)
log.trace("[AZURE BATCH] memSlots: memGb=${memGb}, vmMemGb=${vmMemGb}, vmCpus=${vmCpus}, result=${result}")
result.setScale(0, RoundingMode.UP).intValue()
}

protected int diskSlots(float disk, float vmDisk, int vmCpus) {
BigDecimal result = disk / (vmDisk / vmCpus)
log.trace("[AZURE BATCH] diskSlots: disk=${disk}, vmDisk=${vmDisk}, vmCpus=${vmCpus}, result=${result}")
result.setScale(0, RoundingMode.UP).intValue()
}

protected AzureNamedKeyCredential createBatchCredentialsWithKey() {
log.debug "[AZURE BATCH] Creating Azure Batch client using shared key credentials"
@@ -559,11 +610,12 @@ class AzBatchService implements Closeable {
final opts = config.batch().autoPoolOpts()
final mem = task.config.getMemory()
final cpus = task.config.getCpus()
final disk = task.config.getDisk()
final type = task.config.getMachineType() ?: opts.vmType
if( !type )
throw new IllegalArgumentException("Missing Azure Batch VM type for task '${task.name}'")

final vmType = guessBestVm(loc, cpus, mem, type)
final vmType = guessBestVm(loc, cpus, mem, disk, type)
if( !vmType ) {
def msg = "Cannot find a VM for task '${task.name}' matching these requirements: type=$type, cpus=${cpus}, mem=${mem?:'-'}, location=${loc}"
throw new IllegalArgumentException(msg)
@@ -716,7 +768,7 @@ class AzBatchService implements Closeable {
.setVirtualMachineConfiguration(poolVmConfig(spec.opts))
// same as the number of cores
// https://docs.microsoft.com/en-us/azure/batch/batch-parallel-node-tasks
.setTaskSlotsPerNode(spec.vmType.numberOfCores)
.setTaskSlotsPerNode(Math.min(256, spec.vmType.numberOfCores))

final startTask = createStartTask(spec.opts.startTask)
if( startTask ) {
@@ -43,7 +43,7 @@ class AzVmType {
this.maxDataDiskCount = map.maxDataDiskCount as Integer
this.memory = map.memoryInMB ? MemoryUnit.of( "$map.memoryInMB MB" ) : null
this.numberOfCores = map.numberOfCores as Integer
this.osDiskSize = map.osDiskSizeInMB ? MemoryUnit.of( "$map.osDiskSizeInMB MB" ) : null
this.resourceDiskSize = map.resourceDiskSizeInMB ? MemoryUnit.of( "$map.resourceDiskSizeInMB MB" ) : null
this.osDiskSize = map.osDiskSizeInMB ? MemoryUnit.of( "$map.osDiskSizeInMB MB" ) : MemoryUnit.of("0 MB")
this.resourceDiskSize = map.resourceDiskSizeInMB ? MemoryUnit.of( "$map.resourceDiskSizeInMB MB" ) : MemoryUnit.of("0 MB")
}
}