From 55f1b40a72900b4fde7eba89e674e1c9dd0d721b Mon Sep 17 00:00:00 2001 From: David Turner Date: Sun, 18 Aug 2024 20:29:41 +0100 Subject: [PATCH] Improve interrupt handling in tests The test utilities `waitUntil()`, `indexRandom()`, `startInParallel()` and `runInParallel()` all declare `InterruptedException` amongst the checked exceptions they throw, but in practice there's nothing useful to do with such an exception except to fail the test. With this change we handle the interrupt within the utility methods instead, avoiding exception-handling noise in callers. --- .../node/tasks/CancellableTasksTests.java | 14 ++- .../cluster/node/tasks/TestTaskPlugin.java | 20 ++--- .../elasticsearch/test/ESIntegTestCase.java | 50 +++++------ .../org/elasticsearch/test/ESTestCase.java | 41 +++++---- .../test/InternalTestCluster.java | 6 +- .../SearchableSnapshotsIntegTests.java | 8 +- .../SessionFactoryLoadBalancingTests.java | 85 +++++++++---------- 7 files changed, 101 insertions(+), 123 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java index e541fef65a0f9..64b9b4f0b69d8 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/CancellableTasksTests.java @@ -158,15 +158,11 @@ protected NodeResponse nodeOperation(CancellableNodeRequest request, Task task) if (shouldBlock) { // Simulate a job that takes forever to finish // Using periodic checks method to identify that the task was cancelled - try { - waitUntil(() -> { - ((CancellableTask) task).ensureNotCancelled(); - return false; - }); - fail("It should have thrown an exception"); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } + waitUntil(() -> { + ((CancellableTask) task).ensureNotCancelled(); + return false; + }); + fail("It should have thrown an exception"); } debugDelay("op4"); diff --git a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java index 16392b3f59baa..903ecfe2b2aa7 100644 --- a/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java +++ b/server/src/test/java/org/elasticsearch/action/admin/cluster/node/tasks/TestTaskPlugin.java @@ -283,16 +283,12 @@ protected void doExecute(Task task, NodesRequest request, ActionListener { - if (((CancellableTask) task).isCancelled()) { - throw new RuntimeException("Cancelled!"); - } - return ((TestTask) task).isBlocked() == false; - }); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - } + waitUntil(() -> { + if (((CancellableTask) task).isCancelled()) { + throw new RuntimeException("Cancelled!"); + } + return ((TestTask) task).isBlocked() == false; + }); } logger.info("Test task finished on the node {}", clusterService.localNode()); return new NodeResponse(clusterService.localNode()); @@ -301,9 +297,7 @@ protected NodeResponse nodeOperation(NodeRequest request, Task task) { public static class UnblockTestTaskResponse implements Writeable { - UnblockTestTaskResponse() { - - } + UnblockTestTaskResponse() {} UnblockTestTaskResponse(StreamInput in) {} diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java index aad3dcc457241..138bae5c8c876 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESIntegTestCase.java @@ -1197,23 +1197,19 @@ public static List> findTasks(Cl @Nullable public static DiscoveryNode waitAndGetHealthNode(InternalTestCluster internalCluster) { DiscoveryNode[] healthNode = new DiscoveryNode[1]; - try { - waitUntil(() -> { - ClusterState state = internalCluster.client() - .admin() - .cluster() - .prepareState() - .clear() - .setMetadata(true) - .setNodes(true) - .get() - .getState(); - healthNode[0] = HealthNode.findHealthNode(state); - return healthNode[0] != null; - }, 15, TimeUnit.SECONDS); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } + waitUntil(() -> { + ClusterState state = internalCluster.client() + .admin() + .cluster() + .prepareState() + .clear() + .setMetadata(true) + .setNodes(true) + .get() + .getState(); + healthNode[0] = HealthNode.findHealthNode(state); + return healthNode[0] != null; + }, 15, TimeUnit.SECONDS); return healthNode[0]; } @@ -1645,7 +1641,7 @@ protected static IndicesAdminClient indicesAdmin() { return admin().indices(); } - public void indexRandom(boolean forceRefresh, String index, int numDocs) throws InterruptedException { + public void indexRandom(boolean forceRefresh, String index, int numDocs) { IndexRequestBuilder[] builders = new IndexRequestBuilder[numDocs]; for (int i = 0; i < builders.length; i++) { builders[i] = prepareIndex(index).setSource("field", "value"); @@ -1656,11 +1652,11 @@ public void indexRandom(boolean forceRefresh, String index, int numDocs) throws /** * Convenience method that forwards to {@link #indexRandom(boolean, List)}. */ - public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) throws InterruptedException { + public void indexRandom(boolean forceRefresh, IndexRequestBuilder... builders) { indexRandom(forceRefresh, Arrays.asList(builders)); } - public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexRequestBuilder... builders) throws InterruptedException { + public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexRequestBuilder... builders) { indexRandom(forceRefresh, dummyDocuments, Arrays.asList(builders)); } @@ -1679,7 +1675,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, IndexReque * @param builders the documents to index. * @see #indexRandom(boolean, boolean, java.util.List) */ - public void indexRandom(boolean forceRefresh, List builders) throws InterruptedException { + public void indexRandom(boolean forceRefresh, List builders) { indexRandom(forceRefresh, forceRefresh, builders); } @@ -1695,7 +1691,7 @@ public void indexRandom(boolean forceRefresh, List builders * all documents are indexed. This is useful to produce deleted documents on the server side. * @param builders the documents to index. */ - public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List builders) throws InterruptedException { + public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List builders) { indexRandom(forceRefresh, dummyDocuments, true, builders); } @@ -1712,8 +1708,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, List builders) - throws InterruptedException { + public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean maybeFlush, List builders) { Random random = random(); Set indices = new HashSet<>(); builders = new ArrayList<>(builders); @@ -1771,7 +1766,7 @@ public void indexRandom(boolean forceRefresh, boolean dummyDocuments, boolean ma } } for (CountDownLatch operation : inFlightAsyncOperations) { - operation.await(); + safeAwait(operation); } final List actualErrors = new ArrayList<>(); for (Tuple tuple : errors) { @@ -1839,8 +1834,7 @@ private static CountDownLatch newLatch(List latches) { /** * Maybe refresh, force merge, or flush then always make sure there aren't too many in flight async operations. */ - private void postIndexAsyncActions(String[] indices, List inFlightAsyncOperations, boolean maybeFlush) - throws InterruptedException { + private void postIndexAsyncActions(String[] indices, List inFlightAsyncOperations, boolean maybeFlush) { if (rarely()) { if (rarely()) { indicesAdmin().prepareRefresh(indices) @@ -1860,7 +1854,7 @@ private void postIndexAsyncActions(String[] indices, List inFlig } while (inFlightAsyncOperations.size() > MAX_IN_FLIGHT_ASYNC_INDEXES) { int waitFor = between(0, inFlightAsyncOperations.size() - 1); - inFlightAsyncOperations.remove(waitFor).await(); + safeAwait(inFlightAsyncOperations.remove(waitFor)); } } diff --git a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java index 08709ff6459ce..58487d6552bcd 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/test/ESTestCase.java @@ -213,6 +213,7 @@ import static org.hamcrest.Matchers.emptyCollectionOf; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.in; import static org.hamcrest.Matchers.startsWith; /** @@ -1420,9 +1421,8 @@ public static void assertBusy(CheckedRunnable codeBlock, long maxWait * * @param breakSupplier determines whether to return immediately or continue waiting. * @return the last value returned by breakSupplier - * @throws InterruptedException if any sleep calls were interrupted. */ - public static boolean waitUntil(BooleanSupplier breakSupplier) throws InterruptedException { + public static boolean waitUntil(BooleanSupplier breakSupplier) { return waitUntil(breakSupplier, 10, TimeUnit.SECONDS); } @@ -1438,9 +1438,8 @@ public static boolean waitUntil(BooleanSupplier breakSupplier) throws Interrupte * @param maxWaitTime the maximum amount of time to wait * @param unit the unit of tie for maxWaitTime * @return the last value returned by breakSupplier - * @throws InterruptedException if any sleep calls were interrupted. */ - public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) throws InterruptedException { + public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, TimeUnit unit) { long maxTimeInMillis = TimeUnit.MILLISECONDS.convert(maxWaitTime, unit); long timeInMillis = 1; long sum = 0; @@ -1448,12 +1447,12 @@ public static boolean waitUntil(BooleanSupplier breakSupplier, long maxWaitTime, if (breakSupplier.getAsBoolean()) { return true; } - Thread.sleep(timeInMillis); + safeSleep(timeInMillis); sum += timeInMillis; timeInMillis = Math.min(AWAIT_BUSY_THRESHOLD, timeInMillis * 2); } timeInMillis = maxTimeInMillis - sum; - Thread.sleep(Math.max(timeInMillis, 0)); + safeSleep(Math.max(timeInMillis, 0)); return breakSupplier.getAsBoolean(); } @@ -2505,7 +2504,7 @@ public static T expectThrows(Class expectedType, Reques * Same as {@link #runInParallel(int, IntConsumer)} but also attempts to start all tasks at the same time by blocking execution on a * barrier until all threads are started and ready to execute their task. */ - public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) throws InterruptedException { + public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) { final CyclicBarrier barrier = new CyclicBarrier(numberOfTasks); runInParallel(numberOfTasks, i -> { safeAwait(barrier); @@ -2519,7 +2518,7 @@ public static void startInParallel(int numberOfTasks, IntConsumer taskFactory) t * @param numberOfTasks number of tasks to run in parallel * @param taskFactory task factory */ - public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) throws InterruptedException { + public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) { final ArrayList> futures = new ArrayList<>(numberOfTasks); final Thread[] threads = new Thread[numberOfTasks - 1]; for (int i = 0; i < numberOfTasks; i++) { @@ -2534,16 +2533,26 @@ public static void runInParallel(int numberOfTasks, IntConsumer taskFactory) thr threads[i].start(); } } - for (Thread thread : threads) { - thread.join(); - } Exception e = null; - for (Future future : futures) { - try { - future.get(); - } catch (Exception ex) { - e = ExceptionsHelper.useOrSuppress(e, ex); + try { + for (Thread thread : threads) { + // no sense in waiting for the rest of the threads, nor any futures, if interrupted, just bail out and fail + thread.join(); + } + for (Future future : futures) { + try { + future.get(); + } catch (InterruptedException interruptedException) { + // no sense in waiting for the rest of the futures if interrupted, just bail out and fail + Thread.currentThread().interrupt(); + throw interruptedException; + } catch (Exception executionException) { + e = ExceptionsHelper.useOrSuppress(e, executionException); + } } + } catch (InterruptedException interruptedException) { + Thread.currentThread().interrupt(); + e = ExceptionsHelper.useOrSuppress(e, interruptedException); } if (e != null) { throw new AssertionError(e); diff --git a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java index 0b69245177c7a..332df7123fd1b 100644 --- a/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java +++ b/test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java @@ -1744,11 +1744,7 @@ private synchronized void startAndPublishNodesAndClients(List nod .filter(nac -> nodes.containsKey(nac.name) == false) // filter out old masters .count(); rebuildUnicastHostFiles(nodeAndClients); // ensure that new nodes can find the existing nodes when they start - try { - runInParallel(nodeAndClients.size(), i -> nodeAndClients.get(i).startNode()); - } catch (InterruptedException e) { - throw new AssertionError("interrupted while starting nodes", e); - } + runInParallel(nodeAndClients.size(), i -> nodeAndClients.get(i).startNode()); nodeAndClients.forEach(this::publishNode); if (autoManageMasterNodes && newMasters > 0) { diff --git a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java index 56aec13cbab29..c99f2be0a6cad 100644 --- a/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java +++ b/x-pack/plugin/searchable-snapshots/src/internalClusterTest/java/org/elasticsearch/xpack/searchablesnapshots/SearchableSnapshotsIntegTests.java @@ -371,12 +371,8 @@ public void testCanMountSnapshotTakenWhileConcurrentlyIndexing() throws Exceptio for (int i = between(10, 10_000); i >= 0; i--) { indexRequestBuilders.add(prepareIndex(indexName).setSource("foo", randomBoolean() ? "bar" : "baz")); } - try { - safeAwait(cyclicBarrier); - indexRandom(true, true, indexRequestBuilders); - } catch (InterruptedException e) { - throw new AssertionError(e); - } + safeAwait(cyclicBarrier); + indexRandom(true, true, indexRequestBuilders); refresh(indexName); assertThat( indicesAdmin().prepareForceMerge(indexName).setOnlyExpungeDeletes(true).setFlush(true).get().getFailedShards(), diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java index 466d0e3428d50..6abf6c81b673e 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ldap/support/SessionFactoryLoadBalancingTests.java @@ -401,59 +401,52 @@ private PortBlockingRunnable( public void run() { final List openedSockets = new ArrayList<>(); final List failedAddresses = new ArrayList<>(); - try { - final boolean allSocketsOpened = waitUntil(() -> { - try { - final InetAddress[] allAddresses; - if (serverAddress instanceof Inet4Address) { - allAddresses = NetworkUtils.getAllIPV4Addresses(); - } else { - allAddresses = NetworkUtils.getAllIPV6Addresses(); - } - final List inetAddressesToBind = Arrays.stream(allAddresses) - .filter(addr -> openedSockets.stream().noneMatch(s -> addr.equals(s.getLocalAddress()))) - .filter(addr -> failedAddresses.contains(addr) == false) - .collect(Collectors.toList()); - for (InetAddress localAddress : inetAddressesToBind) { - try { - final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind); - openedSockets.add(socket); - logger.debug("opened socket [{}]", socket); - } catch (NoRouteToHostException | ConnectException e) { - logger.debug(() -> "marking address [" + localAddress + "] as failed due to:", e); - failedAddresses.add(localAddress); - } - } - if (openedSockets.size() == 0) { - logger.debug("Could not open any sockets from the available addresses"); - return false; + + final boolean allSocketsOpened = waitUntil(() -> { + try { + final InetAddress[] allAddresses; + if (serverAddress instanceof Inet4Address) { + allAddresses = NetworkUtils.getAllIPV4Addresses(); + } else { + allAddresses = NetworkUtils.getAllIPV6Addresses(); + } + final List inetAddressesToBind = Arrays.stream(allAddresses) + .filter(addr -> openedSockets.stream().noneMatch(s -> addr.equals(s.getLocalAddress()))) + .filter(addr -> failedAddresses.contains(addr) == false) + .collect(Collectors.toList()); + for (InetAddress localAddress : inetAddressesToBind) { + try { + final Socket socket = openMockSocket(serverAddress, serverPort, localAddress, portToBind); + openedSockets.add(socket); + logger.debug("opened socket [{}]", socket); + } catch (NoRouteToHostException | ConnectException e) { + logger.debug(() -> "marking address [" + localAddress + "] as failed due to:", e); + failedAddresses.add(localAddress); } - return true; - } catch (IOException e) { - logger.debug(() -> "caught exception while opening socket on [" + portToBind + "]", e); + } + if (openedSockets.size() == 0) { + logger.debug("Could not open any sockets from the available addresses"); return false; } - }); - - if (allSocketsOpened) { - latch.countDown(); - } else { - success.set(false); - IOUtils.closeWhileHandlingException(openedSockets); - openedSockets.clear(); - latch.countDown(); - return; + return true; + } catch (IOException e) { + logger.debug(() -> "caught exception while opening socket on [" + portToBind + "]", e); + return false; } - } catch (InterruptedException e) { - logger.debug(() -> "interrupted while trying to open sockets on [" + portToBind + "]", e); - Thread.currentThread().interrupt(); + }); + + if (allSocketsOpened) { + latch.countDown(); + } else { + success.set(false); + IOUtils.closeWhileHandlingException(openedSockets); + openedSockets.clear(); + latch.countDown(); + return; } try { - closeLatch.await(); - } catch (InterruptedException e) { - logger.debug("caught exception while waiting for close latch", e); - Thread.currentThread().interrupt(); + safeAwait(closeLatch); } finally { logger.debug("closing sockets on [{}]", portToBind); IOUtils.closeWhileHandlingException(openedSockets);