Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package io.trino.execution.scheduler;

import com.google.common.base.Stopwatch;
import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
Expand Down Expand Up @@ -42,6 +44,7 @@
import javax.annotation.concurrent.ThreadSafe;
import javax.inject.Inject;

import java.time.Duration;
import java.util.Deque;
import java.util.HashMap;
import java.util.HashSet;
Expand Down Expand Up @@ -98,10 +101,12 @@ public class BinPackingNodeAllocatorService
private final AtomicReference<Optional<DataSize>> maxNodePoolSize = new AtomicReference<>(Optional.empty());
private final boolean scheduleOnCoordinator;
private final DataSize taskRuntimeMemoryEstimationOverhead;
private final Ticker ticker;

private final ConcurrentMap<String, Long> allocatedMemory = new ConcurrentHashMap<>();
private final Deque<PendingAcquire> pendingAcquires = new ConcurrentLinkedDeque<>();
private final Set<BinPackingNodeLease> fulfilledAcquires = newConcurrentHashSet();
private final Duration allowedNoMatchingNodePeriod;

@Inject
public BinPackingNodeAllocatorService(
Expand All @@ -113,20 +118,26 @@ public BinPackingNodeAllocatorService(
this(nodeManager,
requireNonNull(clusterMemoryManager, "clusterMemoryManager is null")::getWorkerMemoryInfo,
nodeSchedulerConfig.isIncludeCoordinator(),
memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead());
Duration.ofMillis(nodeSchedulerConfig.getAllowedNoMatchingNodePeriod().toMillis()),
memoryManagerConfig.getFaultTolerantExecutionTaskRuntimeMemoryEstimationOverhead(),
Ticker.systemTicker());
}

@VisibleForTesting
BinPackingNodeAllocatorService(
InternalNodeManager nodeManager,
Supplier<Map<String, Optional<MemoryInfo>>> workerMemoryInfoSupplier,
boolean scheduleOnCoordinator,
DataSize taskRuntimeMemoryEstimationOverhead)
Duration allowedNoMatchingNodePeriod,
DataSize taskRuntimeMemoryEstimationOverhead,
Ticker ticker)
{
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.workerMemoryInfoSupplier = requireNonNull(workerMemoryInfoSupplier, "workerMemoryInfoSupplier is null");
this.scheduleOnCoordinator = scheduleOnCoordinator;
this.allowedNoMatchingNodePeriod = requireNonNull(allowedNoMatchingNodePeriod, "allowedNoMatchingNodePeriod is null");
this.taskRuntimeMemoryEstimationOverhead = requireNonNull(taskRuntimeMemoryEstimationOverhead, "taskRuntimeMemoryEstimationOverhead is null");
this.ticker = requireNonNull(ticker, "ticker is null");
}

@PostConstruct
Expand Down Expand Up @@ -225,6 +236,13 @@ synchronized void processPendingAcquires()
iterator.remove();
break;
case NONE_MATCHING:
Duration noMatchingNodePeriod = pendingAcquire.markNoMatchingNodeFound();

if (noMatchingNodePeriod.compareTo(allowedNoMatchingNodePeriod) <= 0) {
// wait some more time
break;
}

pendingAcquire.getFuture().setException(new TrinoException(NO_NODES_AVAILABLE, "No nodes available to run query"));
iterator.remove();
break;
Expand All @@ -251,7 +269,7 @@ public NodeAllocator getNodeAllocator(Session session)
public NodeLease acquire(NodeRequirements requirements)
{
BinPackingNodeLease nodeLease = new BinPackingNodeLease(requirements.getMemory().toBytes());
PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeLease);
PendingAcquire pendingAcquire = new PendingAcquire(requirements, nodeLease, ticker);
pendingAcquires.add(pendingAcquire);
wakeupProcessPendingAcquires();
return nodeLease;
Expand Down Expand Up @@ -283,11 +301,13 @@ private static class PendingAcquire
{
private final NodeRequirements nodeRequirements;
private final BinPackingNodeLease lease;
private final Stopwatch noMatchingNodeStopwatch;

private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease)
private PendingAcquire(NodeRequirements nodeRequirements, BinPackingNodeLease lease, Ticker ticker)
{
this.nodeRequirements = requireNonNull(nodeRequirements, "nodeRequirements is null");
this.lease = requireNonNull(lease, "lease is null");
this.noMatchingNodeStopwatch = Stopwatch.createUnstarted(ticker);
}

