From 1e516714d63a0b8e688c7aee5452b4db44aeac3d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Osipiuk?= Date: Thu, 22 Sep 2022 16:42:40 +0200 Subject: [PATCH] Reset allowed-no-matching-node-period timeout on new nodes in cluster We allow for situation when there are no nodes in the cluster which match task requirement for timeout governed by node-scheduler.allowed-no-matching-node-period config variable. Previously if new node appeared in the cluster, but could not be used because it did not have enough free resources at given moment timeout counter was not reset. This commit changes the behaviour. If new node appears and then disappears again, the no-nodes timeout will be recounted again from zero. --- .../BinPackingNodeAllocatorService.java | 6 +++ .../TestBinPackingNodeAllocator.java | 45 +++++++++++++++++++ 2 files changed, 51 insertions(+) diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 6a220213cea7..49d01a341fd2 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -247,6 +247,7 @@ synchronized void processPendingAcquires() iterator.remove(); break; case NOT_ENOUGH_RESOURCES_NOW: + pendingAcquire.resetNoMatchingNodeFound(); break; // nothing to be done default: throw new IllegalArgumentException("unknown status: " + result.getStatus()); @@ -339,6 +340,11 @@ public Duration markNoMatchingNodeFound() } return noMatchingNodeStopwatch.elapsed(); } + + public void resetNoMatchingNodeFound() + { + noMatchingNodeStopwatch.reset(); + } } private class BinPackingNodeLease diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 4809abb676e1..e85c1eed5761 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -45,6 +45,7 @@ import static io.trino.testing.TestingHandles.createTestCatalogHandle; import static io.trino.testing.TestingSession.testSessionBuilder; import static java.time.temporal.ChronoUnit.MINUTES; +import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -349,6 +350,50 @@ public void testNoMatchingNodeAvailable() } } + @Test(timeOut = TEST_TIMEOUT) + public void testNoMatchingNodeAvailableTimeoutReset() + { + InMemoryNodeManager nodeManager = new InMemoryNodeManager(); + setupNodeAllocatorService(nodeManager); + + try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { + // request a node with specific catalog (not present) + NodeAllocator.NodeLease acquireNoMatching1 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + NodeAllocator.NodeLease acquireNoMatching2 = nodeAllocator.acquire(REQ_CATALOG_1, DataSize.of(64, GIGABYTE)); + assertNotAcquired(acquireNoMatching1); + assertNotAcquired(acquireNoMatching2); + + // wait for a while and add a node + ticker.increment(30, TimeUnit.SECONDS); // past 1 minute timeout + nodeManager.addNodes(NODE_2); + + // only one of the leases should be completed but timeout counter for period where no nodes + // are available should be reset for the other one + nodeAllocatorService.processPendingAcquires(); + assertThat(acquireNoMatching1.getNode().isDone() != acquireNoMatching2.getNode().isDone()) + .describedAs("exactly one of pending acquires should be completed") + .isTrue(); + + NodeAllocator.NodeLease theAcquireLease = acquireNoMatching1.getNode().isDone() ? acquireNoMatching1 : acquireNoMatching2; + NodeAllocator.NodeLease theNotAcquireLease = acquireNoMatching1.getNode().isDone() ? acquireNoMatching2 : acquireNoMatching1; + + // remove the node - we are again in situation where no matching nodes exist in cluster + nodeManager.removeNode(NODE_2); + theAcquireLease.release(); + nodeAllocatorService.processPendingAcquires(); + assertNotAcquired(theNotAcquireLease); + + ticker.increment(59, TimeUnit.SECONDS); // still below 1m timeout as the reset happened in previous step + nodeAllocatorService.processPendingAcquires(); + assertNotAcquired(theNotAcquireLease); + + ticker.increment(2, TimeUnit.SECONDS); + nodeAllocatorService.processPendingAcquires(); + assertThatThrownBy(() -> Futures.getUnchecked(theNotAcquireLease.getNode())) + .hasMessageContaining("No nodes available to run query"); + } + } + @Test(timeOut = TEST_TIMEOUT) public void testRemoveAcquiredNode() {