diff --git a/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy b/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy index 25939c7e90..4a651aa9d6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/executor/res/DiskResource.groovy @@ -23,7 +23,7 @@ import nextflow.util.MemoryUnit /** * Models disk resource request - * + * * @author Ben Sherman */ @ToString(includeNames = true, includePackage = false) @@ -33,30 +33,34 @@ class DiskResource { final MemoryUnit request final String type + final Boolean fusion - DiskResource( value ) { + DiskResource(value) { this(request: value) } - DiskResource( Map opts ) { - this.request = toMemoryUnit(opts.request) + DiskResource(Map opts) { + this.request = opts.request != null ? toMemoryUnit(opts.request) : null - if( opts.type ) + if (opts.type) this.type = opts.type as String + + if (opts.fusion != null) + this.fusion = opts.fusion as Boolean } DiskResource withRequest(MemoryUnit value) { - return new DiskResource(request: value, type: this.type) + return new DiskResource(request: value, type: this.type, fusion: this.fusion) } - private static MemoryUnit toMemoryUnit( value ) { - if( value instanceof MemoryUnit ) - return (MemoryUnit)value + private static MemoryUnit toMemoryUnit(value) { + if (value instanceof MemoryUnit) + return (MemoryUnit) value try { return new MemoryUnit(value.toString().trim()) } - catch( Exception e ) { + catch (Exception e) { throw new IllegalArgumentException("Not a valid disk value: $value") } } diff --git a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionAwareTask.groovy b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionAwareTask.groovy index d75c8f44fc..4ffc135775 100644 --- a/modules/nextflow/src/main/groovy/nextflow/fusion/FusionAwareTask.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/fusion/FusionAwareTask.groovy @@ -42,7 +42,14 @@ trait FusionAwareTask { boolean fusionEnabled() { if( fusionEnabled==null ) { - fusionEnabled = getExecutor0().isFusionEnabled() + // Check task-level disk.fusion setting first + def diskFusion = getTask().config?.getDiskResource()?.fusion + if( diskFusion != null ) { + fusionEnabled = diskFusion + } else { + // Fall back to executor-level setting + fusionEnabled = getExecutor0().isFusionEnabled() + } } return fusionEnabled } diff --git a/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy b/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy index d0541ca9ec..9342f2176b 100644 --- a/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/executor/res/DiskResourceTest.groovy @@ -35,17 +35,23 @@ class DiskResourceTest extends Specification { then: disk.request == REQ disk.type == TYPE + disk.fusion == FUSION where: - VALUE | REQ | TYPE - _100_GB | _100_GB | null - [request: _100_GB] | _100_GB | null - [request: _375_GB, type: 'local-ssd'] | _375_GB | 'local-ssd' + VALUE | REQ | TYPE | FUSION + _100_GB | _100_GB | null | null + [request: _100_GB] | _100_GB | null | null + [request: _375_GB, type: 'local-ssd'] | _375_GB | 'local-ssd' | null + [request: _100_GB, fusion: true] | _100_GB | null | true + [request: _100_GB, fusion: false] | _100_GB | null | false + [request: _375_GB, type: 'local-ssd', fusion: false] | _375_GB | 'local-ssd' | false } def 'should return a disk resource with the specified request' () { expect: new DiskResource(request: _100_GB).withRequest(_375_GB) == new DiskResource(request: _375_GB) new DiskResource(request: _100_GB, type: 'ssd').withRequest(_375_GB) == new DiskResource(request: _375_GB, type: 'ssd') + new DiskResource(request: _100_GB, fusion: true).withRequest(_375_GB) == new DiskResource(request: _375_GB, fusion: true) + new DiskResource(request: _100_GB, type: 'ssd', fusion: false).withRequest(_375_GB) == new DiskResource(request: _375_GB, type: 'ssd', fusion: false) } }