diff --git a/core/trino-main/src/main/java/io/trino/metadata/AllNodes.java b/core/trino-main/src/main/java/io/trino/metadata/AllNodes.java index cecf16b9c9e7..532af85693dc 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/AllNodes.java +++ b/core/trino-main/src/main/java/io/trino/metadata/AllNodes.java @@ -23,16 +23,32 @@ public class AllNodes { private final Set activeNodes; + private final Set decommissionedNodes; + private final Set decommissioningNodes; private final Set inactiveNodes; private final Set shuttingDownNodes; private final Set activeCoordinators; + private final Set aliveNodes; - public AllNodes(Set activeNodes, Set inactiveNodes, Set shuttingDownNodes, Set activeCoordinators) + public AllNodes(Set activeNodes, + Set decommissionedNodes, + Set decommissioningNodes, + Set inactiveNodes, + Set shuttingDownNodes, + Set activeCoordinators) { this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null")); + this.decommissionedNodes = ImmutableSet.copyOf(requireNonNull(decommissionedNodes, "decommissionedNodes is null")); + this.decommissioningNodes = ImmutableSet.copyOf(requireNonNull(decommissioningNodes, "decommissioningNodes is null")); this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null")); this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null")); this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null")); + this.aliveNodes = ImmutableSet.builder() + .addAll(activeNodes) + .addAll(decommissionedNodes) + .addAll(decommissioningNodes) + .addAll(shuttingDownNodes) + .build(); } public Set getActiveNodes() @@ -40,6 +56,16 @@ public Set getActiveNodes() return activeNodes; } + public Set getDecommissionedNodes() + { + return decommissionedNodes; + } + + public Set getDecommissioningNodes() + { + return decommissioningNodes; + } + public Set getInactiveNodes() { return 
inactiveNodes; @@ -50,6 +76,11 @@ public Set getShuttingDownNodes() return shuttingDownNodes; } + public Set getAliveNodes() + { + return aliveNodes; + } + public Set getActiveCoordinators() { return activeCoordinators; @@ -66,6 +97,8 @@ public boolean equals(Object o) } AllNodes allNodes = (AllNodes) o; return Objects.equals(activeNodes, allNodes.activeNodes) && + Objects.equals(decommissionedNodes, allNodes.decommissionedNodes) && + Objects.equals(decommissioningNodes, allNodes.decommissioningNodes) && Objects.equals(inactiveNodes, allNodes.inactiveNodes) && Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) && Objects.equals(activeCoordinators, allNodes.activeCoordinators); @@ -74,6 +107,12 @@ public boolean equals(Object o) @Override public int hashCode() { - return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators); + return Objects.hash( + activeNodes, + inactiveNodes, + shuttingDownNodes, + decommissioningNodes, + decommissionedNodes, + activeCoordinators); } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java index 394bdb8a6bc2..2a106e28bd9e 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java +++ b/core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java @@ -18,7 +18,6 @@ import com.google.common.collect.ImmutableSet; import com.google.common.collect.ImmutableSetMultimap; import com.google.common.collect.SetMultimap; -import com.google.common.collect.Sets; import com.google.common.collect.Sets.SetView; import io.airlift.discovery.client.ServiceDescriptor; import io.airlift.discovery.client.ServiceSelector; @@ -43,6 +42,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.util.ArrayList; +import java.util.HashSet; import java.util.List; import java.util.Optional; import java.util.Set; @@ -59,6 +59,8 @@ import static 
io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; import static io.trino.connector.system.GlobalSystemConnector.CATALOG_HANDLE; import static io.trino.metadata.NodeState.ACTIVE; +import static io.trino.metadata.NodeState.DECOMMISSIONED; +import static io.trino.metadata.NodeState.DECOMMISSIONING; import static io.trino.metadata.NodeState.INACTIVE; import static io.trino.metadata.NodeState.SHUTTING_DOWN; import static java.util.Locale.ENGLISH; @@ -93,6 +95,9 @@ public final class DiscoveryNodeManager @GuardedBy("this") private Set coordinators; + @GuardedBy("this") + private Set nodesToBeDecommissioned = new HashSet<>(); + @GuardedBy("this") private final List> listeners = new ArrayList<>(); @@ -169,10 +174,7 @@ public void destroy() private void pollWorkers() { AllNodes allNodes = getAllNodes(); - Set aliveNodes = ImmutableSet.builder() - .addAll(allNodes.getActiveNodes()) - .addAll(allNodes.getShuttingDownNodes()) - .build(); + Set aliveNodes = allNodes.getAliveNodes(); ImmutableSet aliveNodeIds = aliveNodes.stream() .map(InternalNode::getNodeIdentifier) @@ -215,9 +217,7 @@ private synchronized void refreshNodesInternal() .filter(service -> !failureDetector.getFailed().contains(service)) .collect(toImmutableSet()); - ImmutableSet.Builder activeNodesBuilder = ImmutableSet.builder(); - ImmutableSet.Builder inactiveNodesBuilder = ImmutableSet.builder(); - ImmutableSet.Builder shuttingDownNodesBuilder = ImmutableSet.builder(); + ImmutableSetMultimap.Builder nodeStateMapBuilder = ImmutableSetMultimap.builder(); ImmutableSet.Builder coordinatorsBuilder = ImmutableSet.builder(); ImmutableSetMultimap.Builder byCatalogHandleBuilder = ImmutableSetMultimap.builder(); @@ -229,55 +229,65 @@ private synchronized void refreshNodesInternal() InternalNode node = new InternalNode(service.getNodeId(), uri, nodeVersion, coordinator); NodeState nodeState = getNodeState(node); - switch (nodeState) { - case ACTIVE: - activeNodesBuilder.add(node); - if (coordinator) { - 
coordinatorsBuilder.add(node); - } + // nodesToBeDecommissioned is the authoritative list of node to be decommissioned + // from coordinator perspective. Once a worker appears in the list, + // its state become DECOMMISSIONING even if worker has yet confirmed such + // so that no new tasks will be scheduled on it. + if (!coordinator && nodesToBeDecommissioned.contains(node.getNodeIdentifier()) + && nodeState == ACTIVE) { + log.debug("Treat " + node.getNodeIdentifier() + " as DECOMMISSIONING"); + nodeState = DECOMMISSIONING; + } - // record available active nodes organized by catalog handle - String catalogHandleIds = service.getProperties().get("catalogHandleIds"); - if (catalogHandleIds != null) { - catalogHandleIds = catalogHandleIds.toLowerCase(ENGLISH); - for (String catalogHandleId : CATALOG_HANDLE_ID_SPLITTER.split(catalogHandleIds)) { - byCatalogHandleBuilder.put(CatalogHandle.fromId(catalogHandleId), node); - } + // Add node to node state map. + nodeStateMapBuilder.put(nodeState, node); + + if (nodeState == ACTIVE) { + if (coordinator) { + coordinatorsBuilder.add(node); + } + + // record available active nodes organized by catalog handle + String catalogHandleIds = service.getProperties().get("catalogHandleIds"); + if (catalogHandleIds != null) { + catalogHandleIds = catalogHandleIds.toLowerCase(ENGLISH); + for (String catalogHandleId : CATALOG_HANDLE_ID_SPLITTER.split(catalogHandleIds)) { + byCatalogHandleBuilder.put(CatalogHandle.fromId(catalogHandleId), node); } + } - // always add system connector - byCatalogHandleBuilder.put(CATALOG_HANDLE, node); - break; - case INACTIVE: - inactiveNodesBuilder.add(node); - break; - case SHUTTING_DOWN: - shuttingDownNodesBuilder.add(node); - break; - default: - log.error("Unknown state %s for node %s", nodeState, node); + // always add system connector + byCatalogHandleBuilder.put(CATALOG_HANDLE, node); } } } - if (allNodes != null) { + // nodes by catalog handle changes anytime a node adds or removes a catalog (note: 
this is not part of the listener system) + if (!allCatalogsOnAllNodes) { + activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build()); + } + + SetMultimap nodeStateMap = nodeStateMapBuilder.build(); + AllNodes currAllNodes = new AllNodes( + nodeStateMap.get(ACTIVE), + nodeStateMap.get(DECOMMISSIONED), + nodeStateMap.get(DECOMMISSIONING), + nodeStateMap.get(INACTIVE), + nodeStateMap.get(SHUTTING_DOWN), + coordinatorsBuilder.build()); + + if (this.allNodes != null) { // log node that are no longer active (but not shutting down) - SetView missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodesBuilder.build(), shuttingDownNodesBuilder.build())); + SetView missingNodes = difference(allNodes.getActiveNodes(), currAllNodes.getAliveNodes()); for (InternalNode missingNode : missingNodes) { log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost()); } } - // nodes by catalog handle changes anytime a node adds or removes a catalog (note: this is not part of the listener system) - if (!allCatalogsOnAllNodes) { - activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build()); - } - - AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build()); // only update if all nodes actually changed (note: this does not include the connectors registered with the nodes) - if (!allNodes.equals(this.allNodes)) { + if (!currAllNodes.equals(this.allNodes)) { // assign allNodes to a local variable for use in the callback below - this.allNodes = allNodes; + this.allNodes = currAllNodes; coordinators = coordinatorsBuilder.build(); // notify listeners @@ -289,22 +299,19 @@ private synchronized void refreshNodesInternal() private NodeState getNodeState(InternalNode node) { if (expectedNodeVersion.equals(node.getNodeVersion())) { - if (isNodeShuttingDown(node.getNodeIdentifier())) { - return 
SHUTTING_DOWN; + String nodeId = node.getNodeIdentifier(); + Optional remoteNodeState = nodeStates.containsKey(nodeId) + ? nodeStates.get(nodeId).getNodeState() + : Optional.empty(); + if (remoteNodeState.isPresent()) { + return remoteNodeState.get(); } + // no remote node state return ACTIVE; } return INACTIVE; } - private boolean isNodeShuttingDown(String nodeId) - { - Optional remoteNodeState = nodeStates.containsKey(nodeId) - ? nodeStates.get(nodeId).getNodeState() - : Optional.empty(); - return remoteNodeState.isPresent() && remoteNodeState.get() == SHUTTING_DOWN; - } - @Override public synchronized AllNodes getAllNodes() { @@ -317,6 +324,18 @@ public int getActiveNodeCount() return getAllNodes().getActiveNodes().size(); } + @Managed + public int getDecommissionedNodeCount() + { + return getAllNodes().getDecommissionedNodes().size(); + } + + @Managed + public int getDecommissioningNodeCount() + { + return getAllNodes().getDecommissioningNodes().size(); + } + @Managed public int getInactiveNodeCount() { @@ -335,6 +354,10 @@ public Set getNodes(NodeState state) switch (state) { case ACTIVE: return getAllNodes().getActiveNodes(); + case DECOMMISSIONED: + return getAllNodes().getDecommissionedNodes(); + case DECOMMISSIONING: + return getAllNodes().getDecommissioningNodes(); case INACTIVE: return getAllNodes().getInactiveNodes(); case SHUTTING_DOWN: @@ -407,4 +430,9 @@ private static boolean isCoordinator(ServiceDescriptor service) { return Boolean.parseBoolean(service.getProperties().get("coordinator")); } + + public synchronized void setNodesToExclude(Set nodesToExclude) + { + this.nodesToBeDecommissioned = nodesToExclude; + } } diff --git a/core/trino-main/src/main/java/io/trino/metadata/InMemoryNodeManager.java b/core/trino-main/src/main/java/io/trino/metadata/InMemoryNodeManager.java index a2e17c960d30..751ea9e91134 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/InMemoryNodeManager.java +++ 
b/core/trino-main/src/main/java/io/trino/metadata/InMemoryNodeManager.java @@ -59,7 +59,11 @@ public Set getNodes(NodeState state) { switch (state) { case ACTIVE: - return allNodes; + return getAllNodes().getActiveNodes(); + case DECOMMISSIONED: + return getAllNodes().getDecommissionedNodes(); + case DECOMMISSIONING: + return getAllNodes().getDecommissioningNodes(); case INACTIVE: case SHUTTING_DOWN: return ImmutableSet.of(); @@ -86,6 +90,8 @@ public AllNodes getAllNodes() allNodes, ImmutableSet.of(), ImmutableSet.of(), + ImmutableSet.of(), + ImmutableSet.of(), ImmutableSet.of(CURRENT_NODE)); } diff --git a/core/trino-main/src/main/java/io/trino/metadata/NodeState.java b/core/trino-main/src/main/java/io/trino/metadata/NodeState.java index 4bf511968a31..286ad269268e 100644 --- a/core/trino-main/src/main/java/io/trino/metadata/NodeState.java +++ b/core/trino-main/src/main/java/io/trino/metadata/NodeState.java @@ -16,6 +16,8 @@ public enum NodeState { ACTIVE, + DECOMMISSIONED, + DECOMMISSIONING, INACTIVE, SHUTTING_DOWN } diff --git a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java index 610d3b09a11c..da50f559de08 100644 --- a/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java +++ b/core/trino-main/src/main/java/io/trino/server/CoordinatorModule.java @@ -341,6 +341,15 @@ protected void setup(Binder binder) install(new QueryExecutionFactoryModule()); + // nodes and queries for monitoring and auto-scaling + jaxrsBinder(binder).bind(NodesResource.class); + httpClientBinder(binder).bindHttpClient("nodes", ForNodes.class) + .withTracing() + .withConfigDefaults(config -> { + config.setIdleTimeout(new Duration(30, SECONDS)); + config.setRequestTimeout(new Duration(10, SECONDS)); + }); + // cleanup binder.bind(ExecutorCleanup.class).asEagerSingleton(); } diff --git a/core/trino-main/src/main/java/io/trino/server/ForNodes.java 
b/core/trino-main/src/main/java/io/trino/server/ForNodes.java new file mode 100644 index 000000000000..c6eb6c3a05c2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/ForNodes.java @@ -0,0 +1,31 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.server; + +import javax.inject.Qualifier; + +import java.lang.annotation.Retention; +import java.lang.annotation.Target; + +import static java.lang.annotation.ElementType.FIELD; +import static java.lang.annotation.ElementType.METHOD; +import static java.lang.annotation.ElementType.PARAMETER; +import static java.lang.annotation.RetentionPolicy.RUNTIME; + +@Retention(RUNTIME) +@Target({FIELD, PARAMETER, METHOD}) +@Qualifier +public @interface ForNodes +{ +} diff --git a/core/trino-main/src/main/java/io/trino/server/NodeStatus.java b/core/trino-main/src/main/java/io/trino/server/NodeStatus.java index cce607262301..c331ed918e03 100644 --- a/core/trino-main/src/main/java/io/trino/server/NodeStatus.java +++ b/core/trino-main/src/main/java/io/trino/server/NodeStatus.java @@ -37,6 +37,7 @@ public class NodeStatus private final long heapUsed; private final long heapAvailable; private final long nonHeapUsed; + private final long startTimeEpoch; @JsonCreator public NodeStatus( @@ -53,7 +54,8 @@ public NodeStatus( @JsonProperty("systemCpuLoad") double systemCpuLoad, @JsonProperty("heapUsed") long heapUsed, @JsonProperty("heapAvailable") long heapAvailable, - @JsonProperty("nonHeapUsed") 
long nonHeapUsed) + @JsonProperty("nonHeapUsed") long nonHeapUsed, + @JsonProperty("startTimeEpoch") long startTimeEpoch) { this.nodeId = requireNonNull(nodeId, "nodeId is null"); this.nodeVersion = requireNonNull(nodeVersion, "nodeVersion is null"); @@ -69,6 +71,7 @@ public NodeStatus( this.heapUsed = heapUsed; this.heapAvailable = heapAvailable; this.nonHeapUsed = nonHeapUsed; + this.startTimeEpoch = startTimeEpoch; } @JsonProperty @@ -154,4 +157,10 @@ public long getNonHeapUsed() { return nonHeapUsed; } + + @JsonProperty + public long getStartTimeEpoch() + { + return startTimeEpoch; + } } diff --git a/core/trino-main/src/main/java/io/trino/server/NodesResource.java b/core/trino-main/src/main/java/io/trino/server/NodesResource.java new file mode 100644 index 000000000000..bcfe63bcdc49 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/NodesResource.java @@ -0,0 +1,389 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.server; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.base.Joiner; +import com.google.common.collect.ImmutableSet; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import io.airlift.http.client.BodyGenerator; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpClient.HttpResponseFuture; +import io.airlift.http.client.Request; +import io.airlift.http.client.StaticBodyGenerator; +import io.airlift.http.client.StatusResponseHandler; +import io.airlift.http.client.StatusResponseHandler.StatusResponse; +import io.airlift.log.Logger; +import io.trino.metadata.AllNodes; +import io.trino.metadata.DiscoveryNodeManager; +import io.trino.metadata.InternalNode; +import io.trino.metadata.NodeState; +import io.trino.server.security.ResourceSecurity; + +import javax.annotation.Nullable; +import javax.annotation.PostConstruct; +import javax.annotation.PreDestroy; +import javax.inject.Inject; +import javax.ws.rs.Consumes; +import javax.ws.rs.GET; +import javax.ws.rs.PUT; +import javax.ws.rs.Path; +import javax.ws.rs.Produces; +import javax.ws.rs.core.Response; + +import java.net.URI; +import java.net.URISyntaxException; +import java.nio.charset.Charset; +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import java.util.TreeMap; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import static com.google.common.collect.ImmutableSet.toImmutableSet; +import static com.google.common.collect.Sets.difference; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.airlift.concurrent.Threads.threadsNamed; +import static 
io.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static io.airlift.http.client.Request.Builder.preparePut; +import static io.trino.metadata.NodeState.ACTIVE; +import static io.trino.metadata.NodeState.DECOMMISSIONED; +import static io.trino.metadata.NodeState.DECOMMISSIONING; +import static io.trino.metadata.NodeState.INACTIVE; +import static io.trino.metadata.NodeState.SHUTTING_DOWN; +import static io.trino.server.security.ResourceSecurity.AccessType.PUBLIC; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; +import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; +import static javax.ws.rs.core.MediaType.TEXT_PLAIN; + +// NodesResource expose coordinator endpoints to facilitate the auto-scaling of cluster. +// These endpoints include: +// 1. /v1/nodes --- list of all alive nodes with NodeState and NodeStatus; +// 2. /v1/nodes/refreshnodes --- refresh with list of nodes to exclude (decommission); +@Path("/v1/nodes") +public class NodesResource +{ + private static Logger log = Logger.get(NodesResource.class); + + private final DiscoveryNodeManager nodeManager; + private final HttpClient httpClient; + + // Set of worker nodes to exclude (decommission). + Set nodesToExclude = new HashSet<>(); + + // Executor to periodically and asynchronously poll NodeStatus of all workers. + private final ScheduledExecutorService nodeStatusExecutor; + + // Poll worker status once every 15 seconds, a balance between freshness and cost. + private static final int POLL_NODESTATUS_SEC = 15; + + // Map from NodeId to RemoteNodeStatus. 
+ private final ConcurrentHashMap nodeStatuses = new ConcurrentHashMap<>(); + + private AtomicInteger numRefreshNodes = new AtomicInteger(); + private AtomicInteger numUpdateStateOk = new AtomicInteger(); + private AtomicInteger numUpdateStateFailed = new AtomicInteger(); + + @Inject + public NodesResource(DiscoveryNodeManager nodeManager, @ForNodes HttpClient httpClient) + { + log.info("Construct NodesResource"); + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.httpClient = httpClient; + this.nodeStatusExecutor = newSingleThreadScheduledExecutor(threadsNamed("autoscale-executor-%s")); + } + + @PostConstruct + public void startPollingNodeStatus() + { + nodeStatusExecutor.scheduleWithFixedDelay(() -> { + try { + pollWorkers(); + } + catch (Exception e) { + log.error(e, "Error polling state of nodes"); + } + }, 5, POLL_NODESTATUS_SEC, TimeUnit.SECONDS); + pollWorkers(); + } + + // Poll /v1/status of all alive workers. + private void pollWorkers() + { + AllNodes allNodes = nodeManager.getAllNodes(); + + Set aliveNodes = allNodes.getAliveNodes(); + + ImmutableSet aliveNodeIds = aliveNodes.stream() + .map(InternalNode::getNodeIdentifier) + .collect(toImmutableSet()); + + Set deadNodes = difference(nodeStatuses.keySet(), aliveNodeIds).immutableCopy(); + nodeStatuses.keySet().removeAll(deadNodes); + + // Add new nodes + for (InternalNode node : aliveNodes) { + URI statusUri = uriBuilderFrom(node.getInternalUri()).appendPath("/v1/status").build(); + nodeStatuses.putIfAbsent(node.getNodeIdentifier(), new RemoteNodeStatus(httpClient, statusUri)); + } + + // Schedule refresh + nodeStatuses.values().forEach(RemoteNodeStatus::asyncRefresh); + } + + @PreDestroy + public void stop() + { + nodeStatusExecutor.shutdownNow(); + } + + // Gets list of all nodes where each is modeled as NodeInfo. 
+ @ResourceSecurity(PUBLIC) + @GET + @Produces(APPLICATION_JSON) + public List getNodes() + { + final AllNodes nodes = nodeManager.getAllNodes(); + TreeMap asmp = new TreeMap<>(); + addToNodeMap(nodes.getActiveNodes(), ACTIVE, asmp); + addToNodeMap(nodes.getShuttingDownNodes(), SHUTTING_DOWN, asmp); + addToNodeMap(nodes.getInactiveNodes(), INACTIVE, asmp); + addToNodeMap(nodes.getDecommissioningNodes(), DECOMMISSIONING, asmp); + addToNodeMap(nodes.getDecommissionedNodes(), DECOMMISSIONED, asmp); + return new ArrayList<>(asmp.values()); + } + + // NodeInfo is a bundle of (InternalNode, NodeState, NodeStatus) + // where NodeStatus is polled from v1/status of the node. + public static class NodeInfo + { + private final String nodeId; + private final String uri; + private final boolean coordinator; + private final NodeState state; + private final NodeStatus status; + private final long statusTime; + + @JsonCreator + public NodeInfo( + @JsonProperty("nodeId") String nodeId, + @JsonProperty("uri") String uri, + @JsonProperty("coordinator") boolean coordinator, + @JsonProperty("state") NodeState state, + @JsonProperty("status") NodeStatus status, + @JsonProperty("statusTime") long statusTime) + { + this.nodeId = requireNonNull(nodeId, "nodeId is null"); + this.uri = uri; + this.coordinator = coordinator; + this.state = state; + this.status = status; + this.statusTime = statusTime; + } + + @JsonProperty + public String getNodeId() + { + return nodeId; + } + + @JsonProperty + public String getUri() + { + return uri; + } + + @JsonProperty + public boolean getCoordinator() + { + return coordinator; + } + + @JsonProperty + public NodeState getState() + { + return state; + } + + @JsonProperty + public NodeStatus getStatus() + { + return status; + } + + @JsonProperty + public long getStatusTime() + { + return statusTime; + } + } + + // Given an absolute list of nodes to exclude (a.k.a. decommission), which means: + // 1. 
The desired state for worker appear in the exclude list is DECOMMISSIONED; + // 2. The desired state for worker that does not appear in the exclude list is ACTIVE; + // Initiate decommission/recommission actions as appropriate to have all worker nodes + // move toward their desired state. Specifically: + // 1. A worker to exclude will be honored within seconds with no new task dispatch. + // asyncUpdateState will be called for the worker to wait for pending tasks and + // later report as DECOMMISSIONED upon completion. + // 2. A worker that was previously excluded but no longer will qualify within seconds + // for new task dispatch. asyncUpdateState will be called for the worker to be + // back to ACTIVE. + @ResourceSecurity(PUBLIC) + @PUT + @Path("refreshnodes") + @Consumes(APPLICATION_JSON) + @Produces(TEXT_PLAIN) + public Response refreshNodes(List exclude) + { + numRefreshNodes.incrementAndGet(); + log.info(numRefreshNodes.get() + " refreshNodes " + Joiner.on(',').join(exclude)); + + TreeMap asnm = getId2NodeInfoMap(); + // Assume nodesToExclude are comma separated list of nodeIds + Set nodesToExclude = parseNodesToExclude(exclude, asnm.keySet()); + if (!nodesToExclude.equals(this.nodesToExclude)) { + this.nodesToExclude = nodesToExclude; + nodeManager.setNodesToExclude(nodesToExclude); + } + + for (NodeInfo node : asnm.values()) { + if (node.coordinator) { + continue; + } + // Decommission ACTIVE nodes that appear in nodesToExclude. + // Note that for now we update state during each refresh: + // 1. worker handle decommission efficiently if it is already DN or DD state. + // 2. we didn't track whether the previous update was successful + // 3. ensure DECOMMISSIONING state on worker just in case. 
+ if (nodesToExclude.contains(node.nodeId)) { + asyncUpdateState(node, DECOMMISSIONING); + } + + // Recommission DN/DD nodes that do not appear in nodesToExclude + if ((node.state == DECOMMISSIONING || node.state == DECOMMISSIONED) + && !nodesToExclude.contains(node.nodeId)) { + asyncUpdateState(node, ACTIVE); + } + } + + return Response.ok().type(TEXT_PLAIN) + .entity(String.format("refreshNodes [%s] OK", Joiner.on(',').join(exclude))) + .build(); + } + + // Parse given list of node to exclude into a set and log unknown ones. + private static Set parseNodesToExclude(List exclude, Set nodes) + { + ImmutableSet.Builder nodesToExclude = ImmutableSet.builder(); + for (String node : exclude) { + if (!nodes.contains(node)) { + log.info("parseNodesToExclude unknown node " + node); + } + nodesToExclude.add(node); + } + return nodesToExclude.build(); + } + + // Get map from nodeId to AutoScaleNode for all nodes. + private TreeMap getId2NodeInfoMap() + { + final AllNodes nodes = nodeManager.getAllNodes(); + TreeMap nodeMap = new TreeMap<>(); + addToNodeMap(nodes.getActiveNodes(), ACTIVE, nodeMap); + addToNodeMap(nodes.getShuttingDownNodes(), SHUTTING_DOWN, nodeMap); + addToNodeMap(nodes.getInactiveNodes(), INACTIVE, nodeMap); + addToNodeMap(nodes.getDecommissioningNodes(), DECOMMISSIONING, nodeMap); + addToNodeMap(nodes.getDecommissionedNodes(), DECOMMISSIONED, nodeMap); + return nodeMap; + } + + // Add all nodes with a specific NodeState into nmap. + private void addToNodeMap( + Set nodes, NodeState state, TreeMap nmap) + { + for (InternalNode node : nodes) { + String nodeId = node.getNodeIdentifier(); + String uri = node.getInternalUri().toString(); + RemoteNodeStatus rns = nodeStatuses.get(nodeId); + NodeStatus status = rns != null && rns.getNodeStatus().isPresent() + ? rns.getNodeStatus().get() : null; + nmap.put(nodeId, new NodeInfo( + nodeId, uri, node.isCoordinator(), state, status, + rns == null ? 
0 : rns.getLastUpdateTime())); + } + } + + // Asynchronously update state of a specific worker, basically execute HTTP put + // request against /v1/info/state endpoint on the remote worker. + private synchronized void asyncUpdateState(NodeInfo node, NodeState state) + { + log.info(String.format("asyncUpdateState %s %s", node.nodeId, state)); + Request request = getUpdateStateRequest(node, state); + HttpResponseFuture responseFuture = httpClient.executeAsync( + request, StatusResponseHandler.createStatusResponseHandler()); + + Futures.addCallback(responseFuture, new FutureCallback() + { + @Override + public void onSuccess(@Nullable StatusResponse result) + { + numUpdateStateOk.incrementAndGet(); + log.info(String.format("OK async updated %s %s", request.getUri(), state)); + } + + @Override + public void onFailure(Throwable t) + { + numUpdateStateFailed.incrementAndGet(); + log.info(String.format("Error async updated %s %s %s", + request.getUri(), state, t.getMessage())); + } + }, directExecutor()); + } + + private synchronized Request getUpdateStateRequest(NodeInfo node, NodeState state) + { + // http://10.43.31.106:8081 -> http://10.43.31.106:8081/v1/info/state + URI infoStateUri = uriBuilderFrom(getUri(node.getUri())).appendPath("/v1/info/state").build(); + + // Note that the quote in "" is needed as otherwise + // Unrecognized token 'DECOMMISSION': was expecting ('true', 'false' or 'null') + BodyGenerator bodyGenerator = StaticBodyGenerator.createStaticBodyGenerator( + "\"" + state + "\"", Charset.defaultCharset()); + return preparePut() + .setUri(infoStateUri) + .setHeader(CONTENT_TYPE, "application/json") + .setBodyGenerator(bodyGenerator) + .build(); + } + + private static URI getUri(String uri) + { + try { + return new URI(uri); + } + catch (URISyntaxException e) { + throw new RuntimeException(e.getMessage()); + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/server/QueryResource.java 
b/core/trino-main/src/main/java/io/trino/server/QueryResource.java index 77073dda812f..fdaa52526ef1 100644 --- a/core/trino-main/src/main/java/io/trino/server/QueryResource.java +++ b/core/trino-main/src/main/java/io/trino/server/QueryResource.java @@ -31,6 +31,7 @@ import javax.ws.rs.PUT; import javax.ws.rs.Path; import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; import javax.ws.rs.QueryParam; import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; @@ -42,13 +43,16 @@ import java.util.NoSuchElementException; import java.util.Optional; +import static com.google.common.collect.ImmutableList.toImmutableList; import static io.trino.connector.system.KillQueryProcedure.createKillQueryException; import static io.trino.connector.system.KillQueryProcedure.createPreemptQueryException; import static io.trino.security.AccessControlUtil.checkCanKillQueryOwnedBy; import static io.trino.security.AccessControlUtil.checkCanViewQueryOwnedBy; import static io.trino.security.AccessControlUtil.filterQueries; import static io.trino.server.security.ResourceSecurity.AccessType.AUTHENTICATED_USER; +import static io.trino.server.security.ResourceSecurity.AccessType.MANAGEMENT_READ; import static java.util.Objects.requireNonNull; +import static javax.ws.rs.core.MediaType.APPLICATION_JSON; /** * Manage queries scheduled on this node @@ -168,4 +172,28 @@ private Response failQuery(QueryId queryId, TrinoException queryException, HttpS return Response.status(Status.GONE).build(); } } + + // Get BasicQueryInfo of all pending and recently ended queries. + // Here recently is defined as ended on or after maxEndAgeSec (default value 0) seconds ago. + @ResourceSecurity(MANAGEMENT_READ) + @GET + @Path("all") + @Produces(APPLICATION_JSON) + public List getAllQueryInfos(@QueryParam("maxEndAgeSec") int maxEndAgeSec) + { + // If maxEndAgeSec is negative, return all queries cached. + // Otherwise, return queries not ended or ended within maxEndAgeSec seconds. 
+ // Specifically if maxEndAgeSec is 0, return all queries not ended. + if (maxEndAgeSec < 0) { + return dispatchManager.getQueries(); + } + else { + long endCutoff = System.currentTimeMillis() - 1000L * maxEndAgeSec; + return dispatchManager.getQueries().stream() + .filter(v -> v.getQueryStats() == null + || v.getQueryStats().getEndTime() == null + || v.getQueryStats().getEndTime().getMillis() >= endCutoff) + .collect(toImmutableList()); + } + } } diff --git a/core/trino-main/src/main/java/io/trino/server/RemoteNodeStatus.java b/core/trino-main/src/main/java/io/trino/server/RemoteNodeStatus.java new file mode 100644 index 000000000000..c2cab4d529d2 --- /dev/null +++ b/core/trino-main/src/main/java/io/trino/server/RemoteNodeStatus.java @@ -0,0 +1,134 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.trino.server; + +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import io.airlift.http.client.FullJsonResponseHandler.JsonResponse; +import io.airlift.http.client.HttpClient; +import io.airlift.http.client.HttpClient.HttpResponseFuture; +import io.airlift.http.client.Request; +import io.airlift.json.JsonCodec; +import io.airlift.log.Logger; +import io.airlift.units.Duration; + +import javax.annotation.Nullable; +import javax.annotation.concurrent.ThreadSafe; + +import java.net.URI; +import java.util.Optional; +import java.util.concurrent.Future; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import static com.google.common.net.MediaType.JSON_UTF_8; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static io.airlift.http.client.FullJsonResponseHandler.createFullJsonResponseHandler; +import static io.airlift.http.client.HttpStatus.OK; +import static io.airlift.http.client.Request.Builder.prepareGet; +import static io.airlift.json.JsonCodec.jsonCodec; +import static io.airlift.units.Duration.nanosSince; +import static java.util.Objects.requireNonNull; +import static java.util.concurrent.TimeUnit.SECONDS; +import static javax.ws.rs.core.HttpHeaders.CONTENT_TYPE; + +@ThreadSafe +public class RemoteNodeStatus +{ + private static final Logger log = Logger.get(RemoteNodeStatus.class); + private static final JsonCodec NODE_STATUS_CODEC = jsonCodec(NodeStatus.class); + + private final HttpClient httpClient; + private final URI statusUri; + private final AtomicReference> nodeStatus = new AtomicReference<>(Optional.empty()); + private final AtomicReference> future = new AtomicReference<>(); + private final AtomicLong lastUpdateNanos = new AtomicLong(); + private final AtomicLong lastWarningLogged = new AtomicLong(); + private boolean lastUpdateSuccess = true; + // Last time in epoch the remote status was 
successfully obtained + private long lastUpdateTime; + + public RemoteNodeStatus(HttpClient httpClient, URI statusUri) + { + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + this.statusUri = requireNonNull(statusUri, "statusUri is null"); + } + + public Optional getNodeStatus() + { + return nodeStatus.get(); + } + + // Whether the latest refresh was successful. + public boolean isLastUpdateSuccess() + { + return lastUpdateSuccess; + } + + // Gets the last time in epoch the remote status was successfully obtained. + public long getLastUpdateTime() + { + return lastUpdateTime; + } + + public synchronized void asyncRefresh() + { + Duration sinceUpdate = nanosSince(lastUpdateNanos.get()); + if (nanosSince(lastWarningLogged.get()).toMillis() > 1_000 && + sinceUpdate.toMillis() > 10_000 && + future.get() != null) { + log.warn("NodeStatus request to %s has not returned in %s", + statusUri, sinceUpdate.toString(SECONDS)); + lastWarningLogged.set(System.nanoTime()); + } + if (sinceUpdate.toMillis() > 5_000 && future.get() == null) { + Request request = prepareGet() + .setUri(statusUri) + .setHeader(CONTENT_TYPE, JSON_UTF_8.toString()) + .build(); + HttpResponseFuture> responseFuture = httpClient.executeAsync( + request, createFullJsonResponseHandler(NODE_STATUS_CODEC)); + future.compareAndSet(null, responseFuture); + + Futures.addCallback(responseFuture, new FutureCallback<>() + { + @Override + public void onSuccess(@Nullable JsonResponse result) + { + lastUpdateTime = System.currentTimeMillis(); + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + if (result != null) { + if (result.hasValue()) { + nodeStatus.set(Optional.ofNullable(result.getValue())); + } + if (result.getStatusCode() != OK.code()) { + log.warn("Error fetching node status from %s returned status %d", + statusUri, result.getStatusCode()); + return; + } + } + } + + @Override + public void onFailure(Throwable t) + { + log.warn("Error fetching node status
from %s: %s", statusUri, t.getMessage()); + lastUpdateNanos.set(System.nanoTime()); + future.compareAndSet(responseFuture, null); + } + }, directExecutor()); + } + } +} diff --git a/core/trino-main/src/main/java/io/trino/server/Server.java b/core/trino-main/src/main/java/io/trino/server/Server.java index faa9fae6527f..5fee8a12845e 100644 --- a/core/trino-main/src/main/java/io/trino/server/Server.java +++ b/core/trino-main/src/main/java/io/trino/server/Server.java @@ -119,7 +119,7 @@ private void doStart(String trinoVersion) new CatalogManagerModule(), new TransactionManagerModule(), new ServerMainModule(trinoVersion), - new GracefulShutdownModule(), + new UpdateNodeStateModule(), new WarningCollectorModule()); modules.addAll(getAdditionalModules()); diff --git a/core/trino-main/src/main/java/io/trino/server/ServerInfoResource.java b/core/trino-main/src/main/java/io/trino/server/ServerInfoResource.java index 525cf192bf4f..82a939570912 100644 --- a/core/trino-main/src/main/java/io/trino/server/ServerInfoResource.java +++ b/core/trino-main/src/main/java/io/trino/server/ServerInfoResource.java @@ -26,21 +26,18 @@ import javax.ws.rs.Path; import javax.ws.rs.Produces; import javax.ws.rs.WebApplicationException; -import javax.ws.rs.core.MediaType; import javax.ws.rs.core.Response; import java.util.Optional; +import java.util.concurrent.atomic.AtomicBoolean; +import static com.google.common.base.Preconditions.checkState; import static io.airlift.units.Duration.nanosSince; -import static io.trino.metadata.NodeState.ACTIVE; -import static io.trino.metadata.NodeState.SHUTTING_DOWN; import static io.trino.server.security.ResourceSecurity.AccessType.MANAGEMENT_WRITE; import static io.trino.server.security.ResourceSecurity.AccessType.PUBLIC; -import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static javax.ws.rs.core.MediaType.APPLICATION_JSON; import static javax.ws.rs.core.MediaType.TEXT_PLAIN; -import static 
javax.ws.rs.core.Response.Status.BAD_REQUEST; @Path("/v1/info") public class ServerInfoResource @@ -48,17 +45,20 @@ public class ServerInfoResource private final NodeVersion version; private final String environment; private final boolean coordinator; - private final GracefulShutdownHandler shutdownHandler; + private final UpdateNodeStateHandler updateNodeStateHandler; private final StartupStatus startupStatus; private final long startTime = System.nanoTime(); + private final AtomicBoolean startupComplete = new AtomicBoolean(); @Inject - public ServerInfoResource(NodeVersion nodeVersion, NodeInfo nodeInfo, ServerConfig serverConfig, GracefulShutdownHandler shutdownHandler, StartupStatus startupStatus) + public ServerInfoResource(NodeVersion nodeVersion, NodeInfo nodeInfo, + ServerConfig serverConfig, UpdateNodeStateHandler updateNodeStateHandler, + StartupStatus startupStatus) { this.version = requireNonNull(nodeVersion, "nodeVersion is null"); this.environment = nodeInfo.getEnvironment(); this.coordinator = serverConfig.isCoordinator(); - this.shutdownHandler = requireNonNull(shutdownHandler, "shutdownHandler is null"); + this.updateNodeStateHandler = requireNonNull(updateNodeStateHandler, "updateNodeStateHandler is null"); this.startupStatus = requireNonNull(startupStatus, "startupStatus is null"); } @@ -80,23 +80,7 @@ public Response updateState(NodeState state) throws WebApplicationException { requireNonNull(state, "state is null"); - switch (state) { - case SHUTTING_DOWN: - shutdownHandler.requestShutdown(); - return Response.ok().build(); - case ACTIVE: - case INACTIVE: - throw new WebApplicationException(Response - .status(BAD_REQUEST) - .type(MediaType.TEXT_PLAIN) - .entity(format("Invalid state transition to %s", state)) - .build()); - default: - return Response.status(BAD_REQUEST) - .type(TEXT_PLAIN) - .entity(format("Invalid state %s", state)) - .build(); - } + return updateNodeStateHandler.updateState(state); } @ResourceSecurity(PUBLIC) @@ -105,10 +89,7 
@@ public Response updateState(NodeState state) @Produces(APPLICATION_JSON) public NodeState getServerState() { - if (shutdownHandler.isShutdownRequested()) { - return SHUTTING_DOWN; - } - return ACTIVE; + return updateNodeStateHandler.getServerState(); } @ResourceSecurity(PUBLIC) @@ -123,4 +104,9 @@ public Response getServerCoordinator() // return 404 to allow load balancers to only send traffic to the coordinator return Response.status(Response.Status.NOT_FOUND).build(); } + + public void startupComplete() + { + checkState(startupComplete.compareAndSet(false, true), "Server startup already marked as complete"); + } } diff --git a/core/trino-main/src/main/java/io/trino/server/StatusResource.java b/core/trino-main/src/main/java/io/trino/server/StatusResource.java index ea2d39d0b1aa..6cf70877f212 100644 --- a/core/trino-main/src/main/java/io/trino/server/StatusResource.java +++ b/core/trino-main/src/main/java/io/trino/server/StatusResource.java @@ -45,6 +45,7 @@ public class StatusResource private final int logicalCores; private final LocalMemoryManager memoryManager; private final MemoryMXBean memoryMXBean; + private final long startTimeEpoch = System.currentTimeMillis(); private OperatingSystemMXBean operatingSystemMXBean; @@ -92,6 +93,7 @@ public NodeStatus getStatus() operatingSystemMXBean == null ? 
0 : operatingSystemMXBean.getSystemCpuLoad(), memoryMXBean.getHeapMemoryUsage().getUsed(), memoryMXBean.getHeapMemoryUsage().getMax(), - memoryMXBean.getNonHeapMemoryUsage().getUsed()); + memoryMXBean.getNonHeapMemoryUsage().getUsed(), + startTimeEpoch); } } diff --git a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java b/core/trino-main/src/main/java/io/trino/server/UpdateNodeStateHandler.java similarity index 54% rename from core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java rename to core/trino-main/src/main/java/io/trino/server/UpdateNodeStateHandler.java index 1d23f9cb2b21..f64754d755e1 100644 --- a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownHandler.java +++ b/core/trino-main/src/main/java/io/trino/server/UpdateNodeStateHandler.java @@ -18,9 +18,13 @@ import io.airlift.units.Duration; import io.trino.execution.SqlTaskManager; import io.trino.execution.TaskInfo; +import io.trino.metadata.NodeState; import javax.annotation.concurrent.GuardedBy; import javax.inject.Inject; +import javax.ws.rs.WebApplicationException; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -33,16 +37,23 @@ import static com.google.common.collect.ImmutableList.toImmutableList; import static com.google.common.util.concurrent.Uninterruptibles.sleepUninterruptibly; import static io.airlift.concurrent.Threads.threadsNamed; +import static io.trino.metadata.NodeState.ACTIVE; +import static io.trino.metadata.NodeState.DECOMMISSIONED; +import static io.trino.metadata.NodeState.DECOMMISSIONING; +import static io.trino.metadata.NodeState.SHUTTING_DOWN; +import static java.lang.String.format; import static java.lang.Thread.currentThread; import static java.util.Objects.requireNonNull; import static java.util.concurrent.Executors.newSingleThreadExecutor; import static java.util.concurrent.Executors.newSingleThreadScheduledExecutor; import 
static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.SECONDS; +import static javax.ws.rs.core.MediaType.TEXT_PLAIN; +import static javax.ws.rs.core.Response.Status.BAD_REQUEST; -public class GracefulShutdownHandler +public class UpdateNodeStateHandler { - private static final Logger log = Logger.get(GracefulShutdownHandler.class); + private static final Logger log = Logger.get(UpdateNodeStateHandler.class); private static final Duration LIFECYCLE_STOP_TIMEOUT = new Duration(30, SECONDS); private final ScheduledExecutorService shutdownHandler = newSingleThreadScheduledExecutor(threadsNamed("shutdown-handler-%s")); @@ -52,12 +63,15 @@ public class GracefulShutdownHandler private final boolean isCoordinator; private final ShutdownAction shutdownAction; private final Duration gracePeriod; + private final ScheduledExecutorService executor = newSingleThreadScheduledExecutor( + threadsNamed("decommission-handler-%s")); + private NodeState currState = NodeState.ACTIVE; @GuardedBy("this") private boolean shutdownRequested; @Inject - public GracefulShutdownHandler( + public UpdateNodeStateHandler( SqlTaskManager sqlTaskManager, ServerConfig serverConfig, ShutdownAction shutdownAction, @@ -70,6 +84,62 @@ public GracefulShutdownHandler( this.gracePeriod = serverConfig.getGracePeriod(); } + public NodeState getServerState() + { + return currState; + } + + public synchronized Response updateState(NodeState state) + throws WebApplicationException + { + requireNonNull(state, "state is null"); + log.info(String.format("Entre updateState %s -> %s", currState, state)); + + // Supported state transitions: + // 1. ? -> ? + // 2. * -> SHUTTING_DOWN + // 3. ACTIVE -> DECOMMISSIONING + // 4. 
DECOMMISSIONING, DECOMMISSIONED -> ACTIVE + + if (currState == state || (state == DECOMMISSIONING && currState == DECOMMISSIONED)) { + return Response.ok().build(); + } + + // Prefer using a switch instead of a chained if-else for enums + switch (state) { + case SHUTTING_DOWN: + requestShutdown(); + currState = SHUTTING_DOWN; + return Response.ok().build(); + case DECOMMISSIONING: + if (currState == ACTIVE) { + requestDecommission(); + currState = DECOMMISSIONING; + return Response.ok().build(); + } + break; + case ACTIVE: + if (currState == DECOMMISSIONING || currState == DECOMMISSIONED) { + currState = ACTIVE; + return Response.ok().build(); + } + break; + case INACTIVE: + break; + default: + return Response.status(BAD_REQUEST).type(TEXT_PLAIN) + .entity(format("Invalid state %s", state)) + .build(); + } + + // Bad request once here. + throw new WebApplicationException(Response + .status(BAD_REQUEST) + .type(MediaType.TEXT_PLAIN) + .entity(format("Invalid state transition from %s to %s", currState, state)) + .build()); + } + public synchronized void requestShutdown() { log.info("Shutdown requested"); @@ -89,33 +159,7 @@ public synchronized void requestShutdown() private void shutdown() { - List activeTasks = getActiveTasks(); - - // At this point no new tasks should be scheduled by coordinator on this worker node. - // Wait for all remaining tasks to finish. 
- while (activeTasks.size() > 0) { - CountDownLatch countDownLatch = new CountDownLatch(activeTasks.size()); - - for (TaskInfo taskInfo : activeTasks) { - sqlTaskManager.addStateChangeListener(taskInfo.getTaskStatus().getTaskId(), newState -> { - if (newState.isDone()) { - countDownLatch.countDown(); - } - }); - } - - log.info("Waiting for all tasks to finish"); - - try { - countDownLatch.await(); - } - catch (InterruptedException e) { - log.warn("Interrupted while waiting for all tasks to finish"); - currentThread().interrupt(); - } - - activeTasks = getActiveTasks(); - } + waitActiveTasksToFinish(sqlTaskManager); // wait for another grace period for all task states to be observed by the coordinator sleepUninterruptibly(gracePeriod.toMillis(), MILLISECONDS); @@ -143,7 +187,37 @@ private void shutdown() shutdownAction.onShutdown(); } - private List getActiveTasks() + static void waitActiveTasksToFinish(SqlTaskManager sqlTaskManager) + { + // At this point no new tasks should be scheduled by coordinator on this worker node. + // Wait for all remaining tasks to finish. 
+ while (true) { + List activeTasks = getActiveTasks(sqlTaskManager); + log.info("Waiting for " + activeTasks.size() + " active tasks to finish"); + if (activeTasks.isEmpty()) { + break; + } + CountDownLatch countDownLatch = new CountDownLatch(activeTasks.size()); + + for (TaskInfo taskInfo : activeTasks) { + sqlTaskManager.addStateChangeListener(taskInfo.getTaskStatus().getTaskId(), newState -> { + if (newState.isDone()) { + countDownLatch.countDown(); + } + }); + } + + try { + countDownLatch.await(); + } + catch (InterruptedException e) { + log.warn("Interrupted while waiting for all tasks to finish"); + currentThread().interrupt(); + } + } + } + + private static List getActiveTasks(SqlTaskManager sqlTaskManager) { return sqlTaskManager.getAllTaskInfo() .stream() @@ -151,8 +225,35 @@ private List getActiveTasks() .collect(toImmutableList()); } - public synchronized boolean isShutdownRequested() + public synchronized void requestDecommission() { - return shutdownRequested; + log.info("enter requestDecommission " + getServerState()); + if (isCoordinator) { + throw new UnsupportedOperationException("Cannot decommission coordinator"); + } + + // The decommission is normally initiated by the coordinator. + // Here we wait a short grace period of 10 seconds for coordinator to no longer + // assign new tasks to this worker node, before waiting for active tasks to finish. + executor.schedule(new Runnable() { + @Override + public void run() + { + waitActiveTasksToFinish(sqlTaskManager); + log.info("complete waitActiveTasksToFinish " + getServerState()); + NodeState state = onDecommissioned(); + log.info("onDecommissioned " + state); + } + }, 10000, MILLISECONDS); + } + + // callback used by decommissionHandler + NodeState onDecommissioned() + { + log.info("onDecommissioned " + (currState == null ?
"null" : currState)); + if (currState == NodeState.DECOMMISSIONING) { + currState = NodeState.DECOMMISSIONED; + } + return currState; } } diff --git a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownModule.java b/core/trino-main/src/main/java/io/trino/server/UpdateNodeStateModule.java similarity index 89% rename from core/trino-main/src/main/java/io/trino/server/GracefulShutdownModule.java rename to core/trino-main/src/main/java/io/trino/server/UpdateNodeStateModule.java index cdb83a90e1f7..8e924c8c2a8d 100644 --- a/core/trino-main/src/main/java/io/trino/server/GracefulShutdownModule.java +++ b/core/trino-main/src/main/java/io/trino/server/UpdateNodeStateModule.java @@ -17,13 +17,13 @@ import com.google.inject.Scopes; import io.airlift.configuration.AbstractConfigurationAwareModule; -public class GracefulShutdownModule +public class UpdateNodeStateModule extends AbstractConfigurationAwareModule { @Override protected void setup(Binder binder) { binder.bind(ShutdownAction.class).to(DefaultShutdownAction.class).in(Scopes.SINGLETON); - binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON); + binder.bind(UpdateNodeStateHandler.class).in(Scopes.SINGLETON); } } diff --git a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java index f33bf471986d..42c25a181a64 100644 --- a/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java +++ b/core/trino-main/src/main/java/io/trino/server/testing/TestingTrinoServer.java @@ -65,12 +65,12 @@ import io.trino.security.AccessControlConfig; import io.trino.security.AccessControlManager; import io.trino.security.GroupProviderManager; -import io.trino.server.GracefulShutdownHandler; import io.trino.server.PluginManager; import io.trino.server.Server; import io.trino.server.ServerMainModule; import io.trino.server.SessionPropertyDefaults; import io.trino.server.ShutdownAction; +import 
io.trino.server.UpdateNodeStateHandler; import io.trino.server.security.CertificateAuthenticatorManager; import io.trino.server.security.ServerSecurityModule; import io.trino.spi.ErrorType; @@ -168,7 +168,7 @@ public static Builder builder() private final DispatchManager dispatchManager; private final SqlQueryManager queryManager; private final SqlTaskManager taskManager; - private final GracefulShutdownHandler gracefulShutdownHandler; + private final UpdateNodeStateHandler updateNodeStateHandler; private final ShutdownAction shutdownAction; private final MBeanServer mBeanServer; private final boolean coordinator; @@ -273,7 +273,7 @@ private TestingTrinoServer( binder.bind(GroupProvider.class).to(TestingGroupProvider.class).in(Scopes.SINGLETON); binder.bind(AccessControl.class).to(AccessControlManager.class).in(Scopes.SINGLETON); binder.bind(ShutdownAction.class).to(TestShutdownAction.class).in(Scopes.SINGLETON); - binder.bind(GracefulShutdownHandler.class).in(Scopes.SINGLETON); + binder.bind(UpdateNodeStateHandler.class).in(Scopes.SINGLETON); binder.bind(ProcedureTester.class).in(Scopes.SINGLETON); binder.bind(ExchangeManagerRegistry.class).in(Scopes.SINGLETON); }); @@ -352,7 +352,7 @@ private TestingTrinoServer( localMemoryManager = injector.getInstance(LocalMemoryManager.class); nodeManager = injector.getInstance(InternalNodeManager.class); serviceSelectorManager = injector.getInstance(ServiceSelectorManager.class); - gracefulShutdownHandler = injector.getInstance(GracefulShutdownHandler.class); + updateNodeStateHandler = injector.getInstance(UpdateNodeStateHandler.class); taskManager = injector.getInstance(SqlTaskManager.class); shutdownAction = injector.getInstance(ShutdownAction.class); mBeanServer = injector.getInstance(MBeanServer.class); @@ -589,9 +589,9 @@ public MBeanServer getMbeanServer() return mBeanServer; } - public GracefulShutdownHandler getGracefulShutdownHandler() + public UpdateNodeStateHandler getUpdateNodeStateHandler() { - return 
gracefulShutdownHandler; + return updateNodeStateHandler; } public SqlTaskManager getTaskManager() diff --git a/core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java b/core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java index 83b0c2434b75..eb65224055c0 100644 --- a/core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java +++ b/core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java @@ -63,6 +63,8 @@ public ClusterStats getClusterStats() long activeNodes = nodeManager.getNodes(NodeState.ACTIVE).stream() .filter(node -> isIncludeCoordinator || !node.isCoordinator()) .count(); + long decommissioningNodes = nodeManager.getNodes(NodeState.DECOMMISSIONING).stream().count(); + long decommissionedNodes = nodeManager.getNodes(NodeState.DECOMMISSIONED).stream().count(); long activeCoordinators = nodeManager.getNodes(NodeState.ACTIVE).stream() .filter(InternalNode::isCoordinator) @@ -105,6 +107,8 @@ else if (query.getState() == QueryState.RUNNING) { queuedQueries, activeCoordinators, activeNodes, + decommissioningNodes, + decommissionedNodes, runningDrivers, totalAvailableProcessors, memoryReservation, @@ -121,6 +125,8 @@ public static class ClusterStats private final long activeCoordinators; private final long activeWorkers; + private final long decommissioningWorkers; + private final long decommissionedWorkers; private final long runningDrivers; private final long totalAvailableProcessors; @@ -138,6 +144,8 @@ public ClusterStats( @JsonProperty("queuedQueries") long queuedQueries, @JsonProperty("activeCoordinators") long activeCoordinators, @JsonProperty("activeWorkers") long activeWorkers, + @JsonProperty("decommissioningWorkers") long decommissioningWorkers, + @JsonProperty("decommissionedWorkers") long decommissionedWorkers, @JsonProperty("runningDrivers") long runningDrivers, @JsonProperty("totalAvailableProcessors") long totalAvailableProcessors, @JsonProperty("reservedMemory") double 
reservedMemory, @@ -150,6 +158,8 @@ public ClusterStats( this.queuedQueries = queuedQueries; this.activeCoordinators = activeCoordinators; this.activeWorkers = activeWorkers; + this.decommissioningWorkers = decommissioningWorkers; + this.decommissionedWorkers = decommissionedWorkers; this.runningDrivers = runningDrivers; this.totalAvailableProcessors = totalAvailableProcessors; this.reservedMemory = reservedMemory; @@ -188,6 +198,18 @@ public long getActiveWorkers() return activeWorkers; } + @JsonProperty + public long getDecommissioningWorkers() + { + return decommissioningWorkers; + } + + @JsonProperty + public long getDecommissionedWorkers() + { + return decommissionedWorkers; + } + @JsonProperty public long getRunningDrivers() { diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java index 891aadd0e95e..f13a4bf189d8 100644 --- a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java +++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAdlsConnectorSmokeTest.java @@ -50,6 +50,7 @@ import static java.util.Objects.requireNonNull; import static java.util.regex.Matcher.quoteReplacement; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; public class TestDeltaLakeAdlsConnectorSmokeTest extends BaseDeltaLakeConnectorSmokeTest @@ -113,6 +114,14 @@ public void removeTestData() assertThat(azureContainerClient.listBlobsByHierarchy(bucketName + "/").stream()).hasSize(0); } + @Override + public void testPathUriDecoding() + { + // TODO https://github.com/trinodb/trino/issues/15376 AzureBlobFileSystem doesn't expect URI as the path argument + assertThatThrownBy(super::testPathUriDecoding) + .hasStackTraceContaining("The specified path does not exist"); + } + 
@Override protected void registerTableFromResources(String table, String resourcePath, QueryRunner queryRunner) { diff --git a/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java b/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java index 9a949f3e7c8e..3867ada07f5e 100644 --- a/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java +++ b/testing/trino-tests/src/test/java/io/trino/tests/TestGracefulShutdown.java @@ -98,7 +98,7 @@ public void testShutdown() MILLISECONDS.sleep(500); } - worker.getGracefulShutdownHandler().requestShutdown(); + worker.getUpdateNodeStateHandler().requestShutdown(); Futures.allAsList(queryFutures).get(); @@ -124,7 +124,7 @@ public void testCoordinatorShutdown() .filter(TestingTrinoServer::isCoordinator) .collect(onlyElement()); - assertThatThrownBy(coordinator.getGracefulShutdownHandler()::requestShutdown) + assertThatThrownBy(coordinator.getUpdateNodeStateHandler()::requestShutdown) .isInstanceOf(UnsupportedOperationException.class) .hasMessage("Cannot shutdown coordinator"); }