public NodeRequirements getNodeRequirements()
Expand All @@ -309,6 +329,14 @@ public long getMemoryLease()
{
return nodeRequirements.getMemory().toBytes();
}

public Duration markNoMatchingNodeFound()
{
if (!noMatchingNodeStopwatch.isRunning()) {
noMatchingNodeStopwatch.start();
}
return noMatchingNodeStopwatch.elapsed();
}
}

private class BinPackingNodeLease
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import io.airlift.configuration.ConfigDescription;
import io.airlift.configuration.DefunctConfig;
import io.airlift.configuration.LegacyConfig;
import io.airlift.units.Duration;

import javax.validation.constraints.DecimalMax;
import javax.validation.constraints.DecimalMin;
import javax.validation.constraints.Min;
import javax.validation.constraints.NotNull;

import java.util.concurrent.TimeUnit;

import static java.util.Locale.ENGLISH;

@DefunctConfig({"node-scheduler.location-aware-scheduling-enabled", "node-scheduler.multiple-tasks-per-node-enabled"})
Expand All @@ -48,6 +51,7 @@ public enum SplitsBalancingPolicy
private int maxUnacknowledgedSplitsPerTask = 500;
private int maxAbsoluteFullNodesPerQuery = Integer.MAX_VALUE;
private double maxFractionFullNodesPerQuery = 0.5;
private Duration allowedNoMatchingNodePeriod = new Duration(2, TimeUnit.MINUTES);
private NodeAllocatorType nodeAllocatorType = NodeAllocatorType.BIN_PACKING;

@NotNull
Expand Down Expand Up @@ -195,6 +199,19 @@ public double getMaxFractionFullNodesPerQuery()
return maxFractionFullNodesPerQuery;
}

@Config("node-scheduler.allowed-no-matching-node-period")
@ConfigDescription("How long scheduler should wait before failing a query for which hard task requirements (e.g. node exposing specific catalog) cannot be satisfied")
public NodeSchedulerConfig setAllowedNoMatchingNodePeriod(Duration allowedNoMatchingNodePeriod)
{
this.allowedNoMatchingNodePeriod = allowedNoMatchingNodePeriod;
return this;
}

public Duration getAllowedNoMatchingNodePeriod()
{
return allowedNoMatchingNodePeriod;
}

public enum NodeAllocatorType
{
FIXED_COUNT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package io.trino.execution;

import com.google.common.collect.ImmutableMap;
import io.airlift.units.Duration;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import org.testng.annotations.Test;

Expand All @@ -24,6 +25,7 @@
import static io.airlift.configuration.testing.ConfigAssertions.recordDefaults;
import static io.trino.execution.scheduler.NodeSchedulerConfig.NodeSchedulerPolicy.UNIFORM;
import static io.trino.execution.scheduler.NodeSchedulerConfig.SplitsBalancingPolicy.NODE;
import static java.util.concurrent.TimeUnit.MINUTES;

public class TestNodeSchedulerConfig
{
Expand All @@ -41,6 +43,7 @@ public void testDefaults()
.setOptimizedLocalScheduling(true)
.setMaxAbsoluteFullNodesPerQuery(Integer.MAX_VALUE)
.setMaxFractionFullNodesPerQuery(0.5)
.setAllowedNoMatchingNodePeriod(new Duration(2, MINUTES))
.setNodeAllocatorType("bin_packing"));
}

Expand All @@ -58,6 +61,7 @@ public void testExplicitPropertyMappings()
.put("node-scheduler.optimized-local-scheduling", "false")
.put("node-scheduler.max-absolute-full-nodes-per-query", "17")
.put("node-scheduler.max-fraction-full-nodes-per-query", "0.3")
.put("node-scheduler.allowed-no-matching-node-period", "1m")
.put("node-scheduler.allocator-type", "fixed_count")
.buildOrThrow();

Expand All @@ -72,6 +76,7 @@ public void testExplicitPropertyMappings()
.setOptimizedLocalScheduling(false)
.setMaxAbsoluteFullNodesPerQuery(17)
.setMaxFractionFullNodesPerQuery(0.3)
.setAllowedNoMatchingNodePeriod(new Duration(1, MINUTES))
.setNodeAllocatorType("fixed_count");

assertFullMapping(properties, expected);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.util.concurrent.Futures;
import io.airlift.testing.TestingTicker;
import io.airlift.units.DataSize;
import io.trino.Session;
import io.trino.client.NodeVersion;
Expand All @@ -32,18 +33,21 @@
import org.testng.annotations.Test;

