Skip to content

Commit 055635f

Browse files
committed
Assume all nodes have the same catalogs deployed
Remove support for workers with different catalogs deployed. This support has been deprecated for a long while, and deploying the same catalogs on every node is how the vast majority of users deploy. This feature creates a lot of complexity in the system, which makes adding new features more difficult.
1 parent 0419b43 commit 055635f

24 files changed

+82
-442
lines changed

core/trino-main/src/main/java/io/trino/connector/DefaultCatalogFactory.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -197,7 +197,7 @@ private Connector createConnector(
197197
catalogHandle,
198198
openTelemetry,
199199
createTracer(catalogHandle),
200-
new ConnectorAwareNodeManager(nodeManager, nodeInfo.getEnvironment(), catalogHandle, schedulerIncludeCoordinator),
200+
new DefaultNodeManager(nodeManager, nodeInfo.getEnvironment(), schedulerIncludeCoordinator),
201201
versionEmbedder,
202202
typeManager,
203203
new InternalMetadataProvider(metadata, typeManager),

core/trino-main/src/main/java/io/trino/connector/ConnectorAwareNodeManager.java renamed to core/trino-main/src/main/java/io/trino/connector/DefaultNodeManager.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,33 +17,31 @@
1717
import io.trino.metadata.InternalNodeManager;
1818
import io.trino.spi.Node;
1919
import io.trino.spi.NodeManager;
20-
import io.trino.spi.connector.CatalogHandle;
2120

2221
import java.util.Set;
2322

23+
import static io.trino.metadata.NodeState.ACTIVE;
2424
import static java.util.Objects.requireNonNull;
2525

26-
public class ConnectorAwareNodeManager
26+
public class DefaultNodeManager
2727
implements NodeManager
2828
{
2929
private final InternalNodeManager nodeManager;
3030
private final String environment;
31-
private final CatalogHandle catalogHandle;
3231
private final boolean schedulerIncludeCoordinator;
3332

34-
public ConnectorAwareNodeManager(InternalNodeManager nodeManager, String environment, CatalogHandle catalogHandle, boolean schedulerIncludeCoordinator)
33+
public DefaultNodeManager(InternalNodeManager nodeManager, String environment, boolean schedulerIncludeCoordinator)
3534
{
3635
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
3736
this.environment = requireNonNull(environment, "environment is null");
38-
this.catalogHandle = requireNonNull(catalogHandle, "catalogHandle is null");
3937
this.schedulerIncludeCoordinator = schedulerIncludeCoordinator;
4038
}
4139

4240
@Override
4341
public Set<Node> getAllNodes()
4442
{
4543
return ImmutableSet.<Node>builder()
46-
.addAll(nodeManager.getActiveCatalogNodes(catalogHandle))
44+
.addAll(nodeManager.getNodes(ACTIVE))
4745
// append current node (before connector is registered with the node
4846
// in the discovery service) since current node should have connector always loaded
4947
.add(nodeManager.getCurrentNode())
@@ -56,7 +54,7 @@ public Set<Node> getWorkerNodes()
5654
ImmutableSet.Builder<Node> nodes = ImmutableSet.builder();
5755
// getNodes(ACTIVE) returns all active nodes (including coordinators);
5856
// all nodes are assumed to have the same catalogs deployed
59-
nodeManager.getActiveCatalogNodes(catalogHandle).stream()
57+
nodeManager.getNodes(ACTIVE).stream()
6058
.filter(node -> !node.isCoordinator() || schedulerIncludeCoordinator)
6159
.forEach(nodes::add);
6260
if (!nodeManager.getCurrentNode().isCoordinator() || schedulerIncludeCoordinator) {

core/trino-main/src/main/java/io/trino/execution/scheduler/NodeScheduler.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import io.trino.metadata.Split;
2828
import io.trino.spi.HostAddress;
2929
import io.trino.spi.SplitWeight;
30-
import io.trino.spi.connector.CatalogHandle;
3130

3231
import java.net.InetAddress;
3332
import java.net.UnknownHostException;
@@ -39,7 +38,6 @@
3938
import java.util.List;
4039
import java.util.Map;
4140
import java.util.Objects;
42-
import java.util.Optional;
4341
import java.util.Set;
4442

4543
import static com.google.common.base.Preconditions.checkArgument;
@@ -60,9 +58,9 @@ public NodeScheduler(NodeSelectorFactory nodeSelectorFactory)
6058
this.nodeSelectorFactory = requireNonNull(nodeSelectorFactory, "nodeSelectorFactory is null");
6159
}
6260

63-
public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
61+
public NodeSelector createNodeSelector(Session session)
6462
{
65-
return nodeSelectorFactory.createNodeSelector(requireNonNull(session, "session is null"), requireNonNull(catalogHandle, "catalogHandle is null"));
63+
return nodeSelectorFactory.createNodeSelector(requireNonNull(session, "session is null"));
6664
}
6765

6866
public static List<InternalNode> getAllNodes(NodeMap nodeMap, boolean includeCoordinator)

core/trino-main/src/main/java/io/trino/execution/scheduler/NodeSelectorFactory.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,11 +14,8 @@
1414
package io.trino.execution.scheduler;
1515

1616
import io.trino.Session;
17-
import io.trino.spi.connector.CatalogHandle;
18-
19-
import java.util.Optional;
2017

2118
public interface NodeSelectorFactory
2219
{
23-
NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle);
20+
NodeSelector createNodeSelector(Session session);
2421
}

core/trino-main/src/main/java/io/trino/execution/scheduler/PipelinedQueryScheduler.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -748,7 +748,7 @@ public synchronized void schedule()
748748
TaskFailureReporter failureReporter = new TaskFailureReporter(distributedStagesScheduler);
749749
queryStateMachine.addOutputTaskFailureListener(failureReporter);
750750

751-
InternalNode coordinator = nodeScheduler.createNodeSelector(queryStateMachine.getSession(), Optional.empty()).selectCurrentNode();
751+
InternalNode coordinator = nodeScheduler.createNodeSelector(queryStateMachine.getSession()).selectCurrentNode();
752752
for (StageExecution stageExecution : stageExecutions) {
753753
Optional<RemoteTask> remoteTask = stageExecution.scheduleTask(
754754
coordinator,
@@ -1084,9 +1084,7 @@ public void stateChanged(QueryState newState)
10841084
Entry<PlanNodeId, SplitSource> entry = getOnlyElement(splitSources.entrySet());
10851085
PlanNodeId planNodeId = entry.getKey();
10861086
SplitSource splitSource = entry.getValue();
1087-
Optional<CatalogHandle> catalogHandle = Optional.of(splitSource.getCatalogHandle())
1088-
.filter(catalog -> !catalog.getType().isInternal());
1089-
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogHandle);
1087+
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session);
10901088
SplitPlacementPolicy placementPolicy = new DynamicSplitPlacementPolicy(nodeSelector, stageExecution::getAllTasks);
10911089

10921090
return newSourcePartitionedSchedulerAsStageScheduler(
@@ -1106,9 +1104,7 @@ public void stateChanged(QueryState newState)
11061104
.collect(toImmutableSet());
11071105
checkState(allCatalogHandles.size() <= 1, "table scans that are within one stage should read from same catalog");
11081106

1109-
Optional<CatalogHandle> catalogHandle = allCatalogHandles.size() == 1 ? Optional.of(getOnlyElement(allCatalogHandles)) : Optional.empty();
1110-
1111-
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session, catalogHandle);
1107+
NodeSelector nodeSelector = nodeScheduler.createNodeSelector(session);
11121108
return new MultiSourcePartitionedScheduler(
11131109
stageExecution,
11141110
splitSources,
@@ -1130,7 +1126,7 @@ public void stateChanged(QueryState newState)
11301126
stageExecution,
11311127
sourceTasksProvider,
11321128
writerTasksProvider,
1133-
nodeScheduler.createNodeSelector(session, Optional.empty()),
1129+
nodeScheduler.createNodeSelector(session),
11341130
executor,
11351131
getWriterScalingMinDataProcessed(session),
11361132
partitionCount);
@@ -1152,15 +1148,13 @@ public void stateChanged(QueryState newState)
11521148

11531149
// contains local source
11541150
List<PlanNodeId> schedulingOrder = fragment.getPartitionedSources();
1155-
Optional<CatalogHandle> catalogHandle = partitioningHandle.getCatalogHandle();
1156-
checkArgument(catalogHandle.isPresent(), "No catalog handle for partitioning handle: %s", partitioningHandle);
11571151

11581152
BucketNodeMap bucketNodeMap;
11591153
List<InternalNode> stageNodeList;
11601154
if (fragment.getRemoteSourceNodes().stream().allMatch(node -> node.getExchangeType() == REPLICATE)) {
11611155
// no remote source
11621156
bucketNodeMap = nodePartitioningManager.getBucketNodeMap(session, partitioningHandle, partitionCount);
1163-
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session, catalogHandle).allNodes());
1157+
stageNodeList = new ArrayList<>(nodeScheduler.createNodeSelector(session).allNodes());
11641158
Collections.shuffle(stageNodeList);
11651159
}
11661160
else {
@@ -1177,7 +1171,7 @@ public void stateChanged(QueryState newState)
11771171
stageNodeList,
11781172
bucketNodeMap,
11791173
splitBatchSize,
1180-
nodeScheduler.createNodeSelector(session, catalogHandle),
1174+
nodeScheduler.createNodeSelector(session),
11811175
dynamicFilterService,
11821176
tableExecuteContextManager);
11831177
}

