Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Job arrays #3892

Merged
merged 113 commits into from
May 9, 2024
Merged

Job arrays #3892

merged 113 commits into from
May 9, 2024

Conversation

bentsherman
Copy link
Member

@bentsherman bentsherman commented Apr 21, 2023

Closes #1477 (and possibly #1427)

Summary of changes:

  • Adds array directive to submit tasks as array jobs of a given size

  • TaskArrayCollector collects tasks into arrays and submits each array job when it is ready to the underlying executor. The executor must implement the TaskArrayAware interface. Each process has its own array job collector.

    When all input channels to a process have received the "poison pill", the process is "closed" and the array job collector is notified so that it can submit any remaining tasks. All subsequent tasks (e.g. retries) will be submitted as individual tasks.

  • TaskArray is a special type of TaskRun for an array job that holds the list of child task handlers. For an executor that supports array jobs, the task handler can check whether its task is a TaskArray to apply perform job specific behavior.

  • TaskHandler has a few more methods, which the array job collector uses to create the array job script. This script simply defines the list of child work directories, selects a work dir based on the index, and launches the child task using an executor-specific launch command.

  • TaskPollingMonitor has been modified to handle both array jobs and regular tasks. The array job is handled like any other task, but then discarded once it has been submitted. The task stats reported by Nextflow are the same with or without array jobs -- array jobs are not included in the task stats.

Here's the pipeline I'm using as the e2e test:

params.n_tasks = 50

process foo {
    array 10

    input: val index
    output: path 'output.txt'

    """
    echo "Hello from task ${index}!" > output.txt
    """
}

process bar {
    debug true
    array 10

    input: path 'input.txt'

    """
    cat input.txt
    """
}

workflow {
    Channel.of(1 .. params.n_tasks) | foo | bar
}

TODO:

  • documentation
  • unit tests
  • extra
    • AWS Batch: kill array job instead of child jobs
    • Google Batch: kill array job instead of child jobs
    • Grid executors: add array index environment var to container
    • handle retried tasks with dynamic resources
  • manual e2e tests
    • SLURM
    • SLURM + Fusion
    • AWS Batch
    • AWS Batch + Fusion
    • Google Batch
    • Google Batch + Fusion

@bentsherman

This comment was marked as outdated.

Signed-off-by: Ben Sherman <[email protected]>
@bentsherman

This comment was marked as outdated.

@bentsherman
Copy link
Member Author

If a task fails and is retried with increased resources, it will be batched with other tasks that may still be on their first attempt. In that case, the array job resources will depend on whichever tasks happens to be first in the batch.

One solution is to take the max value of cpus, memory, time, etc for all tasks in an array job. That would be "safe" but likely much more expensive -- if a single task requests twice the resources, suddenly the entire array job does as well.

Another solution is to further separate batches by configuration, to ensure that they are uniform. We could go crazy and separate batches by the tuple of (cpus, memory, time, ...), but I think that would be overkill. Better I think to just split based on attempt and tell users to "handle with care".

@bentsherman
Copy link
Member Author

bentsherman commented Apr 21, 2023

We could also just provide config options for these things:

  • executor.$array.groupKeys (default: ['process', 'attempt']) controls how batches are separated

  • executor.$array.requestMaxResources controls whether the array executor "plays if safe" by taking the max resources across all tasks in an array

@bentsherman
Copy link
Member Author

The point of these config options is that there is a trade-off between bandwidth and latency when batching tasks like this, so users should ideally have the ability to manage that trade-off in a way that best fits their use case. If someone doesn't use retry with dynamic resources, then they don't need to group by attempt, and vise versa.

@bentsherman

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@pditommaso

This comment was marked as outdated.

@abhi18av

This comment was marked as outdated.

pditommaso

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

@pditommaso

This comment was marked as outdated.

@bentsherman

This comment was marked as outdated.

Signed-off-by: Ben Sherman <[email protected]>
@pditommaso
Copy link
Member

umm, this still shows

