From f6eead722efab4a4cd749ef301a8b671378258b9 Mon Sep 17 00:00:00 2001 From: jaymode Date: Fri, 31 May 2019 13:07:51 -0600 Subject: [PATCH] Do not close threadpool if termination fails This commit changes the code so that the threadpool is not closed unless termination succeeds. Otherwise there can still be running tasks that rely on resources that are closed by closing the threadpool. Additionally, there is a test fix included for the NodeTests that ensures the submitted task is actually running prior to closing the node in the test. Closes #42577 --- .../client/transport/TransportClient.java | 9 +++++++- .../elasticsearch/threadpool/ThreadPool.java | 23 ++++++++++++------- .../org/elasticsearch/node/NodeTests.java | 9 +++++++- .../elasticsearch/test/client/NoOpClient.java | 7 +++++- 4 files changed, 37 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java index 4c2f4932de2f2..163fa69712668 100644 --- a/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java +++ b/server/src/main/java/org/elasticsearch/client/transport/TransportClient.java @@ -374,7 +374,14 @@ public void close() { for (LifecycleComponent plugin : pluginLifecycleComponents) { closeables.add(plugin); } - closeables.add(() -> ThreadPool.terminate(injector.getInstance(ThreadPool.class), 10, TimeUnit.SECONDS)); + closeables.add(() -> { + final ThreadPool pool = injector.getInstance(ThreadPool.class); + final boolean terminated = ThreadPool.terminate(pool, 10, TimeUnit.SECONDS); + if (terminated == false) { + // the pool is only closed if termination succeeds, just close even if termination failed + pool.close(); + } + }); IOUtils.closeWhileHandlingException(closeables); } diff --git a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java index a72b66de52845..dc69f042d245b 100644 --- a/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java +++ b/server/src/main/java/org/elasticsearch/threadpool/ThreadPool.java @@ -412,6 +412,7 @@ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedE } } cachedTimeThread.join(unit.toMillis(timeout)); + result &= cachedTimeThread.isAlive() == false; return result; } @@ -699,22 +700,28 @@ private static boolean awaitTermination( /** * Returns true if the given pool was terminated successfully. If the termination timed out, - * the service is null this method will return false. + * the service is null this method will return false. The pool is only closed if + * the termination was successful. */ public static boolean terminate(ThreadPool pool, long timeout, TimeUnit timeUnit) { + boolean terminated = false; if (pool != null) { - // Leverage try-with-resources to close the threadpool - try (ThreadPool c = pool) { + try { pool.shutdown(); if (awaitTermination(pool, timeout, timeUnit)) { - return true; + terminated = true; + } else { + // last resort + pool.shutdownNow(); + terminated = awaitTermination(pool, timeout, timeUnit); + } + } finally { + if (terminated) { + pool.close(); } - // last resort - pool.shutdownNow(); - return awaitTermination(pool, timeout, timeUnit); } } - return false; + return terminated; } private static boolean awaitTermination( diff --git a/server/src/test/java/org/elasticsearch/node/NodeTests.java b/server/src/test/java/org/elasticsearch/node/NodeTests.java index a5653eb88e176..a30ba5d2633f4 100644 --- a/server/src/test/java/org/elasticsearch/node/NodeTests.java +++ b/server/src/test/java/org/elasticsearch/node/NodeTests.java @@ -50,7 +50,6 @@ import static org.elasticsearch.cluster.metadata.IndexMetaData.SETTING_NUMBER_OF_SHARDS; import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked; -@LuceneTestCase.AwaitsFix(bugUrl = "https://github.com/elastic/elasticsearch/issues/42577") @LuceneTestCase.SuppressFileSystems(value = "ExtrasFS") public class NodeTests extends ESTestCase { @@ -154,9 +153,12 @@ public void testCloseOnOutstandingTask() throws Exception { node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); shouldRun.set(false); assertTrue(node.awaitClose(1, TimeUnit.DAYS)); @@ -167,12 +169,17 @@ public void testAwaitCloseTimeoutsOnNonInterruptibleTask() throws Exception { node.start(); ThreadPool threadpool = node.injector().getInstance(ThreadPool.class); AtomicBoolean shouldRun = new AtomicBoolean(true); + CountDownLatch threadRunning = new CountDownLatch(1); threadpool.executor(ThreadPool.Names.SEARCH).execute(() -> { + threadRunning.countDown(); while (shouldRun.get()); }); + threadRunning.await(); node.close(); assertFalse(node.awaitClose(0, TimeUnit.MILLISECONDS)); shouldRun.set(false); + // call this again to ensure we terminate and close the threadpool + assertTrue(node.awaitClose(1, TimeUnit.DAYS)); } public void testCloseOnInterruptibleTask() throws Exception { diff --git a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpClient.java b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpClient.java index d95e1d32663e7..7cec6fb5b0382 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/client/NoOpClient.java +++ b/test/framework/src/main/java/org/elasticsearch/test/client/NoOpClient.java @@ -57,10 +57,15 @@ void doExecute(Action action, Request request, ActionListener