Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@

import static io.trino.metadata.MetadataUtil.TableMetadataBuilder.tableMetadataBuilder;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.DRAINED;
import static io.trino.metadata.NodeState.DRAINING;
import static io.trino.metadata.NodeState.INACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static io.trino.spi.connector.SystemTable.Distribution.SINGLE_COORDINATOR;
Expand Down Expand Up @@ -81,6 +83,9 @@ public RecordCursor cursor(ConnectorTransactionHandle transactionHandle, Connect
addRows(table, allNodes.getActiveNodes(), ACTIVE);
addRows(table, allNodes.getInactiveNodes(), INACTIVE);
addRows(table, allNodes.getShuttingDownNodes(), SHUTTING_DOWN);
addRows(table, allNodes.getDrainingNodes(), DRAINING);
addRows(table, allNodes.getDrainedNodes(), DRAINED);

return table.build().cursor();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@
import static io.trino.SystemSessionProperties.getRetryPolicy;
import static io.trino.SystemSessionProperties.resourceOvercommit;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.DRAINED;
import static io.trino.metadata.NodeState.DRAINING;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static io.trino.spi.StandardErrorCode.CLUSTER_OUT_OF_MEMORY;
import static java.lang.Math.min;
Expand Down Expand Up @@ -433,6 +435,8 @@ private synchronized void updateNodes()
Set<InternalNode> aliveNodes = builder
.addAll(nodeManager.getNodes(ACTIVE))
.addAll(nodeManager.getNodes(SHUTTING_DOWN))
.addAll(nodeManager.getNodes(DRAINING))
.addAll(nodeManager.getNodes(DRAINED))
.build();

ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
Expand Down
25 changes: 23 additions & 2 deletions core/trino-main/src/main/java/io/trino/metadata/AllNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,22 @@ public class AllNodes
{
private final Set<InternalNode> activeNodes;
private final Set<InternalNode> inactiveNodes;
private final Set<InternalNode> drainingNodes;
private final Set<InternalNode> drainedNodes;
private final Set<InternalNode> shuttingDownNodes;
private final Set<InternalNode> activeCoordinators;

public AllNodes(Set<InternalNode> activeNodes, Set<InternalNode> inactiveNodes, Set<InternalNode> shuttingDownNodes, Set<InternalNode> activeCoordinators)
public AllNodes(Set<InternalNode> activeNodes,
Set<InternalNode> inactiveNodes,
Set<InternalNode> drainingNodes,
Set<InternalNode> drainedNodes,
Set<InternalNode> shuttingDownNodes,
Set<InternalNode> activeCoordinators)
{
this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null"));
this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null"));
this.drainedNodes = ImmutableSet.copyOf(requireNonNull(drainedNodes, "drainedNodes is null"));
this.drainingNodes = ImmutableSet.copyOf(requireNonNull(drainingNodes, "drainingNodes is null"));
this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
}
Expand All @@ -50,6 +59,16 @@ public Set<InternalNode> getShuttingDownNodes()
return shuttingDownNodes;
}

public Set<InternalNode> getDrainedNodes()
{
return drainedNodes;
}

public Set<InternalNode> getDrainingNodes()
{
return drainingNodes;
}

public Set<InternalNode> getActiveCoordinators()
{
return activeCoordinators;
Expand All @@ -67,13 +86,15 @@ public boolean equals(Object o)
AllNodes allNodes = (AllNodes) o;
return Objects.equals(activeNodes, allNodes.activeNodes) &&
Objects.equals(inactiveNodes, allNodes.inactiveNodes) &&
Objects.equals(drainedNodes, allNodes.drainedNodes) &&
Objects.equals(drainingNodes, allNodes.drainingNodes) &&
Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) &&
Objects.equals(activeCoordinators, allNodes.activeCoordinators);
}

@Override
public int hashCode()
{
return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators);
return Objects.hash(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, activeCoordinators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import com.google.errorprone.annotations.ThreadSafe;
import com.google.errorprone.annotations.concurrent.GuardedBy;
Expand Down Expand Up @@ -56,7 +55,6 @@
import static io.airlift.concurrent.Threads.daemonThreadsNamed;
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.trino.connector.system.GlobalSystemConnector.CATALOG_HANDLE;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.INACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -169,6 +167,8 @@ private void pollWorkers()
AllNodes allNodes = getAllNodes();
Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(allNodes.getActiveNodes())
.addAll(allNodes.getDrainingNodes())
.addAll(allNodes.getDrainedNodes())
.addAll(allNodes.getShuttingDownNodes())
.build();

Expand Down Expand Up @@ -216,6 +216,8 @@ private synchronized void refreshNodesInternal()

ImmutableSet.Builder<InternalNode> activeNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> inactiveNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> drainingNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> drainedNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> shuttingDownNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> coordinatorsBuilder = ImmutableSet.builder();
ImmutableSetMultimap.Builder<CatalogHandle, InternalNode> byCatalogHandleBuilder = ImmutableSetMultimap.builder();
Expand Down Expand Up @@ -250,6 +252,12 @@ private synchronized void refreshNodesInternal()
case INACTIVE:
inactiveNodesBuilder.add(node);
break;
case DRAINING:
drainingNodesBuilder.add(node);
break;
case DRAINED:
drainedNodesBuilder.add(node);
break;
case SHUTTING_DOWN:
shuttingDownNodesBuilder.add(node);
break;
Expand All @@ -260,12 +268,20 @@ private synchronized void refreshNodesInternal()
}