core/trino-main/src/main/java/io/trino/execution/scheduler/TopologyAwareNodeSelectorFactory.java

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -29,13 +29,11 @@
2929
import io.trino.metadata.InternalNodeManager;
3030
import io.trino.spi.HostAddress;
3131
import io.trino.spi.SplitWeight;
32-
import io.trino.spi.connector.CatalogHandle;
3332

3433
import java.net.InetAddress;
3534
import java.net.UnknownHostException;
3635
import java.util.List;
3736
import java.util.Map;
38-
import java.util.Optional;
3937
import java.util.Set;
4038
import java.util.concurrent.TimeUnit;
4139
import java.util.function.Supplier;
@@ -115,14 +113,12 @@ public Map<String, CounterStat> getPlacementCountersByName()
115113
}
116114

117115
@Override
118-
public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
116+
public NodeSelector createNodeSelector(Session session)
119117
{
120-
requireNonNull(catalogHandle, "catalogHandle is null");
121-
122118
// this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
123119
// done as close to when the split is about to be scheduled
124120
Supplier<NodeMap> nodeMap = Suppliers.memoizeWithExpiration(
125-
() -> createNodeMap(catalogHandle),
121+
this::createNodeMap,
126122
5, TimeUnit.SECONDS);
127123

128124
return new TopologyAwareNodeSelector(
@@ -138,11 +134,9 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
138134
networkTopology);
139135
}
140136