Uploading local `bin` scripts folder to az://my-data/work/tmp/ef/5754c0af6a3622dadd5bab803c316c/bin
Monitor the execution with Seqera Platform using this URL: https://cloud.seqera.io/user/pditommaso/watch/3NQ9I51JIGRbX4
Error: Exception in thread "tower-logs-checkpoint" java.lang.NullPointerException: Cannot invoke "java.lang.Thread.isInterrupted()" because "this.thread" is null
	at io.seqera.tower.plugin.LogsCheckpoint.run(LogsCheckpoint.groovy:70)
	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
	at org.codehaus.groovy.reflection.CachedMethod.invoke(CachedMethod.java:343)
	at groovy.lang.MetaMethod.doMethodInvoke(MetaMethod.java:328)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1333)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethodClosure(MetaClassImpl.java:1017)
	at groovy.lang.MetaClassImpl.doInvokeMethod(MetaClassImpl.java:1207)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1088)
	at groovy.lang.MetaClassImpl.invokeMethod(MetaClassImpl.java:1007)
	at groovy.lang.Closure.call(Closure.java:433)
	at groovy.lang.Closure.call(Closure.java:412)
	at groovy.lang.Closure.run(Closure.java:[505](https://github.com/nextflow-io/nextflow/actions/runs/8991243665/job/24698707803#step:5:506))
	at java.base/java.lang.VirtualThread.run(VirtualThread.java:309)

@bentsherman
Copy link
Member Author

Yeah I didn't think the volatile would help. Really there is no point in checking if the current thread is interrupted...

@pditommaso
Copy link
Member

That's the recommended pattern by Java Java Concurrency in Practice bible

@pditommaso
Copy link
Member

Reverted 247b721 because the problem comes from the fact the thread starts before the variable is assigned.

The bottom line is that this code is faulty

@pditommaso
Copy link
Member

@bentsherman all solved with Google Batch logs and child ids?

@bentsherman
Copy link
Member Author

Google Batch logs are working

Why would you check if the thread you are currently in is interrupted? If it's interrupted then you wouldn't get the chance to check if it's interrupted, you would just be interrupted...

@pditommaso
Copy link
Member

@bentsherman
Copy link
Member Author

Should be solved with: !thread?.isInterrupted()

@pditommaso
Copy link
Member

think so

@pditommaso
Copy link
Member

Fascinating the dir content when using job array (only for nerds :))

» tree work/
work/
├── 17
│   └── 5ee762d51d42993b304ec32c2ac69e
├── 38
│   └── 30315d77627dd73e5d2c31b92c8d2b
│       ├── slurm-8_0.out
│       └── slurm-8_1.out
├── 46
│   └── 59224af77ca4b3daf702a4507ffed8
├── bc
│   └── 15aecb161e4bb851a7f7ff27b6b728
├── d0
│   └── 43bb64e2b8b5e78147e93894e495d7
└── e2
    └── 119acedcd1ca2d851bc3abd00ae99a
        ├── slurm-6_0.out
        └── slurm-6_1.out

pditommaso added 3 commits May 9, 2024 10:57
Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
@pditommaso
Copy link
Member

Reverted some renaming on launch <> submit methods because still there were some inconsistencies and to prevent breaking the xpack deps

@pditommaso
Copy link
Member

Think we are finally ready to merge this. Great effort 👏 👏

@pditommaso pditommaso merged commit ca9bc9d into master May 9, 2024
22 checks passed
@pditommaso pditommaso deleted the 1477-job-array-executor branch May 9, 2024 10:16
pditommaso added a commit that referenced this pull request May 9, 2024
Job array is a capability provided by some  batch schedulers that allows spawning the execution 
of multiple copies of the same job in an efficient manner. 

Nextflow allows the use of this capability by setting the process directive `array <some value>` that 
determines  the (max) number of jobs in the array. For example 

```
process foo  {
  array 10 
  '''
  your_task
  '''
}
```

or  in the nextflow config file 


```
process.array = 10 
```

Currently this feature is supported by the following executors:
 
* Slurm
* Sge
* Pbs
* Pbs Pro
* LSF 
* AWS Batch
* Google Batch


Signed-off-by: Ben Sherman <[email protected]>
Signed-off-by: Paolo Di Tommaso <[email protected]>
Signed-off-by: Mahesh Binzer-Panchal <[email protected]>
Signed-off-by: Herman Singh <[email protected]>
Signed-off-by: Dr Marco Claudio De La Pierre <[email protected]>
Co-authored-by: Paolo Di Tommaso <[email protected]>
Co-authored-by: Abhinav Sharma <[email protected]>
Co-authored-by: Mahesh Binzer-Panchal <[email protected]>
Co-authored-by: Herman Singh <[email protected]>
Co-authored-by: Dr Marco Claudio De La Pierre <[email protected]>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment