Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
9b15fc1
Fix IntelliJ warnings in DiscoveryNodeManager
dain Jun 25, 2025
96a7208
Simplify coordinator classification in DiscoveryNodeManager
dain Jun 25, 2025
2ca0df6
Use enhanced switch statement in DiscoveryNodeManager
dain Jun 25, 2025
76b980d
Use ticker in TestDiscoveryNodeManager to eliminate wait times
dain Jun 25, 2025
7078ba5
Fix simulated node state in TestDiscoveryNodeManager
dain Jun 25, 2025
351d8cd
Fix bug in DiscoveryNodeManager where node initial state is ACTIVE
dain Jun 25, 2025
7598c40
Fix testing server startup
dain Jun 27, 2025
10efaeb
Convert TestDiscoveryNodeManager to simple parallel test
dain Jun 26, 2025
11c367d
Simplify DiscoveryNodeManager node registration
dain Jun 25, 2025
44cb85b
Add nodeId to ServerInfo
dain Jun 24, 2025
74d2c76
Simplify management of current InternalNode and NodeState
dain Jun 27, 2025
98cfe89
Bind InternalNode for current node
dain Jun 27, 2025
3e3390b
Load node information from remote server info
dain Jun 25, 2025
d4ca702
Add INVALID node state for nodes with wrong version or environment
dain Jun 26, 2025
91f7555
Abstract Airlift discovery from DiscoveryNodeManager
dain Jun 26, 2025
f6cb6d0
Update CatalogPruneTask to use InternalNodeManager instead of discovery
dain Jun 27, 2025
3438ba5
Move node management classes to new node module
dain Jun 26, 2025
e147255
Use TestingNodeManager in TestingConnectorContext
dain Jun 28, 2025
2db8d04
Rename InMemoryNodeManager to TestingInternalNodeManager
dain Jun 28, 2025
5fb6266
Use shared CURRENT_NODE constant in testing node managers
dain Jun 28, 2025
271dfb6
Remove InternalNodeManager getCurrentNode
dain Jun 28, 2025
bc31d5b
Split coordinator and worker node manager
dain Jun 28, 2025
658d05e
Use separate http client for CatalogPruneTask
dain Jul 1, 2025
08cf4cb
Add gone tracking to CoordinatorNodeManager
dain Jul 1, 2025
313285a
Simplify InternalNodeManager implementations with default methods
dain Jul 1, 2025
828fae4
Improve TestingInternalNodeManager internals to support all methods
dain Jul 1, 2025
8c1a5c2
Convert tests to use TestingInternalNodeManager
dain Jul 1, 2025
a844f0d
Convert AllNodes to a Java record
dain Jul 2, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ public class ServerInfo
// optional to maintain compatibility with older servers
private final Optional<Duration> uptime;
private final Optional<String> coordinatorId;
private final Optional<String> nodeId;

@JsonCreator
public ServerInfo(
Expand All @@ -43,14 +44,16 @@ public ServerInfo(
@JsonProperty("coordinator") boolean coordinator,
@JsonProperty("starting") boolean starting,
@JsonProperty("uptime") Optional<Duration> uptime,
@JsonProperty("coordinatorId") Optional<String> coordinatorId)
@JsonProperty("coordinatorId") Optional<String> coordinatorId,
@JsonProperty("nodeId") Optional<String> nodeId)
{
this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null");
this.environment = requireNonNull(environment, "environment is null");
this.coordinator = coordinator;
this.starting = starting;
this.uptime = requireNonNull(uptime, "uptime is null");
this.coordinatorId = requireNonNull(coordinatorId, "coordinatorId is null");
this.nodeId = requireNonNull(nodeId, "nodeId is null");
}

@JsonProperty
Expand Down Expand Up @@ -89,6 +92,12 @@ public Optional<String> getCoordinatorId()
return coordinatorId;
}

@JsonProperty
public Optional<String> getNodeId()
{
return nodeId;
}

