diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 6a220213cea7..49d01a341fd2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -247,6 +247,7 @@ synchronized void processPendingAcquires() iterator.remove(); break; case NOT_ENOUGH_RESOURCES_NOW: + pendingAcquire.resetNoMatchingNodeFound(); break; // nothing to be done default: throw new IllegalArgumentException("unknown status: " + result.getStatus()); @@ -339,6 +340,11 @@ public Duration markNoMatchingNodeFound() } return noMatchingNodeStopwatch.elapsed(); } + + public void resetNoMatchingNodeFound() + { + noMatchingNodeStopwatch.reset(); + } } private class BinPackingNodeLease diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 4809abb676e1..e85c1eed5761 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -45,6 +45,7 @@ import static io.trino.testing.TestingHandles.createTestCatalogHandle; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.time.temporal.ChronoUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -349,6 +350,50 @@ public void testNoMatchingNodeAvailable() } } + @Test(timeOut = TEST_TIMEOUT) + public void testNoMatchingNodeAvailableTimeoutReset() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + // request a node with specific catalog (not present) + NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + assertNotAcquired(acquireNoMatching1); + assertNotAcquired(acquireNoMatching2); + + // wait for a while and add a node + ticker.increment(30, TimeUnit.SECONDS); // past 1 minute timeout + nodeManager.addNodes(NODE_2); + + // only one of the leases should be completed but timeout counter for period where no nodes + // are available should be reset for the other one + nodeAllocatorService.processPendingAcquires(); + assertThat(acquireNoMatching1.getNode().isDone() != acquireNoMatching2.getNode().isDone()) + .describedAs("exactly one of pending acquires should be completed") + .isTrue(); + + NodeAllocator.NodeLease theAcquireLease = acquireNoMatching1.getNode().isDone() ? acquireNoMatching1 : acquireNoMatching2; + NodeAllocator.NodeLease theNotAcquireLease = acquireNoMatching1.getNode().isDone() ? acquireNoMatching2 : acquireNoMatching1; + + // remove the node - we are again in situation where no matching nodes exist in cluster + nodeManager.removeNode(NODE_2); + theAcquireLease.release(); + nodeAllocatorService.processPendingAcquires(); + assertNotAcquired(theNotAcquireLease); + + ticker.increment(59, TimeUnit.SECONDS); // still below 1m timeout as the reset happened in previous step + nodeAllocatorService.processPendingAcquires(); + assertNotAcquired(theNotAcquireLease); + + ticker.increment(2, TimeUnit.SECONDS); + nodeAllocatorService.processPendingAcquires(); + assertThatThrownBy(() -> Futures.getUnchecked(theNotAcquireLease.getNode())) + .hasMessageContaining("No nodes available to run query"); + } + } + @Test(timeOut = TEST_TIMEOUT) public void testRemoveAcquiredNode() {