Skip to content

Improve kill task logic #5594

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ class CachedTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ class GridTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( batch ) {
batch.collect(executor, jobId)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ class NopeTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}

Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ class StoredTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
throw new UnsupportedOperationException()
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ class LocalTaskHandler extends TaskHandler implements FusionAwareTask {
* Force the submitted job to quit
*/
@Override
void kill() {
protected void killTask() {
if( !process ) return
final pid = ProcessHelper.pid(process)
log.trace("Killing process with pid: ${pid}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class NativeTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( result ) result.cancel(true)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -472,7 +472,7 @@ class K8sTaskHandler extends TaskHandler implements FusionAwareTask {
* Terminates the current task execution
*/
@Override
void kill() {
protected void killTask() {
if( cleanupDisabled() )
return

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ package nextflow.processor
import static nextflow.processor.TaskStatus.*

import java.nio.file.NoSuchFileException
import java.util.concurrent.atomic.AtomicBoolean

import groovy.transform.CompileDynamic
import groovy.transform.CompileStatic
Expand All @@ -37,6 +38,8 @@ import nextflow.trace.TraceRecord
@CompileStatic
abstract class TaskHandler {

private AtomicBoolean killed = new AtomicBoolean()

protected TaskHandler(TaskRun task) {
this.task = task
}
Expand Down Expand Up @@ -77,10 +80,22 @@ abstract class TaskHandler {
abstract boolean checkIfCompleted()

/**
* Force the submitted job to quit
* Template method implementing the termination of a task execution.
* This is not meant to be invoked directly. See also {@link #kill()}
*/
abstract void kill()
abstract protected void killTask()

/**
* Kill a job execution.
*
* Delegates to {@link #killTask()} on the first invocation only: the
* {@code killed} flag is flipped atomically via {@code getAndSet(true)},
* so every later call to this method returns without doing anything.
* The flag is set before {@link #killTask()} runs, therefore a call that
* fails with an exception is not retried by a subsequent {@code kill()}.
*
* @see #killTask()
*/
void kill() {
// getAndSet returns the previous value: proceed only if the flag was not already set
if (!killed.getAndSet(true)) {
killTask()
}
}

/**
* Submit the task for execution.
*
Expand Down Expand Up @@ -301,12 +316,4 @@ abstract class TaskHandler {
return workflowId ? "tw-${workflowId}-${name}" : name
}

private volatile boolean terminated

final void terminate() {
if (!terminated) {
terminated = true
kill()
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,9 @@ import nextflow.exception.ProcessSubmitTimeoutException
import nextflow.executor.BatchCleanup
import nextflow.executor.GridTaskHandler
import nextflow.util.Duration
import nextflow.util.SysHelper
import nextflow.util.Threads
import nextflow.util.Throttle
import static nextflow.util.SysHelper.dumpThreads

/**
* Monitors the queued tasks waiting for their termination
*
Expand Down Expand Up @@ -471,7 +470,7 @@ class TaskPollingMonitor implements TaskMonitor {
}

protected dumpCurrentThreads() {
log.trace "Current running threads:\n${dumpThreads()}"
log.trace "Current running threads:\n${SysHelper.dumpThreads()}"
}

protected void dumpRunningQueue() {
Expand Down Expand Up @@ -581,7 +580,7 @@ class TaskPollingMonitor implements TaskMonitor {
catch (Throwable error) {
// At this point NF assumes the job is not running, but an error during monitoring could leave a job running (#5516).
// In this case, NF needs to ensure the job is killed.
handler.terminate()
handler.kill()
handleException(handler, error)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -515,13 +515,13 @@ class K8sTaskHandlerTest extends Specification {
def handler = Spy(new K8sTaskHandler(client:client, podName: POD_NAME))

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> false
1 * client.podDelete(POD_NAME) >> null

when:
handler.kill()
handler.killTask()
then:
1 * handler.cleanupDisabled() >> true
0 * client.podDelete(POD_NAME) >> null
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ class TaskPollingMonitorTest extends Specification {
then:
1 * session.disableJobsCancellation >> true
and:
0 * handler.kill() >> null
0 * handler.killTask() >> null
0 * session.notifyTaskComplete(handler) >> null
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,6 @@ class MockTaskHandler extends TaskHandler {
}

@Override
void kill() { }
protected void killTask() { }

}
Original file line number Diff line number Diff line change
Expand Up @@ -298,7 +298,7 @@ class AwsBatchTaskHandler extends TaskHandler implements BatchHandler<String,Job
* {@inheritDoc}
*/
@Override
void kill() {
protected void killTask() {
assert jobId
log.trace "[AWS BATCH] Process `${task.lazyName()}` - killing job=$jobId"
final targetId = normaliseJobId(jobId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -881,23 +881,23 @@ class AwsBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> true
and:
1 * handler.terminateJob('job1') >> null

when:
handler.@jobId = 'job1:task2'
handler.kill()
handler.killTask()
then:
1 * executor.shouldDeleteJob('job1') >> false
and:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ class AzBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( !taskKey )
return
batchService.terminate(taskKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,7 @@ class GoogleBatchTaskHandler extends TaskHandler implements FusionAwareTask {
}

@Override
void kill() {
protected void killTask() {
if( isActive() ) {
log.trace "[GOOGLE BATCH] Process `${task.lazyName()}` - deleting job name=$jobId"
if( executor.shouldDeleteJob(jobId) )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,7 @@ class GoogleLifeSciencesTaskHandler extends TaskHandler {
}

@Override
void kill() {
protected void killTask() {
if( !operation ) return
log.debug "[GLS] Killing task > $task.name - Pipeline Id: $pipelineId"
helper.cancelOperation(operation)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> false
0 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -568,7 +568,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> true
Expand All @@ -577,7 +577,7 @@ class GoogleBatchTaskHandlerTest extends Specification {

when:
handler.@jobId = 'job1'
handler.kill()
handler.killTask()
then:
handler.isActive() >> true
1 * executor.shouldDeleteJob('job1') >> false
Expand Down
Loading