141-
private NodeMap createNodeMap(Optional<CatalogHandle> catalogHandle)
137+
private NodeMap createNodeMap()
142138
{
143-
Set<InternalNode> nodes = catalogHandle
144-
.map(nodeManager::getActiveCatalogNodes)
145-
.orElseGet(() -> nodeManager.getNodes(ACTIVE));
139+
Set<InternalNode> nodes = nodeManager.getNodes(ACTIVE);
146140

147141
Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
148142
.map(InternalNode::getNodeIdentifier)

core/trino-main/src/main/java/io/trino/execution/scheduler/UniformNodeSelectorFactory.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -28,11 +28,9 @@
2828
import io.trino.metadata.InternalNodeManager;
2929
import io.trino.spi.HostAddress;
3030
import io.trino.spi.SplitWeight;
31-
import io.trino.spi.connector.CatalogHandle;
3231

3332
import java.net.InetAddress;
3433
import java.net.UnknownHostException;
35-
import java.util.Optional;
3634
import java.util.Set;
3735
import java.util.concurrent.TimeUnit;
3836
import java.util.function.Supplier;
@@ -101,20 +99,18 @@ public UniformNodeSelectorFactory(
10199
}
102100

103101
@Override
104-
public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle> catalogHandle)
102+
public NodeSelector createNodeSelector(Session session)
105103
{
106-
requireNonNull(catalogHandle, "catalogHandle is null");
107-
108104
// this supplier is thread-safe. TODO: this logic should probably move to the scheduler since the choice of which node to run in should be
109105
// done as close to when the split is about to be scheduled
110106
Supplier<NodeMap> nodeMap;
111107
if (nodeMapMemoizationDuration.toMillis() > 0) {
112108
nodeMap = Suppliers.memoizeWithExpiration(
113-
() -> createNodeMap(catalogHandle),
109+
this::createNodeMap,
114110
nodeMapMemoizationDuration.toMillis(), MILLISECONDS);
115111
}
116112
else {
117-
nodeMap = () -> createNodeMap(catalogHandle);
113+
nodeMap = this::createNodeMap;
118114
}
119115

120116
return new UniformNodeSelector(
@@ -131,11 +127,9 @@ public NodeSelector createNodeSelector(Session session, Optional<CatalogHandle>
131127
optimizedLocalScheduling);
132128
}
133129

134-
private NodeMap createNodeMap(Optional<CatalogHandle> catalogHandle)
130+
private NodeMap createNodeMap()
135131
{
136-
Set<InternalNode> nodes = catalogHandle
137-
.map(nodeManager::getActiveCatalogNodes)
138-
.orElseGet(() -> nodeManager.getNodes(ACTIVE));
132+
Set<InternalNode> nodes = nodeManager.getNodes(ACTIVE);
139133

140134
Set<String> coordinatorNodeIds = nodeManager.getCoordinators().stream()
141135
.map(InternalNode::getNodeIdentifier)

core/trino-main/src/main/java/io/trino/execution/scheduler/faulttolerant/BinPackingNodeAllocatorService.java

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,6 @@
1515

1616
import com.google.common.base.Stopwatch;
1717
import com.google.common.base.Ticker;
18-
import com.google.common.cache.CacheBuilder;
19-
import com.google.common.cache.CacheLoader;
2018
import com.google.common.collect.HashMultimap;
2119
import com.google.common.collect.ImmutableList;
2220
import com.google.common.collect.ImmutableMap;
@@ -34,7 +32,6 @@
3432
import io.airlift.stats.DistributionStat;
3533
import io.airlift.units.DataSize;
3634
import io.trino.Session;
37-
import io.trino.cache.NonEvictableLoadingCache;
3835
import io.trino.execution.TaskId;
3936
import io.trino.execution.scheduler.NodeSchedulerConfig;
4037
import io.trino.memory.ClusterMemoryManager;
@@ -46,7 +43,6 @@
4643
import io.trino.spi.HostAddress;
4744
import io.trino.spi.QueryId;
4845
import io.trino.spi.TrinoException;
49-
import io.trino.spi.connector.CatalogHandle;
5046
import io.trino.spi.memory.MemoryPoolInfo;
5147
import jakarta.annotation.PostConstruct;
5248
import jakarta.annotation.PreDestroy;
@@ -86,7 +82,6 @@
8682
import static com.google.common.collect.ImmutableList.toImmutableList;
8783
import static com.google.common.collect.Sets.newConcurrentHashSet;
8884
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
89-
import static io.trino.cache.SafeCaches.buildNonEvictableCache;
9085
import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.EAGER_SPECULATIVE;
9186
import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.SPECULATIVE;
9287
import static io.trino.execution.scheduler.faulttolerant.TaskExecutionClass.STANDARD;
@@ -773,8 +768,6 @@ private static class BinPackingSimulation
773768
private final Set<String> nodesWithoutMemory;
774769
private final Map<String, Long> nodesRemainingMemoryRuntimeAdjusted;
775770
private final Map<String, Long> speculativeMemoryReserved;
776-
private final NonEvictableLoadingCache<CatalogHandle, List<InternalNode>> catalogNodes;
777-
private final NonEvictableLoadingCache<CatalogHandle, List<InternalNode>> catalogWorkerNodes;
778771

779772
private final Map<String, MemoryPoolInfo> nodeMemoryPoolInfos;
780773
private final boolean scheduleOnCoordinator;
@@ -800,22 +793,6 @@ public BinPackingSimulation(
800793

801794
this.workerNodesSorted = allNodesSorted.stream().filter(node -> !node.isCoordinator()).collect(toImmutableList());
802795

803-
this.catalogNodes = buildNonEvictableCache(
804-
CacheBuilder.newBuilder(),
805-
CacheLoader.from(catalogHandle -> {
806-
List<InternalNode> nodes = new ArrayList<>(allNodesSorted);
807-
nodes.retainAll(nodesSnapshot.getConnectorNodes(catalogHandle));
808-
return nodes;
809-
}));
810-
811-
this.catalogWorkerNodes = buildNonEvictableCache(
812-
CacheBuilder.newBuilder(),
813-
CacheLoader.from(catalogHandle -> {
814-
List<InternalNode> nodes = new ArrayList<>(workerNodesSorted);
815-
nodes.retainAll(nodesSnapshot.getConnectorNodes(catalogHandle));
816-
return nodes;
817-
}));
818-
819796
allNodesByAddress = Multimaps.index(nodesSnapshot.getAllNodes(), InternalNode::getHostAndPort);
820797

821798
this.ignoreAcquiredSpeculative = ignoreAcquiredSpeculative;
@@ -904,16 +881,16 @@ public ReserveResult tryReserve(PendingAcquire acquire)
904881
Collection<InternalNode> preferred = allNodesByAddress.get(address.get());
905882
if ((!preferred.isEmpty() && acquire.getNotEnoughResourcesPeriod().compareTo(exhaustedNodeWaitPeriod) < 0) || !requirements.isRemotelyAccessible()) {
906883
// use preferred node if available
907-
candidates = getCandidatesWithCoordinator(requirements).stream().filter(preferred::contains).collect(toImmutableList());
884+
candidates = getCandidatesWithCoordinator().stream().filter(preferred::contains).collect(toImmutableList());
908885
}
909886
else {
910887
// use all nodes if we do not have preferences or waited too long
911-
candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator(requirements) : getCandidatesExceptCoordinator(requirements);
888+
candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator() : getCandidatesExceptCoordinator();
912889
}
913890
}
914891
else {
915892
// standard candidates
916-
candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator(requirements) : getCandidatesExceptCoordinator(requirements);
893+
candidates = scheduleOnCoordinator ? getCandidatesWithCoordinator() : getCandidatesExceptCoordinator();
917894
}
918895

919896
if (candidates.isEmpty()) {
@@ -962,18 +939,14 @@ public ReserveResult tryReserve(PendingAcquire acquire)
962939
return ReserveResult.NOT_ENOUGH_RESOURCES_NOW;
963940
}
964941

965-
private List<InternalNode> getCandidatesExceptCoordinator(NodeRequirements requirements)
942+
private List<InternalNode> getCandidatesExceptCoordinator()
966943
{
967-
return requirements.getCatalogHandle()
968-
.map(catalogWorkerNodes::getUnchecked)
969-
.orElse(workerNodesSorted);
944+
return workerNodesSorted;
970945
}
971946

972-
private List<InternalNode> getCandidatesWithCoordinator(NodeRequirements requirements)
947+
private List<InternalNode> getCandidatesWithCoordinator()
973948
{
974-
return requirements.getCatalogHandle()
975-
.map(catalogNodes::getUnchecked)
976-
.orElse(allNodesSorted);
949+
return allNodesSorted;
977950
}
978951

979952
private Comparator<InternalNode> resolveTiesWithSpeculativeMemory(Comparator<InternalNode> comparator)

0 commit comments

Comments
 (0)