Skip to content

Commit

Permalink
Fail the run if publish thread pool times out
Browse files Browse the repository at this point in the history
Signed-off-by: Ben Sherman <[email protected]>
  • Loading branch information
bentsherman committed Dec 5, 2024
1 parent 5c5a810 commit f6763ba
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 48 deletions.
14 changes: 11 additions & 3 deletions modules/nextflow/src/main/groovy/nextflow/Session.groovy
Original file line number Diff line number Diff line change
Expand Up @@ -682,15 +682,23 @@ class Session implements ISession {
if( !aborted ) {
    joinAllOperators()
    log.trace "Session > all operators finished"
    // Graceful shutdown: wait for the finalizer and publisher pools to drain.
    // `?.` yields null when a manager does not exist; `!= false` treats that case
    // as "nothing to wait for" instead of mistaking it for a timeout.
    final finalizerComplete = finalizePoolManager?.shutdown(false) != false
    final publisherComplete = publishPoolManager?.shutdown(false) != false
    if( !finalizerComplete || !publisherComplete ) {
        // A pool that timed out fails the run, unless the user explicitly
        // asked to ignore output errors.
        final ignoreErrors = config.navigate('workflow.output.ignoreErrors')
        if( !ignoreErrors )
            throw new AbortOperationException("Timed out while waiting to publish outputs")
    }
}
else {
    // The session was aborted: request a hard shutdown of both pools, no waiting.
    finalizePoolManager?.shutdown(true)
    publishPoolManager?.shutdown(true)
}
}

