@@ -197,7 +197,7 @@ private Connector createConnector(
                 catalogHandle,
                 openTelemetry,
                 createTracer(catalogHandle),
-                new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), catalogHandle, schedulerIncludeCoordinator),
+                new DefaultNodeManager(nodeManager, nodeInfo.getEnvironment(), schedulerIncludeCoordinator),
                 versionEmbedder,
                 typeManager,
                 new InternalMetadataProvider(metadata, typeManager),
@@ -17,33 +17,31 @@
 import io.trino.metadata.InternalNodeManager;
 import io.trino.spi.Node;
 import io.trino.spi.NodeManager;
-import io.trino.spi.connector.CatalogHandle;

 import java.util.Set;

+import static io.trino.metadata.NodeState.ACTIVE;
 import static java.util.Objects.requireNonNull;

-public class ConnectorAwareNodeManager
+public class DefaultNodeManager
         implements NodeManager
 {
     private final InternalNodeManager nodeManager;
     private final String environment;
-    private final CatalogHandle catalogHandle;
     private final boolean schedulerIncludeCoordinator;

-    public ConnectorAwareNodeManager(InternalNodeManager nodeManager, String environment, CatalogHandle catalogHandle, boolean schedulerIncludeCoordinator)
+    public DefaultNodeManager(InternalNodeManager nodeManager, String environment, boolean schedulerIncludeCoordinator)
     {
         this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
         this.environment = requireNonNull(environment, "environment is null");
-        this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
         this.schedulerIncludeCoordinator = schedulerIncludeCoordinator;
     }

     @Override
     public Set<Node> getAllNodes()
     {
         return ImmutableSet.<Node>builder()
-                .addAll(nodeManager.getActiveCatalogNodes(catalogHandle))
+                .addAll(nodeManager.getNodes(ACTIVE))
                 // append current node (before connector is registered with the node
                 // in the discovery service) since current node should have connector always loaded
                 .add(nodeManager.getCurrentNode())
@@ -56,7 +54,7 @@ public Set<Node> getWorkerNodes()
         ImmutableSet.Builder<Node> nodes = ImmutableSet.builder();
         // getActiveConnectorNodes returns all nodes (including coordinators)
         // that have connector registered
-        nodeManager.getActiveCatalogNodes(catalogHandle).stream()
+        nodeManager.getNodes(ACTIVE).stream()
                 .filter(node -> !node.isCoordinator() || schedulerIncludeCoordinator)
                 .forEach(nodes::add);
         if (!nodeManager.getCurrentNode().isCoordinator() || schedulerIncludeCoordinator) {
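For context on what this rename changes for plugins: connectors only ever see this class through the io.trino.spi.NodeManager interface, and getWorkerNodes() now returns every ACTIVE node rather than only the nodes that have the catalog registered. A minimal consumer sketch under that assumption; ExampleSplitAssigner and its round-robin policy are hypothetical, not part of this PR:

import io.trino.spi.Node;
import io.trino.spi.NodeManager;

import java.util.ArrayList;
import java.util.List;

// Hypothetical connector-side consumer of the SPI NodeManager.
public class ExampleSplitAssigner
{
    private final NodeManager nodeManager;

    public ExampleSplitAssigner(NodeManager nodeManager)
    {
        this.nodeManager = nodeManager;
    }

    public Node nodeForSplit(int splitIndex)
    {
        // After this change the set covers all active workers, whether or
        // not the catalog is registered on them yet. Assumes at least one
        // active worker.
        List<Node> workers = new ArrayList<>(nodeManager.getWorkerNodes());
        return workers.get(Math.floorMod(splitIndex, workers.size()));
    }
}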
@@ -27,7 +27,6 @@
 import io.trino.metadata.Split;
 import io.trino.spi.HostAddress;
 import io.trino.spi.SplitWeight;
-import io.trino.spi.connector.CatalogHandle;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
@@ -39,7 +38,6 @@
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Optional;
 import java.util.Set;

 import static com.google.common.base.Preconditions.checkArgument;
@@ -60,9 +58,9 @@ public NodeScheduler(NodeSelectorFactory nodeSelectorFactory)
         this.nodeSelectorFactory = requireNonNull(nodeSelectorFactory, "nodeSelectorFactory is null");
     }

-    public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
+    public NodeSelector createNodeSelector(Session session)
     {
-        return nodeSelectorFactory.createNodeSelector(requireNonNull(session, "session is null"), requireNonNull(catalogHandle, "catalogHandle is null"));
+        return nodeSelectorFactory.createNodeSelector(requireNonNull(session, "session is null"));
     }

     public static List<InternalNode> getAllNodes(NodeMap nodeMap, boolean includeCoordinator)
@@ -14,11 +14,8 @@
 package io.trino.execution.scheduler;

 import io.trino.Session;
-import io.trino.spi.connector.CatalogHandle;
-
-import java.util.Optional;

 public interface NodeSelectorFactory
 {
-    NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle);
+    NodeSelector createNodeSelector(Session session);
 }
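With the Optional<CatalogHandle> parameter gone, every call site collapses to a one-argument form. A sketch of the two usage patterns that appear in the scheduler hunks below (selectCurrentNode for coordinator-only stages, allNodes for distributed ones):

// Coordinator-only stage: pin the task to the current (coordinator) node.
InternalNode coordinator = nodeScheduler.createNodeSelector(session).selectCurrentNode();

// Distributed stage: enumerate every schedulable node.
List<InternalNode> stageNodes = new ArrayList<>(nodeScheduler.createNodeSelector(session).allNodes());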
@@ -748,7 +748,7 @@ public synchronized void schedule()
         TaskFailureReporter failureReporter = new TaskFailureReporter(distributedStagesScheduler);
         queryStateMachine.addOutputTaskFailureListener(failureReporter);

-        InternalNode coordinator = nodeScheduler.createNodeSelector(queryStateMachine.getSession(), Optional.empty()).selectCurrentNode();
+        InternalNode coordinator = nodeScheduler.createNodeSelector(queryStateMachine.getSession()).selectCurrentNode();
         for (StageExecution stageExecution : stageExecutions) {
             Optional<RemoteTask> remoteTask = stageExecution.scheduleTask(
                     coordinator,
@@ -1084,9 +1084,7 @@ public void stateChanged(QueryState newState)
             Entry<PlanNodeId, SplitSource> entry = getOnlyElement(splitSources.entrySet());
             PlanNodeId planNodeId = entry.getKey();
             SplitSource splitSource = entry.getValue();
-            Optional<CatalogHandle> catalogHandle = Optional.of(splitSource.getCatalogHandle())
-                    .filter(catalog -> !catalog.getType().isInternal());
-            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogHandle);
+            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session);
             SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);

             return newSourcePartitionedSchedulerAsStageScheduler(
@@ -1106,9 +1104,7 @@ public void stateChanged(QueryState newState)
                     .collect(toImmutableSet());
             checkState(allCatalogHandles.size() <= 1, "table scans that are within one stage should read from same catalog");

-            Optional<CatalogHandle> catalogHandle = allCatalogHandles.size() == 1 ? Optional.of(getOnlyElement(allCatalogHandles)) : Optional.empty();
-
-            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogHandle);
+            NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session);
             return new MultiSourcePartitionedScheduler(
                     stageExecution,
                     splitSources,
@@ -1130,7 +1126,7 @@ public void stateChanged(QueryState newState)
                     stageExecution,
                     sourceTasksProvider,
                     writerTasksProvider,
-                    nodeScheduler.createNodeSelector(session, Optional.empty()),
+                    nodeScheduler.createNodeSelector(session),
                     executor,
                     getWriterScalingMinDataProcessed(session),
                     partitionCount);
@@ -1152,15 +1148,13 @@ public void stateChanged(QueryState newState)

             // contains local source
             List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
-            Optional<CatalogHandle> catalogHandle = partitioningHandle.getCatalogHandle();
-            checkArgument(catalogHandle.isPresent(), "No catalog handle for partitioning handle: %s", partitioningHandle);

             BucketNodeMap bucketNodeMap;
             List<InternalNode> stageNodeList;
             if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
                 // no remote source
                 bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, partitionCount);
-                stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogHandle).allNodes());
+                stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session).allNodes());
                 Collections.shuffle(stageNodeList);
             }
             else {
@@ -1177,7 +1171,7 @@ public void stateChanged(QueryState newState)
                     stageNodeList,
                     bucketNodeMap,
                     splitBatchSize,
-                    nodeScheduler.createNodeSelector(session, catalogHandle),
+                    nodeScheduler.createNodeSelector(session),
                     dynamicFilterService,
                     tableExecuteContextManager);
             }
@@ -29,13 +29,11 @@
 import io.trino.metadata.InternalNodeManager;
 import io.trino.spi.HostAddress;
 import io.trino.spi.SplitWeight;
-import io.trino.spi.connector.CatalogHandle;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -115,14 +113,12 @@ public Map<String, CounterStat> getPlacementCountersByName()
     }

     @Override
-    public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
+    public NodeSelector createNodeSelector(Session session)
     {
-        requireNonNull(catalogHandle, "catalogHandle is null");
-
         // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
         // done as close to when the split is about to be scheduled
         Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(
-                () -> createNodeMap(catalogHandle),
+                this::createNodeMap,
                 5, TimeUnit.SECONDS);

         return new TopologyAwareNodeSelector(
@@ -138,11 +134,9 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
                 networkTopology);
     }

-    private NodeMap createNodeMap(Optional<CatalogHandle> catalogHandle)
+    private NodeMap createNodeMap()
     {
-        Set<InternalNode> nodes = catalogHandle
-                .map(nodeManager::getActiveCatalogNodes)
-                .orElseGet(() -> nodeManager.getNodes(ACTIVE));
+        Set<InternalNode> nodes = nodeManager.getNodes(ACTIVE);

         Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                 .map(InternalNode::getNodeIdentifier)
@@ -28,11 +28,9 @@
 import io.trino.metadata.InternalNodeManager;
 import io.trino.spi.HostAddress;
 import io.trino.spi.SplitWeight;
-import io.trino.spi.connector.CatalogHandle;

 import java.net.InetAddress;
 import java.net.UnknownHostException;
-import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.function.Supplier;
@@ -101,20 +99,18 @@ public UniformNodeSelectorFactory(
     }

     @Override
-    public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
+    public NodeSelector createNodeSelector(Session session)
     {
-        requireNonNull(catalogHandle, "catalogHandle is null");
-
         // this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
        // done as close to when the split is about to be scheduled
         Supplier<NodeMap> nodeMap;
         if (nodeMapMemoizationDuration.toMillis() > 0) {
             nodeMap = Suppliers.memoizeWithExpiration(
-                    () -> createNodeMap(catalogHandle),
+                    this::createNodeMap,
                     nodeMapMemoizationDuration.toMillis(), MILLISECONDS);
         }
         else {
-            nodeMap = () -> createNodeMap(catalogHandle);
+            nodeMap = this::createNodeMap;
         }

         return new UniformNodeSelector(
@@ -131,11 +127,9 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
                 optimizedLocalScheduling);
     }

-    private NodeMap createNodeMap(Optional<CatalogHandle> catalogHandle)
+    private NodeMap createNodeMap()
     {
-        Set<InternalNode> nodes = catalogHandle
-                .map(nodeManager::getActiveCatalogNodes)
-                .orElseGet(() -> nodeManager.getNodes(ACTIVE));
+        Set<InternalNode> nodes = nodeManager.getNodes(ACTIVE);

         Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
                 .map(InternalNode::getNodeIdentifier)
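Dropping the catalog argument also lets the node-map supplier shrink from a capturing lambda to a plain method reference. Shown standalone, the Guava memoization pattern both selector factories rely on (a minimal sketch; NodeMapCacheExample and its record stand-in are illustrative, and the five-second TTL mirrors the one hard-coded in the topology-aware factory above):

import com.google.common.base.Suppliers;

import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

public class NodeMapCacheExample
{
    // Stand-in for the NodeMap snapshot the real factories build from
    // InternalNodeManager.getNodes(ACTIVE).
    record NodeMap(long createdAtNanos) {}

    // Rebuilt at most once per five seconds; concurrent callers in between
    // share the cached snapshot instead of re-listing cluster nodes.
    private final Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(
            this::createNodeMap,
            5, TimeUnit.SECONDS);

    private NodeMap createNodeMap()
    {
        return new NodeMap(System.nanoTime());
    }
}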
@@ -15,8 +15,6 @@

 import com.google.common.base.Stopwatch;
 import com.google.common.base.Ticker;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.CacheLoader;
 import com.google.common.collect.HashMultimap;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
@@ -34,7 +32,6 @@
 import io.airlift.stats.DistributionStat;
 import io.airlift.units.DataSize;
 import io.trino.Session;
-import io.trino.cache.NonEvictableLoadingCache;
 import io.trino.execution.TaskId;
 import io.trino.execution.scheduler.NodeSchedulerConfig;
 import io.trino.memory.ClusterMemoryManager;
@@ -46,7 +43,6 @@
 import io.trino.spi.HostAddress;
 import io.trino.spi.QueryId;
 import io.trino.spi.TrinoException;
-import io.trino.spi.connector.CatalogHandle;
 import io.trino.spi.memory.MemoryPoolInfo;
 import jakarta.annotation.PostConstruct;
 import jakarta.annotation.PreDestroy;
@@ -86,7 +82,6 @@
 import static com.google.common.collect.ImmutableList.toImmutableList;
 import static com.google.common.collect.Sets.newConcurrentHashSet;
 import static io.airlift.concurrent.Threads.daemonThreadsNamed;
-import static io.trino.cache.SafeCaches.buildNonEvictableCache;
 import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.EAGER_SPECULATIVE;
 import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.SPECULATIVE;
 import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.STANDARD;
@@ -773,8 +768,6 @@ private static class BinPackingSimulation
         private final Set<String> nodesWithoutMemory;
         private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
         private final Map<String, Long> speculativeMemoryReserved;
-        private final NonEvictableLoadingCache<CatalogHandle, List<InternalNode>> catalogNodes;
-        private final NonEvictableLoadingCache<CatalogHandle, List<InternalNode>> catalogWorkerNodes;

         private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
         private final boolean scheduleOnCoordinator;
@@ -800,22 +793,6 @@ public BinPackingSimulation(

             this.workerNodesSorted = allNodesSorted.stream().filter(node -> !node.isCoordinator()).collect(toImmutableList());

-            this.catalogNodes = buildNonEvictableCache(
-                    CacheBuilder.newBuilder(),
-                    CacheLoader.from(catalogHandle -> {
-                        List<InternalNode> nodes = new ArrayList<>(allNodesSorted);
-                        nodes.retainAll(nodesSnapshot.getConnectorNodes(catalogHandle));
-                        return nodes;
-                    }));
-
-            this.catalogWorkerNodes = buildNonEvictableCache(
-                    CacheBuilder.newBuilder(),
-                    CacheLoader.from(catalogHandle -> {
-                        List<InternalNode> nodes = new ArrayList<>(workerNodesSorted);
-                        nodes.retainAll(nodesSnapshot.getConnectorNodes(catalogHandle));
-                        return nodes;
-                    }));
-
             allNodesByAddress = Multimaps.index(nodesSnapshot.getAllNodes(), InternalNode::getHostAndPort);

             this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative;
@@ -904,16 +881,16 @@ public ReserveResult tryReserve(PendingAcquire acquire)
                 Collection<InternalNode> preferred = allNodesByAddress.get(address.get());
                 if ((!preferred.isEmpty() && acquire.getNotEnoughResourcesPeriod().compareTo(exhaustedNodeWaitPeriod) < 0) || !requirements.isRemotelyAccessible()) {
                     // use preferred node if available
-                    candidates = getCandidatesWithCoordinator(requirements).stream().filter(preferred::contains).collect(toImmutableList());
+                    candidates = getCandidatesWithCoordinator().stream().filter(preferred::contains).collect(toImmutableList());
                 }
                 else {
                     // use all nodes if we do not have preferences or waited to long
-                    candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator(requirements) : getCandidatesExceptCoordinator(requirements);
+                    candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator() : getCandidatesExceptCoordinator();
                 }
             }
             else {
                 // standard candidates
-                candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator(requirements) : getCandidatesExceptCoordinator(requirements);
+                candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator() : getCandidatesExceptCoordinator();
             }

             if (candidates.isEmpty()) {
@@ -962,18 +939,14 @@ public ReserveResult tryReserve(PendingAcquire acquire)
             return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
         }

-        private List<InternalNode> getCandidatesExceptCoordinator(NodeRequirements requirements)
+        private List<InternalNode> getCandidatesExceptCoordinator()
         {
-            return requirements.getCatalogHandle()
-                    .map(catalogWorkerNodes::getUnchecked)
-                    .orElse(workerNodesSorted);
+            return workerNodesSorted;
         }

-        private List<InternalNode> getCandidatesWithCoordinator(NodeRequirements requirements)
+        private List<InternalNode> getCandidatesWithCoordinator()
        {
-            return requirements.getCatalogHandle()
-                    .map(catalogNodes::getUnchecked)
-                    .orElse(allNodesSorted);
+            return allNodesSorted;
         }

         private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator)
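Net effect in BinPackingSimulation: with the per-catalog caches removed, candidate selection degenerates to returning one of two lists precomputed once per simulation pass. A condensed sketch of the surviving logic (the single boolean-parameter method is illustrative only; the PR keeps the two zero-argument methods shown above):

// allNodesSorted and workerNodesSorted are built once per simulation pass;
// no per-catalog filtering or cache lookup remains on the hot path.
private List<InternalNode> candidates(boolean includeCoordinator)
{
    return includeCoordinator ? allNodesSorted : workerNodesSorted;
}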