import java.net.URI;
import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;

import static com.google.common.base.Preconditions.checkArgument;
import static com.google.common.collect.ImmutableMap.toImmutableMap;
import static io.airlift.concurrent.MoreFutures.getFutureValue;
import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.trino.testing.TestingSession.testSessionBuilder;
import static java.time.temporal.ChronoUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
Expand Down Expand Up @@ -82,6 +86,7 @@ public class TestBinPackingNodeAllocator

private BinPackingNodeAllocatorService nodeAllocatorService;
private ConcurrentHashMap<String, Optional<MemoryInfo>> workerMemoryInfos;
private final TestingTicker ticker = new TestingTicker();

private void setupNodeAllocatorService(InMemoryNodeManager nodeManager)
{
Expand All @@ -103,7 +108,9 @@ private void setupNodeAllocatorService(InMemoryNodeManager nodeManager, DataSize
nodeManager,
() -> workerMemoryInfos,
false,
taskRuntimeMemoryEstimationOverhead);
Duration.of(1, MINUTES),
taskRuntimeMemoryEstimationOverhead,
ticker);
nodeAllocatorService.start();
}

Expand Down Expand Up @@ -314,7 +321,14 @@ public void testNoMatchingNodeAvailable()

try (NodeAllocator nodeAllocator = nodeAllocatorService.getNodeAllocator(SESSION)) {
// request a node with specific catalog (not present)
assertThatThrownBy(() -> Futures.getUnchecked(nodeAllocator.acquire(REQ_CATALOG_1_32.withMemory(DataSize.of(64, GIGABYTE))).getNode()))
NodeAllocator.NodeLease acquireNoMatching = nodeAllocator.acquire(REQ_CATALOG_1_32.withMemory(DataSize.of(64, GIGABYTE)));
assertNotAcquired(acquireNoMatching);
ticker.increment(59, TimeUnit.SECONDS); // still below timeout
nodeAllocatorService.processPendingAcquires();
assertNotAcquired(acquireNoMatching);
ticker.increment(2, TimeUnit.SECONDS); // past 1 minute timeout
nodeAllocatorService.processPendingAcquires();
assertThatThrownBy(() -> Futures.getUnchecked(acquireNoMatching.getNode()))
.hasMessageContaining("No nodes available to run query");

// add node with specific catalog
Expand All @@ -332,6 +346,8 @@ public void testNoMatchingNodeAvailable()
nodeManager.removeNode(NODE_2);
// TODO: make FullNodeCapableNodeAllocatorService react on node removed automatically
nodeAllocatorService.processPendingAcquires();
ticker.increment(61, TimeUnit.SECONDS); // wait past the timeout
nodeAllocatorService.processPendingAcquires();

// pending acquire2 should be completed now but with an exception
assertEventually(() -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/
package io.trino.execution.scheduler;

import com.google.common.base.Ticker;
import com.google.common.collect.ImmutableMap;
import io.airlift.units.DataSize;
import io.trino.Session;
Expand All @@ -28,13 +29,15 @@
import org.testng.annotations.Test;

import java.net.URI;
import java.time.Duration;
import java.util.Optional;

import static io.airlift.units.DataSize.Unit.GIGABYTE;
import static io.airlift.units.DataSize.Unit.MEGABYTE;
import static io.trino.spi.StandardErrorCode.ADMINISTRATIVELY_PREEMPTED;
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static io.trino.spi.StandardErrorCode.EXCEEDED_LOCAL_MEMORY_LIMIT;
import static java.time.temporal.ChronoUnit.MINUTES;
import static org.assertj.core.api.Assertions.assertThat;

public class TestExponentialGrowthPartitionMemoryEstimator
Expand All @@ -49,7 +52,9 @@ public void testEstimator()
nodeManager,
() -> ImmutableMap.of(new InternalNode("a-node", URI.create("local://blah"), NodeVersion.UNKNOWN, false).getNodeIdentifier(), Optional.of(buildWorkerMemoryInfo(DataSize.ofBytes(0)))),
false,
DataSize.ofBytes(0));
Duration.of(1, MINUTES),
DataSize.ofBytes(0),
Ticker.systemTicker());
nodeAllocatorService.refreshNodePoolMemoryInfos();
PartitionMemoryEstimator estimator = nodeAllocatorService.createPartitionMemoryEstimator();

Expand Down