From f6763bae82a56d21e765db8b579bcd0064d2e64a Mon Sep 17 00:00:00 2001 From: Ben Sherman Date: Thu, 5 Dec 2024 10:30:05 -0600 Subject: [PATCH] Fail the run if publish thread pool times out Signed-off-by: Ben Sherman --- .../src/main/groovy/nextflow/Session.groovy | 14 +++++++++++--- .../nextflow/util/ThreadPoolHelper.groovy | 9 ++++----- .../nextflow/util/ThreadPoolManager.groovy | 15 +++++++++------ .../cloud/aws/batch/AwsBatchExecutor.groovy | 5 +++-- .../main/nextflow/cloud/aws/nio/S3Client.java | 14 -------------- .../nextflow/cloud/aws/nio/S3OutputStream.java | 18 ------------------ 6 files changed, 27 insertions(+), 48 deletions(-) diff --git a/modules/nextflow/src/main/groovy/nextflow/Session.groovy b/modules/nextflow/src/main/groovy/nextflow/Session.groovy index f394245259..1432f53051 100644 --- a/modules/nextflow/src/main/groovy/nextflow/Session.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/Session.groovy @@ -682,15 +682,23 @@ class Session implements ISession { if( !aborted ) { joinAllOperators() log.trace "Session > all operators finished" + final finalizerComplete = finalizePoolManager?.shutdown(false) + final publisherComplete = publishPoolManager?.shutdown(false) + if( !finalizerComplete || !publisherComplete ) { + final failOnIncomplete = config.navigate('workflow.output.ignoreErrors') + if( failOnIncomplete ) + throw new AbortOperationException("Timed out while waiting to publish outputs") + } + } + else { + finalizePoolManager?.shutdown(true) + publishPoolManager?.shutdown(true) } } void destroy() { try { log.trace "Session > destroying" - // shutdown thread pools - finalizePoolManager?.shutdown(aborted) - publishPoolManager?.shutdown(aborted) // invoke shutdown callbacks shutdown0() log.trace "Session > after cleanup" diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy index 12c736bd60..7e5ba62c35 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolHelper.groovy @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer @Slf4j class ThreadPoolHelper { - static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) { + static boolean await(ExecutorService pool, Duration maxAwait, String waitMessage) { final max = maxAwait.millis final t0 = System.currentTimeMillis() // wait for ongoing file transfer to complete @@ -45,10 +45,8 @@ class ThreadPoolHelper { break final delta = System.currentTimeMillis()-t0 - if( delta > max ) { - log.warn(exitMsg) - break - } + if( delta > max ) + return false // log to console every 10 minutes (120 * 5 sec) if( count % 120 == 0 ) { @@ -61,6 +59,7 @@ class ThreadPoolHelper { // increment the count count++ } + return true } static protected int pending(ExecutorService pool) { diff --git a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy index d1ea726f74..96af1ec1b6 100644 --- a/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy +++ b/modules/nextflow/src/main/groovy/nextflow/util/ThreadPoolManager.groovy @@ -109,24 +109,27 @@ class ThreadPoolManager { return result } - void shutdown(ISession session) { + boolean shutdown(ISession session) { final sess = (Session) session - shutdown( sess != null && sess.aborted ) + return shutdown( sess != null && sess.aborted ) } - void shutdown(boolean hard) { + boolean shutdown(boolean hard) { if( !executorService ) - return + return true if( hard ) { executorService.shutdownNow() - return + return true } executorService.shutdown() // wait for remaining threads to complete - ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg) + final complete = ThreadPoolHelper.await(executorService, maxAwait, waitMsg) + if( !complete ) + log.warn exitMsg log.debug "Thread pool '$name' shutdown completed (hard=$hard)" + return complete } static ExecutorService create(String name, int maxThreads=0) { diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy index 1ce8875521..f026c2730a 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/batch/AwsBatchExecutor.groovy @@ -319,8 +319,9 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec // start shutdown process reaper.shutdown() final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)" - final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" - ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg) + final complete = ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg) + if( !complete ) + log.warn "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated" } @Override diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java index 9661afd657..25aadbb2c8 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) { String getObjectKmsKeyId(String bucketName, String key) { return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId(); } - - protected void showdownTransferPool(boolean hard) { - log.debug("Initiating transfer manager shutdown (hard={})", hard); - if( hard ) { - transferPool.shutdownNow(); - } - else { - // await pool completion - transferPool.shutdown(); - final String waitMsg = "[AWS S3] Waiting files transfer to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(transferPool, Duration.of("1h"), waitMsg, exitMsg); - } - } } diff --git a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java index 56454b186a..eed9e3cda7 100644 --- a/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java +++ b/plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3OutputStream.java @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) { return executorSingleton; } - /** - * Shutdown the executor and clear the singleton - */ - static void shutdownExecutor(boolean hard) { - if( hard ) { - executorSingleton.shutdownNow(); - } - else { - executorSingleton.shutdown(); - log.trace("Uploader await completion"); - final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)"; - final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost"; - ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg); - log.trace("Uploader shutdown completed"); - executorSingleton = null; - } - } - }