@Override
public boolean equals(Object o)
{
Expand Down Expand Up @@ -119,6 +128,7 @@ public String toString()
.add("coordinator", coordinator)
.add("uptime", uptime.orElse(null))
.add("coordinatorId", coordinatorId.orElse(null))
.add("nodeId", nodeId.orElse(null))
.omitNullValues()
.toString();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,15 @@ public class TestServerInfo
@Test
public void testJsonRoundTrip()
{
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.of(new Duration(2, MINUTES)), Optional.of("3sruz")));
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.of(new Duration(2, MINUTES)), Optional.empty()));
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.empty(), Optional.empty()));
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.of(new Duration(2, MINUTES)), Optional.of("3sruz"), Optional.of("node-id")));
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.of(new Duration(2, MINUTES)), Optional.empty(), Optional.of("node-id")));
assertJsonRoundTrip(new ServerInfo(UNKNOWN, "test", true, false, Optional.empty(), Optional.empty(), Optional.empty()));
}

@Test
public void testBackwardsCompatible()
{
ServerInfo newServerInfo = new ServerInfo(UNKNOWN, "test", true, false, Optional.empty(), Optional.empty());
ServerInfo newServerInfo = new ServerInfo(UNKNOWN, "test", true, false, Optional.empty(), Optional.empty(), Optional.empty());
ServerInfo legacyServerInfo = SERVER_INFO_CODEC.fromJson("{\"nodeVersion\":{\"version\":\"<unknown>\"},\"environment\":\"test\",\"coordinator\":true}");
assertThat(newServerInfo).isEqualTo(legacyServerInfo);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,17 @@
import com.google.common.collect.ImmutableSet;
import com.google.inject.Inject;
import io.airlift.concurrent.ThreadPoolExecutorMBean;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
import io.airlift.discovery.client.ServiceType;
import io.airlift.http.client.HttpClient;
import io.airlift.http.client.Request;
import io.airlift.http.client.Response;
import io.airlift.http.client.ResponseHandler;
import io.airlift.json.JsonCodec;
import io.airlift.log.Logger;
import io.airlift.node.NodeInfo;
import io.airlift.units.Duration;
import io.trino.metadata.CatalogManager;
import io.trino.metadata.ForNodeManager;
import io.trino.server.InternalCommunicationConfig;
import io.trino.node.AllNodes;
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.spi.connector.CatalogHandle;
import io.trino.transaction.TransactionManager;
import jakarta.annotation.PostConstruct;
Expand All @@ -44,6 +41,7 @@
import java.util.Set;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

import static com.google.common.collect.ImmutableSet.toImmutableSet;
import static com.google.common.net.HttpHeaders.CONTENT_TYPE;
Expand All @@ -64,16 +62,15 @@ public class CatalogPruneTask
private final TransactionManager transactionManager;
private final CatalogManager catalogManager;
private final ConnectorServicesProvider connectorServicesProvider;
private final NodeInfo nodeInfo;
private final ServiceSelector selector;
private final InternalNode currentNode;
private final InternalNodeManager internalNodeManager;
private final HttpClient httpClient;

private final ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1, daemonThreadsNamed("catalog-prune"));
private final ThreadPoolExecutorMBean executorMBean = new ThreadPoolExecutorMBean(executor);

private final boolean enabled;
private final Duration updateInterval;
private final boolean httpsRequired;

private final AtomicBoolean started = new AtomicBoolean();

Expand All @@ -82,23 +79,20 @@ public CatalogPruneTask(
TransactionManager transactionManager,
CatalogManager catalogManager,
ConnectorServicesProvider connectorServicesProvider,
NodeInfo nodeInfo,
@ServiceType("trino") ServiceSelector selector,
@ForNodeManager HttpClient httpClient,
CatalogPruneTaskConfig catalogPruneTaskConfig,
InternalCommunicationConfig internalCommunicationConfig)
InternalNode currentNode,
InternalNodeManager internalNodeManager,
@ForCatalogPrune HttpClient httpClient,
CatalogPruneTaskConfig catalogPruneTaskConfig)
{
this.transactionManager = requireNonNull(transactionManager, "transactionManager is null");
this.catalogManager = requireNonNull(catalogManager, "catalogManager is null");
this.connectorServicesProvider = requireNonNull(connectorServicesProvider, "connectorServicesProvider is null");
this.nodeInfo = requireNonNull(nodeInfo, "nodeInfo is null");
this.selector = requireNonNull(selector, "selector is null");
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.internalNodeManager = requireNonNull(internalNodeManager, "internalNodeManager is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");

this.enabled = catalogPruneTaskConfig.isEnabled();
updateInterval = catalogPruneTaskConfig.getUpdateInterval();

this.httpsRequired = internalCommunicationConfig.isHttpsRequired();
}