void destroy() {
try {
log.trace "Session > destroying"
// shutdown thread pools
finalizePoolManager?.shutdown(aborted)
publishPoolManager?.shutdown(aborted)
// invoke shutdown callbacks
shutdown0()
log.trace "Session > after cleanup"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import jdk.internal.vm.ThreadContainer
@Slf4j
class ThreadPoolHelper {

static void await(ExecutorService pool, Duration maxAwait, String waitMessage, String exitMsg) {
static boolean await(ExecutorService pool, Duration maxAwait, String waitMessage) {
final max = maxAwait.millis
final t0 = System.currentTimeMillis()
// wait for ongoing file transfer to complete
Expand All @@ -45,10 +45,8 @@ class ThreadPoolHelper {
break

final delta = System.currentTimeMillis()-t0
if( delta > max ) {
log.warn(exitMsg)
break
}
if( delta > max )
return false

// log to console every 10 minutes (120 * 5 sec)
if( count % 120 == 0 ) {
Expand All @@ -61,6 +59,7 @@ class ThreadPoolHelper {
// increment the count
count++
}
return true
}

static protected int pending(ExecutorService pool) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,24 +109,27 @@ class ThreadPoolManager {
return result
}

// Shuts the pool down on behalf of a session: a hard shutdown is requested when
// the session is non-null and flagged as aborted, a graceful one otherwise.
// NOTE(review): this is a diff hunk rendered without +/- markers -- the `void`
// signature and the bare `shutdown(...)` call look like the removed lines, the
// `boolean` signature and `return shutdown(...)` like the added ones; confirm
// against the commit before treating this text as compilable code.
void shutdown(ISession session) {
boolean shutdown(ISession session) {
final sess = (Session) session
shutdown( sess != null && sess.aborted )
return shutdown( sess != null && sess.aborted )
}

// Shuts down the managed executor service.
//   hard == true  : calls shutdownNow() and returns without waiting.
//   hard == false : calls shutdown(), then waits via ThreadPoolHelper.await
//                   (bounded by maxAwait) for the remaining tasks.
// In the `boolean` variant the result is true when there is no executor, after a
// hard shutdown, or when the wait completed; it is the (false) await result when
// the wait gave up, in which case exitMsg is logged as a warning.
// NOTE(review): this is a diff hunk rendered without +/- markers -- the `void`
// signature, the bare `return` statements and the four-argument await call look
// like the removed lines; confirm against the commit.
void shutdown(boolean hard) {
boolean shutdown(boolean hard) {
// no executor service: nothing to shut down
if( !executorService )
return
return true

// hard shutdown: do not wait for pending tasks
if( hard ) {
executorService.shutdownNow()
return
return true
}

executorService.shutdown()
// wait for remaining threads to complete
ThreadPoolHelper.await(executorService, maxAwait, waitMsg, exitMsg)
final complete = ThreadPoolHelper.await(executorService, maxAwait, waitMsg)
// the helper no longer receives the exit message, so the warning is emitted here
if( !complete )
log.warn exitMsg
log.debug "Thread pool '$name' shutdown completed (hard=$hard)"
return complete
}

static ExecutorService create(String name, int maxThreads=0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,8 +319,9 @@ class AwsBatchExecutor extends Executor implements ExtensionPoint, TaskArrayExec
// start shutdown process
reaper.shutdown()
final waitMsg = "[AWS BATCH] Waiting jobs reaper to complete (%d jobs to be terminated)"
final exitMsg = "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg, exitMsg)
final complete = ThreadPoolHelper.await(reaper, Duration.of('60min'), waitMsg)
if( !complete )
log.warn "[AWS BATCH] Exiting before jobs reaper thread pool complete -- Some jobs may not be terminated"
}

@Override
Expand Down
14 changes: 0 additions & 14 deletions plugins/nf-amazon/src/main/nextflow/cloud/aws/nio/S3Client.java
Original file line number Diff line number Diff line change
Expand Up @@ -660,18 +660,4 @@ public void uploadDirectory(File source, S3Path target) {
/**
 * Fetches the metadata of the given object and returns its SSE AWS KMS key id.
 *
 * @param bucketName bucket name forwarded to {@code getObjectMetadata}
 * @param key object key forwarded to {@code getObjectMetadata}
 * @return the value reported by {@code getSSEAwsKmsKeyId()} on that metadata
 */
String getObjectKmsKeyId(String bucketName, String key) {
return getObjectMetadata(bucketName,key).getSSEAwsKmsKeyId();
}

/**
 * Shuts down the transfer pool.
 *
 * @param hard when {@code true} the pool is stopped with {@code shutdownNow()} and
 *             the method returns at once; when {@code false} the pool is shut down
 *             and the method waits, for at most one hour, for it to complete
 */
protected void showdownTransferPool(boolean hard) {
    log.debug("Initiating transfer manager shutdown (hard={})", hard);

    if( hard ) {
        // abort path: stop immediately, no waiting
        transferPool.shutdownNow();
        return;
    }

    // graceful path: shut the pool down, then await pool completion
    transferPool.shutdown();
    ThreadPoolHelper.await(
            transferPool,
            Duration.of("1h"),
            "[AWS S3] Waiting files transfer to complete (%d files)",
            "[AWS S3] Exiting before FileTransfer thread pool complete -- Some files maybe lost");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -665,22 +665,4 @@ static synchronized ExecutorService getOrCreateExecutor(int maxThreads) {
return executorSingleton;
}

/**
 * Shuts down the shared uploader executor and clears the singleton.
 *
 * <p>Does nothing when no executor is currently set. The singleton is cleared
 * on both paths, so a terminated executor is never left behind in the static
 * field after a hard shutdown.
 *
 * @param hard when {@code true} the executor is stopped with {@code shutdownNow()}
 *             without waiting; when {@code false} it is shut down and the method
 *             waits, for at most one hour, for it to complete
 */
static void shutdownExecutor(boolean hard) {
    if( executorSingleton == null ) {
        // never created, or already shut down and cleared
        return;
    }

    if( hard ) {
        executorSingleton.shutdownNow();
    }
    else {
        executorSingleton.shutdown();
        log.trace("Uploader await completion");
        final String waitMsg = "[AWS S3] Waiting stream uploader to complete (%d files)";
        final String exitMsg = "[AWS S3] Exiting before stream uploader thread pool complete -- Some files maybe lost";
        ThreadPoolHelper.await(executorSingleton, Duration.of("1h") ,waitMsg, exitMsg);
        log.trace("Uploader shutdown completed");
    }
    // clear only after the shutdown request (and, when graceful, the wait) is done
    executorSingleton = null;
}

}

0 comments on commit f6763ba

Please sign in to comment.