Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
25 commits
Select commit Hold shift + click to select a range
87b8e6f
Add Redshift tests
dain Dec 11, 2022
0312463
Add Redshift schema, table, and column length checks
dain Dec 11, 2022
7eab69a
Implement proper type mapping for Redshift
dain Dec 11, 2022
1e8887e
Implement Redshift DELETE
dain Dec 11, 2022
e65585d
Add Redshift statistics
dain Dec 11, 2022
c225b8f
Add Redshift pushdown
dain Dec 10, 2022
be62233
Test SET PATH support by clients
wendigo Dec 7, 2022
dfe33c7
Fix HTTP_Status on OAuth2 refresh token redirect
huberty89 Dec 7, 2022
c056bc7
Fix recording of projection metrics
raunaqmorarka Dec 10, 2022
a46a510
Fix dereference operations for union type in Hive Connector
groupcache4321 Dec 2, 2022
0459735
Cleanup PartitioningExchanger
pettyjamesm Dec 9, 2022
1788829
Fix table name in TestDropTableTask
krvikash Dec 12, 2022
3c1c1fa
Document examples for datetime functions
rigogsilva Dec 12, 2022
58bd159
Decode path as URI in Delta Lake connector
jkylling Nov 25, 2022
35da905
Remove unused method from AstBuilder
ebyhr Dec 13, 2022
ce7b57f
Refactor BigQuery connector
ebyhr Nov 18, 2022
a660133
Fix projection pushdown when unsupported column exists in BigQuery
ebyhr Nov 19, 2022
cd8eac6
Update Iceberg to 1.1.0
Fokko Nov 17, 2022
b9d26a7
Document Top-N pushdown
findinpath Jul 5, 2021
f5b5fb8
Remove unnecessary override for `getTableProperties` method
findinpath Dec 14, 2022
ec8a8fd
Fix formatting
kasiafi Dec 8, 2022
7026432
Add requiredColumns field to TableFunctionAnalysis
kasiafi Nov 30, 2022
c3ee15f
Analyze table function's required input columns
kasiafi Nov 30, 2022
94e8eb5
Merge branch 'master' of https://github.com/trinodb/trino
dzhi-lyft Dec 22, 2022
bcef885
Coordinator-driven graceful decommission/recommission of worker nodes
dzhi-lyft Oct 14, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 41 additions & 2 deletions core/trino-main/src/main/java/io/trino/metadata/AllNodes.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,23 +23,49 @@
public class AllNodes
{
private final Set<InternalNode> activeNodes;
private final Set<InternalNode> decommissionedNodes;
private final Set<InternalNode> decommissioningNodes;
private final Set<InternalNode> inactiveNodes;
private final Set<InternalNode> shuttingDownNodes;
private final Set<InternalNode> activeCoordinators;
private final Set<InternalNode> aliveNodes;

public AllNodes(Set<InternalNode> activeNodes, Set<InternalNode> inactiveNodes, Set<InternalNode> shuttingDownNodes, Set<InternalNode> activeCoordinators)
public AllNodes(Set<InternalNode> activeNodes,
Set<InternalNode> decommissionedNodes,
Set<InternalNode> decommissioningNodes,
Set<InternalNode> inactiveNodes,
Set<InternalNode> shuttingDownNodes,
Set<InternalNode> activeCoordinators)
{
this.activeNodes = ImmutableSet.copyOf(requireNonNull(activeNodes, "activeNodes is null"));
this.decommissionedNodes = ImmutableSet.copyOf(requireNonNull(decommissionedNodes, "decommissionedNodes is null"));
this.decommissioningNodes = ImmutableSet.copyOf(requireNonNull(decommissioningNodes, "decommissioningNodes is null"));
this.inactiveNodes = ImmutableSet.copyOf(requireNonNull(inactiveNodes, "inactiveNodes is null"));
this.shuttingDownNodes = ImmutableSet.copyOf(requireNonNull(shuttingDownNodes, "shuttingDownNodes is null"));
this.activeCoordinators = ImmutableSet.copyOf(requireNonNull(activeCoordinators, "activeCoordinators is null"));
this.aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(activeNodes)
.addAll(decommissionedNodes)
.addAll(decommissioningNodes)
.addAll(shuttingDownNodes)
.build();
}

public Set<InternalNode> getActiveNodes()
{
return activeNodes;
}

public Set<InternalNode> getDecommissionedNodes()
{
return decommissionedNodes;
}

public Set<InternalNode> getDecommissioningNodes()
{
return decommissioningNodes;
}

public Set<InternalNode> getInactiveNodes()
{
return inactiveNodes;
Expand All @@ -50,6 +76,11 @@ public Set<InternalNode> getShuttingDownNodes()
return shuttingDownNodes;
}

public Set<InternalNode> getAliveNodes()
{
return aliveNodes;
}

public Set<InternalNode> getActiveCoordinators()
{
return activeCoordinators;
Expand All @@ -66,6 +97,8 @@ public boolean equals(Object o)
}
AllNodes allNodes = (AllNodes) o;
return Objects.equals(activeNodes, allNodes.activeNodes) &&
Objects.equals(decommissionedNodes, allNodes.decommissionedNodes) &&
Objects.equals(decommissioningNodes, allNodes.decommissioningNodes) &&
Objects.equals(inactiveNodes, allNodes.inactiveNodes) &&
Objects.equals(shuttingDownNodes, allNodes.shuttingDownNodes) &&
Objects.equals(activeCoordinators, allNodes.activeCoordinators);
Expand All @@ -74,6 +107,12 @@ public boolean equals(Object o)
@Override
public int hashCode()
{
return Objects.hash(activeNodes, inactiveNodes, shuttingDownNodes, activeCoordinators);
return Objects.hash(
activeNodes,
inactiveNodes,
shuttingDownNodes,
decommissioningNodes,
decommissionedNodes,
activeCoordinators);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.ImmutableSetMultimap;
import com.google.common.collect.SetMultimap;
import com.google.common.collect.Sets;
import com.google.common.collect.Sets.SetView;
import io.airlift.discovery.client.ServiceDescriptor;
import io.airlift.discovery.client.ServiceSelector;
Expand All @@ -43,6 +42,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Set;
Expand All @@ -59,6 +59,8 @@
import static io.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static io.trino.connector.system.GlobalSystemConnector.CATALOG_HANDLE;
import static io.trino.metadata.NodeState.ACTIVE;
import static io.trino.metadata.NodeState.DECOMMISSIONED;
import static io.trino.metadata.NodeState.DECOMMISSIONING;
import static io.trino.metadata.NodeState.INACTIVE;
import static io.trino.metadata.NodeState.SHUTTING_DOWN;
import static java.util.Locale.ENGLISH;
Expand Down Expand Up @@ -93,6 +95,9 @@ public final class DiscoveryNodeManager
@GuardedBy("this")
private Set<InternalNode> coordinators;

@GuardedBy("this")
private Set<String> nodesToBeDecommissioned = new HashSet<>();

@GuardedBy("this")
private final List<Consumer<AllNodes>> listeners = new ArrayList<>();

Expand Down Expand Up @@ -169,10 +174,7 @@ public void destroy()
private void pollWorkers()
{
AllNodes allNodes = getAllNodes();
Set<InternalNode> aliveNodes = ImmutableSet.<InternalNode>builder()
.addAll(allNodes.getActiveNodes())
.addAll(allNodes.getShuttingDownNodes())
.build();
Set<InternalNode> aliveNodes = allNodes.getAliveNodes();

ImmutableSet<String> aliveNodeIds = aliveNodes.stream()
.map(InternalNode::getNodeIdentifier)
Expand Down Expand Up @@ -215,9 +217,7 @@ private synchronized void refreshNodesInternal()
.filter(service -> !failureDetector.getFailed().contains(service))
.collect(toImmutableSet());

ImmutableSet.Builder<InternalNode> activeNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> inactiveNodesBuilder = ImmutableSet.builder();
ImmutableSet.Builder<InternalNode> shuttingDownNodesBuilder = ImmutableSet.builder();
ImmutableSetMultimap.Builder<NodeState, InternalNode> nodeStateMapBuilder = ImmutableSetMultimap.builder();
ImmutableSet.Builder<InternalNode> coordinatorsBuilder = ImmutableSet.builder();
ImmutableSetMultimap.Builder<CatalogHandle, InternalNode> byCatalogHandleBuilder = ImmutableSetMultimap.builder();

Expand All @@ -229,55 +229,65 @@ private synchronized void refreshNodesInternal()
InternalNode node = new InternalNode(service.getNodeId(), uri, nodeVersion, coordinator);
NodeState nodeState = getNodeState(node);

switch (nodeState) {
case ACTIVE:
activeNodesBuilder.add(node);
if (coordinator) {
coordinatorsBuilder.add(node);
}
// nodesToBeDecommissioned is the authoritative list of node to be decommissioned
// from coordinator perspective. Once a worker appears in the list,
// its state become DECOMMISSIONING even if worker has yet confirmed such
// so that no new tasks will be scheduled on it.
if (!coordinator && nodesToBeDecommissioned.contains(node.getNodeIdentifier())
&& nodeState == ACTIVE) {
log.debug("Treat " + node.getNodeIdentifier() + " as DECOMMISSIONING");
nodeState = DECOMMISSIONING;
}
Comment on lines 232 to 240
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Honestly I am not a big fan of this dichotomy. I would rather keep what workers report as a source of truth, and report that.

Also the quesion here may become obsolete if we would decide to drop NodesResource.refreshNodes as discused in https://github.com/trinodb/trino/pull/14876/files#r1051663508


// record available active nodes organized by catalog handle
String catalogHandleIds = service.getProperties().get("catalogHandleIds");
if (catalogHandleIds != null) {
catalogHandleIds = catalogHandleIds.toLowerCase(ENGLISH);
for (String catalogHandleId : CATALOG_HANDLE_ID_SPLITTER.split(catalogHandleIds)) {
byCatalogHandleBuilder.put(CatalogHandle.fromId(catalogHandleId), node);
}
// Add node to node state map.
nodeStateMapBuilder.put(nodeState, node);

if (nodeState == ACTIVE) {
if (coordinator) {
coordinatorsBuilder.add(node);
}

// record available active nodes organized by catalog handle
String catalogHandleIds = service.getProperties().get("catalogHandleIds");
if (catalogHandleIds != null) {
catalogHandleIds = catalogHandleIds.toLowerCase(ENGLISH);
for (String catalogHandleId : CATALOG_HANDLE_ID_SPLITTER.split(catalogHandleIds)) {
byCatalogHandleBuilder.put(CatalogHandle.fromId(catalogHandleId), node);
}
}

// always add system connector
byCatalogHandleBuilder.put(CATALOG_HANDLE, node);
break;
case INACTIVE:
inactiveNodesBuilder.add(node);
break;
case SHUTTING_DOWN:
shuttingDownNodesBuilder.add(node);
break;
default:
log.error("Unknown state %s for node %s", nodeState, node);
// always add system connector
byCatalogHandleBuilder.put(CATALOG_HANDLE, node);
}
}
}

if (allNodes != null) {
// nodes by catalog handle changes anytime a node adds or removes a catalog (note: this is not part of the listener system)
if (!allCatalogsOnAllNodes) {
activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build());
}

SetMultimap<NodeState, InternalNode> nodeStateMap = nodeStateMapBuilder.build();
AllNodes currAllNodes = new AllNodes(
nodeStateMap.get(ACTIVE),
nodeStateMap.get(DECOMMISSIONED),
nodeStateMap.get(DECOMMISSIONING),
nodeStateMap.get(INACTIVE),
nodeStateMap.get(SHUTTING_DOWN),
coordinatorsBuilder.build());

if (this.allNodes != null) {
// log node that are no longer active (but not shutting down)
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), Sets.union(activeNodesBuilder.build(), shuttingDownNodesBuilder.build()));
SetView<InternalNode> missingNodes = difference(allNodes.getActiveNodes(), currAllNodes.getAliveNodes());
for (InternalNode missingNode : missingNodes) {
log.info("Previously active node is missing: %s (last seen at %s)", missingNode.getNodeIdentifier(), missingNode.getHost());
}
}

// nodes by catalog handle changes anytime a node adds or removes a catalog (note: this is not part of the listener system)
if (!allCatalogsOnAllNodes) {
activeNodesByCatalogHandle = Optional.of(byCatalogHandleBuilder.build());
}

AllNodes allNodes = new AllNodes(activeNodesBuilder.build(), inactiveNodesBuilder.build(), shuttingDownNodesBuilder.build(), coordinatorsBuilder.build());
// only update if all nodes actually changed (note: this does not include the connectors registered with the nodes)
if (!allNodes.equals(this.allNodes)) {
if (!currAllNodes.equals(this.allNodes)) {
// assign allNodes to a local variable for use in the callback below
this.allNodes = allNodes;
this.allNodes = currAllNodes;
coordinators = coordinatorsBuilder.build();

// notify listeners
Expand All @@ -289,22 +299,19 @@ private synchronized void refreshNodesInternal()
private NodeState getNodeState(InternalNode node)
{
if (expectedNodeVersion.equals(node.getNodeVersion())) {
if (isNodeShuttingDown(node.getNodeIdentifier())) {
return SHUTTING_DOWN;
String nodeId = node.getNodeIdentifier();
Optional<NodeState> remoteNodeState = nodeStates.containsKey(nodeId)
? nodeStates.get(nodeId).getNodeState()
: Optional.empty();
if (remoteNodeState.isPresent()) {
return remoteNodeState.get();
}
// no remote node state
return ACTIVE;
}
return INACTIVE;
}

private boolean isNodeShuttingDown(String nodeId)
{
Optional<NodeState> remoteNodeState = nodeStates.containsKey(nodeId)
? nodeStates.get(nodeId).getNodeState()
: Optional.empty();
return remoteNodeState.isPresent() && remoteNodeState.get() == SHUTTING_DOWN;
}

@Override
public synchronized AllNodes getAllNodes()
{
Expand All @@ -317,6 +324,18 @@ public int getActiveNodeCount()
return getAllNodes().getActiveNodes().size();
}

@Managed
public int getDecommissionedNodeCount()
{
return getAllNodes().getDecommissionedNodes().size();
}

@Managed
public int getDecommissioningNodeCount()
{
return getAllNodes().getDecommissioningNodes().size();
}

Comment on lines 327 to 338
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how are the docomissioned/decomissioning nodes represented in Web UI? Will they be accounted for in the number of active nodes in the cluster?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is currently not reflected in the Web UI. I figured three files to touch to add them to UI.

  1. (Done) core/trino-main/src/main/java/io/trino/server/ui/ClusterStatsResource.java
  2. (Locally tried) core/trino-main/src/main/resources/webapp/src/components/ClusterHUD.jsx
  3. (Not sure how to generate this) core/trino-main/src/main/resources/webapp/dist/index.js

I tried "babel" to generate 3 from 2, but got error "Support for the experimental syntax 'jsx' isn't currently enabled". So this one is pending on how to generate the file in 3 (assuming it was generated by tool but not edited by hand).

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have tried to manually update No 3 but cluster-hud UI stuck in loading and I am not sure how to debug it (not familiar with React nor js in general). So 2 & 3 is not included for now, and may need tip on how to do No 3.

@Managed
public int getInactiveNodeCount()
{
Expand All @@ -335,6 +354,10 @@ public Set<InternalNode> getNodes(NodeState state)
switch (state) {
case ACTIVE:
return getAllNodes().getActiveNodes();
case DECOMMISSIONED:
return getAllNodes().getDecommissionedNodes();
case DECOMMISSIONING:
return getAllNodes().getDecommissioningNodes();
case INACTIVE:
return getAllNodes().getInactiveNodes();
case SHUTTING_DOWN:
Expand Down Expand Up @@ -407,4 +430,9 @@ private static boolean isCoordinator(ServiceDescriptor service)
{
return Boolean.parseBoolean(service.getProperties().get("coordinator"));
}

public synchronized void setNodesToExclude(Set<String> nodesToExclude)
{
this.nodesToBeDecommissioned = nodesToExclude;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,11 @@ public Set<InternalNode> getNodes(NodeState state)
{
switch (state) {
case ACTIVE:
return allNodes;
return getAllNodes().getActiveNodes();
case DECOMMISSIONED:
return getAllNodes().getDecommissionedNodes();
case DECOMMISSIONING:
return getAllNodes().getDecommissioningNodes();
case INACTIVE:
case SHUTTING_DOWN:
return ImmutableSet.of();
Expand All @@ -86,6 +90,8 @@ public AllNodes getAllNodes()
allNodes,
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(),
ImmutableSet.of(CURRENT_NODE));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
public enum NodeState
{
ACTIVE,
DECOMMISSIONED,
DECOMMISSIONING,
INACTIVE,
SHUTTING_DOWN
}
Original file line number Diff line number Diff line change
Expand Up @@ -341,6 +341,15 @@ protected void setup(Binder binder)

install(new QueryExecutionFactoryModule());

// nodes and queries for monitoring and auto-scaling
jaxrsBinder(binder).bind(NodesResource.class);
httpClientBinder(binder).bindHttpClient("nodes", ForNodes.class)
.withTracing()
.withConfigDefaults(config -> {
config.setIdleTimeout(new Duration(30, SECONDS));
config.setRequestTimeout(new Duration(10, SECONDS));
});

// cleanup
binder.bind(ExecutorCleanup.class).asEagerSingleton();
}
Expand Down
31 changes: 31 additions & 0 deletions core/trino-main/src/main/java/io/trino/server/ForNodes.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.trino.server;

import javax.inject.Qualifier;

import java.lang.annotation.Retention;
import java.lang.annotation.Target;

import static java.lang.annotation.ElementType.FIELD;
import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.PARAMETER;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

@Retention(RUNTIME)
@Target({FIELD, PARAMETER, METHOD})
@Qualifier
public @interface ForNodes
{
}
Loading