@PostConstruct
Expand Down Expand Up @@ -133,8 +127,11 @@ public ThreadPoolExecutorMBean getExecutor()
@VisibleForTesting
public void pruneWorkerCatalogs()
{
Set<ServiceDescriptor> online = selector.selectAllServices().stream()
.filter(descriptor -> !nodeInfo.getNodeId().equals(descriptor.getNodeId()))
AllNodes allNodes = internalNodeManager.getAllNodes();
Set<URI> online = Stream.of(allNodes.activeNodes(), allNodes.inactiveNodes(), allNodes.drainingNodes(), allNodes.drainedNodes(), allNodes.shuttingDownNodes())
.flatMap(Set::stream)
.map(InternalNode::getInternalUri)
.filter(uri -> !uri.equals(currentNode.getInternalUri()))
.collect(toImmutableSet());

// send message to workers to trigger prune
Expand All @@ -145,13 +142,9 @@ public void pruneWorkerCatalogs()
connectorServicesProvider.pruneCatalogs(ImmutableSet.of());
}

void pruneWorkerCatalogs(Set<ServiceDescriptor> online, List<CatalogHandle> activeCatalogs)
void pruneWorkerCatalogs(Set<URI> online, List<CatalogHandle> activeCatalogs)
{
for (ServiceDescriptor service : online) {
URI uri = getHttpUri(service);
if (uri == null) {
continue;
}
for (URI uri : online) {
uri = uriBuilderFrom(uri).appendPath("/v1/task/pruneCatalogs").build();
Request request = preparePost()
.setUri(uri)
Expand Down Expand Up @@ -186,13 +179,4 @@ private List<CatalogHandle> getActiveCatalogs()
activeCatalogs.addAll(catalogManager.getActiveCatalogs());
return ImmutableList.copyOf(activeCatalogs.build());
}

private URI getHttpUri(ServiceDescriptor descriptor)
{
String url = descriptor.getProperties().get(httpsRequired ? "https" : "http");
if (url != null) {
return URI.create(url);
}
return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,9 @@
import io.trino.connector.system.SystemConnector;
import io.trino.connector.system.SystemTablesProvider;
import io.trino.execution.scheduler.NodeSchedulerConfig;
import io.trino.metadata.InternalNodeManager;
import io.trino.metadata.Metadata;
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.security.AccessControl;
import io.trino.spi.PageIndexerFactory;
import io.trino.spi.PageSorter;
Expand Down Expand Up @@ -58,6 +59,7 @@ public class DefaultCatalogFactory
private final Metadata metadata;
private final AccessControl accessControl;

private final InternalNode currentNode;
private final InternalNodeManager nodeManager;
private final PageSorter pageSorter;
private final PageIndexerFactory pageIndexerFactory;
Expand All @@ -76,6 +78,7 @@ public class DefaultCatalogFactory
public DefaultCatalogFactory(
Metadata metadata,
AccessControl accessControl,
InternalNode currentNode,
InternalNodeManager nodeManager,
PageSorter pageSorter,
PageIndexerFactory pageIndexerFactory,
Expand All @@ -89,6 +92,7 @@ public DefaultCatalogFactory(
{
this.metadata = requireNonNull(metadata, "metadata is null");
this.accessControl = requireNonNull(accessControl, "accessControl is null");
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.pageSorter = requireNonNull(pageSorter, "pageSorter is null");
this.pageIndexerFactory = requireNonNull(pageIndexerFactory, "pageIndexerFactory is null");
Expand Down Expand Up @@ -147,13 +151,13 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
createInformationSchemaCatalogHandle(catalogHandle),
new InformationSchemaConnector(
catalogHandle.getCatalogName().toString(),
nodeManager,
currentNode,
metadata,
accessControl,
maxPrefetchedInformationSchemaPrefixes));

SystemTablesProvider systemTablesProvider;
if (nodeManager.getCurrentNode().isCoordinator()) {
if (currentNode.isCoordinator()) {
systemTablesProvider = new CoordinatorSystemTablesProvider(
transactionManager,
metadata,
Expand All @@ -168,6 +172,7 @@ private CatalogConnector createCatalog(CatalogHandle catalogHandle, ConnectorNam
tracer,
createSystemTablesCatalogHandle(catalogHandle),
new SystemConnector(
currentNode,
nodeManager,
systemTablesProvider,
transactionId -> transactionManager.getConnectorTransaction(transactionId, catalogHandle),
Expand All @@ -193,7 +198,7 @@ private Connector createConnector(
catalogHandle,
openTelemetry,
createTracer(catalogHandle),
new DefaultNodeManager(nodeManager, schedulerIncludeCoordinator),
new DefaultNodeManager(currentNode, nodeManager, schedulerIncludeCoordinator),
versionEmbedder,
typeManager,
new InternalMetadataProvider(metadata, typeManager),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,23 +14,26 @@
package io.trino.connector;

import com.google.common.collect.ImmutableSet;
import io.trino.metadata.InternalNodeManager;
import io.trino.node.InternalNode;
import io.trino.node.InternalNodeManager;
import io.trino.spi.Node;
import io.trino.spi.NodeManager;

import java.util.Set;

import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.node.NodeState.ACTIVE;
import static java.util.Objects.requireNonNull;

public class DefaultNodeManager
implements NodeManager
{
private final InternalNode currentNode;
private final InternalNodeManager nodeManager;
private final boolean schedulerIncludeCoordinator;

public DefaultNodeManager(InternalNodeManager nodeManager, boolean schedulerIncludeCoordinator)
public DefaultNodeManager(InternalNode currentNode, InternalNodeManager nodeManager, boolean schedulerIncludeCoordinator)
{
this.currentNode = requireNonNull(currentNode, "currentNode is null");
this.nodeManager = requireNonNull(nodeManager, "nodeManager is null");
this.schedulerIncludeCoordinator = schedulerIncludeCoordinator;
}
Expand All @@ -42,7 +45,7 @@ public Set<Node> getAllNodes()
.addAll(nodeManager.getNodes(ACTIVE))
// append current node (before connector is registered with the node
// in the discovery service) since current node should have connector always loaded
.add(nodeManager.getCurrentNode())
.add(currentNode)
.build();
}

Expand All @@ -55,17 +58,17 @@ public Set<Node> getWorkerNodes()
nodeManager.getNodes(ACTIVE).stream()
.filter(node -> !node.isCoordinator() || schedulerIncludeCoordinator)
.forEach(nodes::add);
if (!nodeManager.getCurrentNode().isCoordinator() || schedulerIncludeCoordinator) {
if (!currentNode.isCoordinator() || schedulerIncludeCoordinator) {
// append current node (before connector is registered with the node
// in discovery service) since current node should have connector always loaded
nodes.add(getCurrentNode());
nodes.add(currentNode);
}
return nodes.build();
}

@Override
public Node getCurrentNode()
{
return nodeManager.getCurrentNode();
return currentNode;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,15 @@
import com.google.inject.Inject;
import com.google.inject.Scopes;
import io.airlift.configuration.AbstractConfigurationAwareModule;
import io.airlift.units.Duration;
import io.trino.connector.system.GlobalSystemConnector;
import io.trino.metadata.CatalogManager;
import io.trino.server.ServerConfig;
import io.trino.spi.catalog.CatalogStore;

import static io.airlift.configuration.ConfigBinder.configBinder;
import static io.trino.server.InternalCommunicationHttpClientModule.internalHttpClientModule;
import static java.util.concurrent.TimeUnit.SECONDS;

public class DynamicCatalogManagerModule
extends AbstractConfigurationAwareModule
Expand All @@ -41,6 +44,11 @@ protected void setup(Binder binder)

configBinder(binder).bindConfig(CatalogPruneTaskConfig.class);
binder.bind(CatalogPruneTask.class).in(Scopes.SINGLETON);
install(internalHttpClientModule("catalog-prune", ForCatalogPrune.class)
.withConfigDefaults(config -> {
config.setIdleTimeout(new Duration(30, SECONDS));
config.setRequestTimeout(new Duration(10, SECONDS));
}).build());
}
else {
binder.bind(WorkerDynamicCatalogManager.class).in(Scopes.SINGLETON);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.connector;

import com.google.inject.BindingAnnotation;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@BindingAnnotation
public @interface ForCatalogPrune
{
}
Loading