diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java index 6b62c15e7dc5..ada9aa00bc77 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/BinPackingNodeAllocatorService.java @@ -13,6 +13,8 @@ */ package io.trino.execution.scheduler; +import com.google.common.base.Stopwatch; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; @@ -42,6 +44,7 @@ import javax.annotation.concurrent.ThreadSafe; import javax.inject.Inject; +import java.time.Duration; import java.util.Deque; import java.util.HashMap; import java.util.HashSet; @@ -98,10 +101,12 @@ public class BinPackingNodeAllocatorService private final AtomicReference> maxNodePoolSize = new AtomicReference<>(Optional.empty()); private final boolean scheduleOnCoordinator; private final DataSize taskRuntimeMemoryEstimationOverhead; + private final Ticker ticker; private final ConcurrentMap allocatedMemory = new ConcurrentHashMap<>(); private final Deque pendingAcquires = new ConcurrentLinkedDeque<>(); private final Set fulfilledAcquires = newConcurrentHashSet(); + private final Duration allowedNoMatchingNodePeriod; @Inject public BinPackingNodeAllocatorService( @@ -113,7 +118,9 @@ public BinPackingNodeAllocatorService( this(nodeManager, requireNonNull(clusterMemoryManager, "clusterMemoryManager is null")::getWorkerMemoryInfo, nodeSchedulerConfig.isIncludeCoordinator(), - memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead()); + Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()), + memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(), + Ticker.systemTicker()); } @VisibleForTesting @@ -121,12 +128,16 @@ public BinPackingNodeAllocatorService( InternalNodeManager nodeManager, Supplier>> workerMemoryInfoSupplier, boolean scheduleOnCoordinator, - DataSize taskRuntimeMemoryEstimationOverhead) + Duration allowedNoMatchingNodePeriod, + DataSize taskRuntimeMemoryEstimationOverhead, + Ticker ticker) { this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null"); this.scheduleOnCoordinator = scheduleOnCoordinator; + this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null"); this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null"); + this.ticker = requireNonNull(ticker, "ticker is null"); } @PostConstruct @@ -225,6 +236,13 @@ synchronized void processPendingAcquires() iterator.remove(); break; case NONE_MATCHING: + Duration noMatchingNodePeriod = pendingAcquire.markNoMatchingNodeFound(); + + if (noMatchingNodePeriod.compareTo(allowedNoMatchingNodePeriod) <= 0) { + // wait some more time + break; + } + pendingAcquire.getFuture().setException(new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query")); iterator.remove(); break; @@ -251,7 +269,7 @@ public NodeAllocator getNodeAllocator(Session session) public NodeLease acquire(NodeRequirements requirements) { BinPackingNodeLease nodeLease = new BinPackingNodeLease(requirements.getMemory().toBytes()); - PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeLease); + PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeLease, ticker); pendingAcquires.add(pendingAcquire); wakeupProcessPendingAcquires(); return nodeLease; @@ -283,11 +301,13 @@ private static class PendingAcquire { private final NodeRequirements nodeRequirements; private final BinPackingNodeLease lease; + private final Stopwatch noMatchingNodeStopwatch; - private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease) + private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease, Ticker ticker) { this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null"); this.lease = requireNonNull(lease, "lease is null"); + this.noMatchingNodeStopwatch = Stopwatch.createUnstarted(ticker); } public NodeRequirements getNodeRequirements() @@ -309,6 +329,14 @@ public long getMemoryLease() { return nodeRequirements.getMemory().toBytes(); } + + public Duration markNoMatchingNodeFound() + { + if (!noMatchingNodeStopwatch.isRunning()) { + noMatchingNodeStopwatch.start(); + } + return noMatchingNodeStopwatch.elapsed(); + } } private class BinPackingNodeLease diff --git a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java index 58aa75844dd7..7da0bfdeee7d 100644 --- a/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java +++ b/core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSchedulerConfig.java @@ -17,12 +17,15 @@ import io.airlift.configuration.ConfigDescription; import io.airlift.configuration.DefunctConfig; import io.airlift.configuration.LegacyConfig; +import io.airlift.units.Duration; import javax.validation.constraints.DecimalMax; import javax.validation.constraints.DecimalMin; import javax.validation.constraints.Min; import javax.validation.constraints.NotNull; +import java.util.concurrent.TimeUnit; + import static java.util.Locale.ENGLISH; @DefunctConfig({"node-scheduler.location-aware-scheduling-enabled", "node-scheduler.multiple-tasks-per-node-enabled"}) @@ -48,6 +51,7 @@ public enum SplitsBalancingPolicy private int maxUnacknowledgedSplitsPerTask = 500; private int maxAbsoluteFullNodesPerQuery = Integer.MAX_VALUE; private double maxFractionFullNodesPerQuery = 0.5; + private Duration allowedNoMatchingNodePeriod = new Duration(2, TimeUnit.MINUTES); private NodeAllocatorType nodeAllocatorType = NodeAllocatorType.BIN_PACKING; @NotNull @@ -195,6 +199,19 @@ public double getMaxFractionFullNodesPerQuery() return maxFractionFullNodesPerQuery; } + @Config("node-scheduler.allowed-no-matching-node-period") + @ConfigDescription("How long scheduler should wait before failing a query for which hard task requirements (e.g. node exposing specific catalog) cannot be satisfied") + public NodeSchedulerConfig setAllowedNoMatchingNodePeriod(Duration allowedNoMatchingNodePeriod) + { + this.allowedNoMatchingNodePeriod = allowedNoMatchingNodePeriod; + return this; + } + + public Duration getAllowedNoMatchingNodePeriod() + { + return allowedNoMatchingNodePeriod; + } + public enum NodeAllocatorType { FIXED_COUNT, diff --git a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java index 3e97cd80afc8..b17eaf6b0ab9 100644 --- a/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java +++ b/core/trino-main/src/test/java/io/trino/execution/TestNodeSchedulerConfig.java @@ -14,6 +14,7 @@ package io.trino.execution; import com.google.common.collect.ImmutableMap; +import io.airlift.units.Duration; import io.trino.execution.scheduler.NodeSchedulerConfig; import org.testng.annotations.Test; @@ -24,6 +25,7 @@ import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults; import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM; import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE; +import static java.util.concurrent.TimeUnit.MINUTES; public class TestNodeSchedulerConfig { @@ -41,6 +43,7 @@ public void testDefaults() .setOptimizedLocalScheduling(true) .setMaxAbsoluteFullNodesPerQuery(Integer.MAX_VALUE) .setMaxFractionFullNodesPerQuery(0.5) + .setAllowedNoMatchingNodePeriod(new Duration(2, MINUTES)) .setNodeAllocatorType("bin_packing")); } @@ -58,6 +61,7 @@ public void testExplicitPropertyMappings() .put("node-scheduler.optimized-local-scheduling", "false") .put("node-scheduler.max-absolute-full-nodes-per-query", "17") .put("node-scheduler.max-fraction-full-nodes-per-query", "0.3") + .put("node-scheduler.allowed-no-matching-node-period", "1m") .put("node-scheduler.allocator-type", "fixed_count") .buildOrThrow(); @@ -72,6 +76,7 @@ public void testExplicitPropertyMappings() .setOptimizedLocalScheduling(false) .setMaxAbsoluteFullNodesPerQuery(17) .setMaxFractionFullNodesPerQuery(0.3) + .setAllowedNoMatchingNodePeriod(new Duration(1, MINUTES)) .setNodeAllocatorType("fixed_count"); assertFullMapping(properties, expected); diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java index 960db0102beb..deee63c8c038 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestBinPackingNodeAllocator.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.util.concurrent.Futures; +import io.airlift.testing.TestingTicker; import io.airlift.units.DataSize; import io.trino.Session; import io.trino.client.NodeVersion; @@ -32,18 +33,21 @@ import org.testng.annotations.Test; import java.net.URI; +import java.time.Duration; import java.util.Arrays; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; import static com.google.common.base.Preconditions.checkArgument; import static com.google.common.collect.ImmutableMap.toImmutableMap; import static io.airlift.concurrent.MoreFutures.getFutureValue; import static io.airlift.units.DataSize.Unit.GIGABYTE; import static io.trino.testing.TestingSession.testSessionBuilder; +import static java.time.temporal.ChronoUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertFalse; @@ -82,6 +86,7 @@ public class TestBinPackingNodeAllocator private BinPackingNodeAllocatorService nodeAllocatorService; private ConcurrentHashMap> workerMemoryInfos; + private final TestingTicker ticker = new TestingTicker(); private void setupNodeAllocatorService(InMemoryNodeManager nodeManager) { @@ -103,7 +108,9 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize nodeManager, () -> workerMemoryInfos, false, - taskRuntimeMemoryEstimationOverhead); + Duration.of(1, MINUTES), + taskRuntimeMemoryEstimationOverhead, + ticker); nodeAllocatorService.start(); } @@ -314,7 +321,14 @@ public void testNoMatchingNodeAvailable() try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) { // request a node with specific catalog (not present) - assertThatThrownBy(() -> Futures.getUnchecked(nodeAllocator.acquire(REQ_CATALOG_1_32.withMemory(DataSize.of(64, GIGABYTE))).getNode())) + NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1_32.withMemory(DataSize.of(64, GIGABYTE))); + assertNotAcquired(acquireNoMatching); + ticker.increment(59, TimeUnit.SECONDS); // still below timeout + nodeAllocatorService.processPendingAcquires(); + assertNotAcquired(acquireNoMatching); + ticker.increment(2, TimeUnit.SECONDS); // past 1 minute timeout + nodeAllocatorService.processPendingAcquires(); + assertThatThrownBy(() -> Futures.getUnchecked(acquireNoMatching.getNode())) .hasMessageContaining("No nodes available to run query"); // add node with specific catalog @@ -332,6 +346,8 @@ public void testNoMatchingNodeAvailable() nodeManager.removeNode(NODE_2); // TODO: make FullNodeCapableNodeAllocatorService react on node removed automatically nodeAllocatorService.processPendingAcquires(); + ticker.increment(61, TimeUnit.SECONDS); // wait past the timeout + nodeAllocatorService.processPendingAcquires(); // pending acquire2 should be completed now but with an exception assertEventually(() -> { diff --git a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java index 954cb3b1a37a..49f4c0f2ab6c 100644 --- a/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java +++ b/core/trino-main/src/test/java/io/trino/execution/scheduler/TestExponentialGrowthPartitionMemoryEstimator.java @@ -13,6 +13,7 @@ */ package io.trino.execution.scheduler; +import com.google.common.base.Ticker; import com.google.common.collect.ImmutableMap; import io.airlift.units.DataSize; import io.trino.Session; @@ -28,6 +29,7 @@ import org.testng.annotations.Test; import java.net.URI; +import java.time.Duration; import java.util.Optional; import static io.airlift.units.DataSize.Unit.GIGABYTE; @@ -35,6 +37,7 @@ import static io.trino.spi.StandardErrorCode.ADMINISTRATIVELY_PREEMPTED; import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY; import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT; +import static java.time.temporal.ChronoUnit.MINUTES; import static org.assertj.core.api.Assertions.assertThat; public class TestExponentialGrowthPartitionMemoryEstimator @@ -49,7 +52,9 @@ public void testEstimator() nodeManager, () -> ImmutableMap.of(new InternalNode("a-node", URI.create("local://blah"), NodeVersion.UNKNOWN, false).getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(DataSize.ofBytes(0)))), false, - DataSize.ofBytes(0)); + Duration.of(1, MINUTES), + DataSize.ofBytes(0), + Ticker.systemTicker()); nodeAllocatorService.refreshNodePoolMemoryInfos(); PartitionMemoryEstimator estimator = nodeAllocatorService.createPartitionMemoryEstimator();