From 5325e5a62747472fae70c75662d2e82bd925d40e Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 23 Jan 2025 03:28:20 -0600 Subject: [PATCH] Fail the run if publish thread pool times out (#5578) [ci fast] Signed-off-by: Ben Sherman Signed-off-by: Paolo Di Tommaso Co-authored-by: Paolo Di Tommaso --- .../src/main/groovy/nextflow/Session.groovy | 14 ++++++++++++-- .../nextflow/util/ThreadPoolHelper.groovy | 10 ++++------ .../nextflow/util/ThreadPoolManager.groovy | 16 ++++++++++++++++ .../nextflow/processor/TaskConfigTest.groovy | 1 - .../cloud/aws/batch/AwsBatchExecutor.groovy | 13 ++++++++++++- .../main/nextflow/cloud/aws/nio/S3Client.java | 14 -------------- .../nextflow/cloud/aws/nio/S3OutputStream.java | 18 ------------------ 7 files changed, 44 insertions(+), 42 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index c9054e6673..08c1488382 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -23,6 +23,7 @@ import java.nio.file.Paths import java.util.concurrent.ConcurrentLinkedQueue import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException import com.google.common.hash.HashCode import groovy.transform.CompileDynamic @@ -689,8 +690,17 @@ class Session implements ISession { try { log.trace "Session > destroying" // shutdown thread pools - finalizePoolManager?.shutdown(aborted) - publishPoolManager?.shutdown(aborted) + try { + finalizePoolManager?.shutdownOrAbort(aborted,this) + publishPoolManager?.shutdownOrAbort(aborted,this) + } + catch( TimeoutException e ) { + final ignoreErrors = config.navigate('workflow.output.ignoreErrors', false) + if( !ignoreErrors ) + throw new AbortOperationException("Timed out while waiting to publish outputs") + else + log.warn e.message + } // invoke shutdown callbacks shutdown0() log.trace "Session > after cleanup" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 12c736bd60..d06addfa73 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -20,11 +20,11 @@ package nextflow.util import java.util.concurrent.ExecutorService import java.util.concurrent.ThreadPoolExecutor import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import groovy.transform.CompileStatic import groovy.util.logging.Slf4j import jdk.internal.vm.ThreadContainer - /** * Thread pool helpers * @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer @Slf4j class ThreadPoolHelper { - static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) { + static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) throws TimeoutException { final max = maxAwait.millis final t0 = System.currentTimeMillis() // wait for ongoing file transfer to complete @@ -45,10 +45,8 @@ class ThreadPoolHelper { break final delta = System.currentTimeMillis()-t0 - if( delta > max ) { - log.warn(exitMsg) - break - } + if( delta > max ) + throw new TimeoutException(exitMsg) // log to console every 10 minutes (120 * 5 sec) if( count % 120 == 0 ) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy index d1ea726f74..892d81998d 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy @@ -19,6 +19,7 @@ package nextflow.util import java.util.concurrent.ExecutorService import java.util.concurrent.Executors +import java.util.concurrent.TimeoutException import java.util.concurrent.atomic.AtomicInteger import groovy.transform.CompileStatic @@ -26,6 +27,8 @@ import groovy.util.logging.Slf4j import nextflow.Global import nextflow.ISession import nextflow.Session +import nextflow.exception.AbortOperationException + /** * Holder object for file transfer thread pool * @@ -129,6 +132,19 @@ class ThreadPoolManager { log.debug "Thread pool '$name' shutdown completed (hard=$hard)" } + void shutdownOrAbort(boolean hard, Session session) throws AbortOperationException { + try { + shutdown(hard) + } + catch( TimeoutException e ) { + final ignoreErrors = session.config.navigate('workflow.output.ignoreErrors', false) + if( ignoreErrors ) + log.warn(e.message) + else + throw new AbortOperationException("Timed out while waiting to publish outputs", e) + } + } + static ExecutorService create(String name, int maxThreads=0) { final session = Global.session as Session new ThreadPoolManager(name) diff --git a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy index b997c00a25..ace82766bc 100644 --- a/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy +++ b/modules/nextflow/src/test/groovy/nextflow/processor/TaskConfigTest.groovy @@ -32,7 +32,6 @@ import spock.lang.Specification */ class TaskConfigTest extends Specification { - def testShell() { when: diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index 1ce8875521..4fb8fb5b8c 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -18,6 +18,7 @@ package nextflow.cloud.aws.batch import java.nio.file.Path import java.util.concurrent.TimeUnit +import java.util.concurrent.TimeoutException import com.amazonaws.services.batch.AWSBatch import com.amazonaws.services.batch.model.AWSBatchException @@ -320,7 +321,17 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec reaper.shutdown() final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)" final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" - ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + awaitCompletion(reaper, Duration.of('60min'), waitMsg, exitMsg) + + } + + protected void awaitCompletion(ThrottlingExecutor executor, Duration duration, String waitMsg, String exitMsg) { + try { + ThreadPoolHelper.await(executor, duration, waitMsg, exitMsg) + } + catch( TimeoutException e ) { + log.warn(e.message, e) + } } @Override diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 9661afd657..25aadbb2c8 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) { String getObjectKmsKeyId(String bucketName, String key) { return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId(); } - - protected void showdownTransferPool(boolean hard) { - log.debug("Initiating transfer manager shutdown (hard={})", hard); - if( hard ) { - transferPool.shutdownNow(); - } - else { - // await pool completion - transferPool.shutdown(); - final String waitMsg = "[AWS S3] Waiting files transfer to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(transferPool, Duration.of("1h"), waitMsg, exitMsg); - } - } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java index 56454b186a..eed9e3cda7 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) { return executorSingleton; } - /** - * Shutdown the executor and clear the singleton - */ - static void shutdownExecutor(boolean hard) { - if( hard ) { - executorSingleton.shutdownNow(); - } - else { - executorSingleton.shutdown(); - log.trace("Uploader await completion"); - final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg); - log.trace("Uploader shutdown completed"); - executorSingleton = null; - } - } - }