Set<InternalNode> activeNodes = activeNodesBuilder.build();
Set<InternalNode> drainingNodes = drainingNodesBuilder.build();
Set<InternalNode> drainedNodes = drainedNodesBuilder.build();
Set<InternalNode> inactiveNodes = inactiveNodesBuilder.build();
Set<InternalNode> coordinators = coordinatorsBuilder.build();
Set<InternalNode> shuttingDownNodes = shuttingDownNodesBuilder.build();
if (allNodes != null) {
// log node that are no longer active (but not shutting down)
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodes, shuttingDownNodes));
Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(activeNodes)
.addAll(drainingNodes)
.addAll(drainedNodes)
.addAll(shuttingDownNodes)
.build();
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), aliveNodes);
for (InternalNode missingNode : missingNodes) {
log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost());
}
Expand All @@ -276,7 +292,7 @@ private synchronized void refreshNodesInternal()
activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build());
}

AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, shuttingDownNodes, coordinators);
AllNodes allNodes = new AllNodes(activeNodes, inactiveNodes, drainingNodes, drainedNodes, shuttingDownNodes, coordinators);
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
Expand All @@ -292,21 +308,17 @@ private synchronized void refreshNodesInternal()
private NodeState getNodeState(InternalNode node)
{
if (expectedNodeVersion.equals(node.getNodeVersion())) {
if (isNodeShuttingDown(node.getNodeIdentifier())) {
return SHUTTING_DOWN;
}
return ACTIVE;
String nodeId = node.getNodeIdentifier();
// The empty case that is being set to a default value of ACTIVE is limited to the case where a node
// has announced itself but no state has yet been successfully retrieved. RemoteNodeState will retain
// the previously known state if any has been reported.
return Optional.ofNullable(nodeStates.get(nodeId))
.flatMap(RemoteNodeState::getNodeState)
.orElse(NodeState.ACTIVE);
}
return INACTIVE;
}

private boolean isNodeShuttingDown(String nodeId)
{
return Optional.ofNullable(nodeStates.get(nodeId))
.flatMap(RemoteNodeState::getNodeState)
.orElse(NodeState.ACTIVE) == SHUTTING_DOWN;
}

@Override
public synchronized AllNodes getAllNodes()
{
Expand All @@ -325,6 +337,18 @@ public int getInactiveNodeCount()
return getAllNodes().getInactiveNodes().size();
}

@Managed
public int getDrainingNodeCount()
{
return getAllNodes().getDrainingNodes().size();
}

@Managed
public int getDrainedNodeCount()
{
return getAllNodes().getDrainedNodes().size();
}

@Managed
public int getShuttingDownNodeCount()
{
Expand All @@ -337,6 +361,8 @@ public Set<InternalNode> getNodes(NodeState state)
return switch (state) {
case ACTIVE -> getAllNodes().getActiveNodes();
case INACTIVE -> getAllNodes().getInactiveNodes();
case DRAINING -> getAllNodes().getDrainingNodes();
case DRAINED -> getAllNodes().getDrainedNodes();
case SHUTTING_DOWN -> getAllNodes().getShuttingDownNodes();
};
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ public Set<InternalNode> getNodes(NodeState state)
{
return switch (state) {
case ACTIVE -> ImmutableSet.copyOf(allNodes);
case INACTIVE, SHUTTING_DOWN -> ImmutableSet.of();
case DRAINING, DRAINED, INACTIVE, SHUTTING_DOWN -> ImmutableSet.of();
};
}

Expand All @@ -84,6 +84,8 @@ public AllNodes getAllNodes()
ImmutableSet.copyOf(allNodes),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(CURRENT_NODE));
}

Expand Down
17 changes: 17 additions & 0 deletions core/trino-main/src/main/java/io/trino/metadata/NodeState.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,24 @@

public enum NodeState
{
/**
* Server is up and running ready to handle tasks
*/
ACTIVE,
/**
* Never used internally, might be used by discoveryNodeManager when communication error occurs
*/
INACTIVE,
/**
* A reversible graceful shutdown, can go to forward to DRAINED or back to ACTIVE.
*/
DRAINING,
/**
* All tasks are finished, server can be safely and quickly stopped. Can also go back to ACTIVE.
*/
DRAINED,
/**
* Graceful shutdown, non-reversible, when observed will drain and terminate
*/
SHUTTING_DOWN
}
Loading