
Coordinator-driven graceful decommission/recommission of worker nodes#14876

Closed
dzhi-lyft wants to merge 25 commits into trinodb:master from
dzhi-lyft:graceful-decomrecom-20221103

Conversation

@dzhi-lyft

Support logic in the coordinator for graceful decommission and recommission of workers.

  1. Add an endpoint to list nodes and their rich states (to be used by the autoscaler).
  2. Add an endpoint to refresh nodes with the list of nodes to exclude/decommission.
  3. Add logic in the coordinator to track and exclude nodes being decommissioned.
  4. Add DECOMMISSIONING and DECOMMISSIONED to NodeState.
  5. Add handling of decommission and recommission requests in workers.

reference: #9976
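The two new states from item 4 might slot into NodeState roughly as below. This is a sketch only: the helper methods and the exact set of pre-existing states are illustrative assumptions, not the PR's actual code.

```java
// Sketch of NodeState extended with the two proposed states.
// isSchedulable/canRecommission are illustrative helpers (assumptions).
enum NodeState
{
    ACTIVE,
    INACTIVE,
    SHUTTING_DOWN,
    DECOMMISSIONING, // draining: finish running tasks, accept no new ones
    DECOMMISSIONED;  // fully drained: the pod/VM can be terminated safely

    // Only ACTIVE workers should receive new tasks.
    boolean isSchedulable()
    {
        return this == ACTIVE;
    }

    // Unlike SHUTTING_DOWN, a decommissioning/decommissioned worker can
    // be brought back to ACTIVE if query traffic justifies it.
    boolean canRecommission()
    {
        return this == DECOMMISSIONING || this == DECOMMISSIONED;
    }
}
```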

@dzhi-lyft
Author

This is #10011 applied on top of the latest code.

@dzhi-lyft
Author

The original PR was #10011, created on Nov 19, 2021. The PR has been running in Lyft's production Trino clusters since March 2022, initially on top of build 359.11 and currently on build 365.24 (both use Java 11). It has been very stable (at least we haven't encountered any problems in the last 8 months). We built our autoscaler based on this PR to achieve pod-level autoscaling, and it has reduced total pod hours by 40% (over a few million dollars estimated) compared to a previous naive cluster-wide autoscaling approach based purely on time windows. So we recommend this PR to the Trino community.

This new PR applies the same code on top of the latest code (about 350 days' worth of commits).
Adding @bitsondatadev @losipiuk @martint @dain, who were involved in the original PR.

@dzhi-lyft
Author

Comments from the original PR (hopefully this helps rather than confuses people).

@losipiuk losipiuk 5 hours ago
High-level question: why do we need a concept of a "decommissioned" node? Why can't we just kill nodes to be scaled down, and add new ones if we scale up?

Member
@raunaqmorarka raunaqmorarka 5 hours ago
A node can stay in the graceful shutdown state for a potentially long time while it is draining running tasks. During this time, scaling requirements can change. Having the ability to bring nodes back from the graceful shutdown state to active can be useful for getting nodes immediately rather than waiting to acquire containers/nodes, bring up the Trino server process, and have it join the cluster. It also potentially saves resources, as we might have a situation with many nodes in graceful shutdown which don't do much work, yet we bring up new nodes because we can't "recommission" them.

Member
Author
@dzhi-lyft dzhi-lyft 4 hours ago
DECOMMISSIONED is a state of the Trino worker. Node (in our case, the k8s pod) termination usually happens afterward. Once the worker is DECOMMISSIONED, the node/pod can be terminated gracefully without impact to any query. Basically, DECOMMISSIONED is a logical state where no task will be scheduled on the worker, even though the Trino server process is still running.

In the majority of cases, the pod is terminated afterward, but in rare cases a DECOMMISSIONED node can be recommissioned if query traffic justifies it, so we save the termination of a pod and the allocation of a new one.

Member
Author
@dzhi-lyft dzhi-lyft 3 hours ago
@raunaqmorarka Just refreshed and noticed your comment. Your comment is on point and I agree.

Member
@losipiuk losipiuk 2 hours ago
Ok thanks - the DECOMMISSIONING state indeed makes sense. Not sure about DECOMMISSIONED, as starting up a new Trino process should be a negligible cost in most scenarios. But it may be useful. I guess handling DECOMMISSIONING and DECOMMISSIONED is the same anyway.

Member
@losipiuk losipiuk 2 hours ago
So the other question, then: given that we have a fault-tolerant execution mode now, maybe this is not really a crucial feature. With FTE you can just shoot the node, not wait for a graceful shutdown at all, or wait only for a limited time.
Some work will be wasted, but the query as a whole will survive. Would that be feasible for your use case @dzhi-lyft ?

@arhimondr any comments on this one?

Member
Author
@dzhi-lyft dzhi-lyft 11 minutes ago
Task retry on failures (FTE) would increase the robustness of a Trino cluster in cases like node failure (unexpected termination, persistent errors), etc. It is more of an independent feature relative to node-level graceful decommission. The key flexibility this PR provides is bi-directional transition between the ACTIVE and DECOMMISSIONING states, so as to reduce regret during auto-scaling.

We have an autoscaler component that periodically (every 40 seconds) polls the status of the clusters and decides the desired node count based on recent cluster utilization. This desired count may change due to query traffic fluctuation (particularly in clusters where the query timeout is 2 hours), and some nodes may transition from DECOMMISSIONING back to ACTIVE as needed. This avoids unnecessary termination of pods, which sometimes wait in their pre-stop hook for a long time, and avoids reconciliation confusion in the k8s controller when a large number of pods are TERMINATING while new ones are requested.
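As a hypothetical sketch of such a sizing decision (the target band, thresholds, and method name are assumptions for illustration, not Lyft's actual autoscaler logic): scale the worker count to keep recent utilization inside a target band, with hysteresis so the count does not flap with traffic fluctuation.

```java
// Hypothetical autoscaler sizing step: grow aggressively when hot,
// shrink conservatively when cold, and clamp to configured bounds.
final class AutoscalerSketch
{
    static int desiredWorkerCount(int currentWorkers, double recentUtilization,
            int minWorkers, int maxWorkers)
    {
        double target = 0.70;                // assumed target utilization
        if (recentUtilization > 0.85) {      // scale up aggressively
            currentWorkers = (int) Math.ceil(currentWorkers * recentUtilization / target);
        }
        else if (recentUtilization < 0.50) { // scale down conservatively
            currentWorkers = Math.max(1, (int) Math.floor(currentWorkers * recentUtilization / target));
        }
        return Math.max(minWorkers, Math.min(maxWorkers, currentWorkers));
    }
}
```

Workers selected for removal would then be moved to DECOMMISSIONING, and moved back to ACTIVE on a later cycle if the desired count rises again.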

@losipiuk losipiuk self-requested a review November 4, 2022 19:04
Member

@losipiuk losipiuk left a comment


Skimmed. High-level comments (and some nitpicks too).

Comment thread core/trino-main/src/main/java/io/trino/metadata/AllNodes.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/AutoScaleResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/DecommissionHandler.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/ServerInfoResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/AutoScaleResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/AutoScaleResource.java Outdated
@dzhi-lyft
Author

dzhi-lyft commented Nov 8, 2022

@losipiuk Thanks for the comments. To prepare for testing any code change, I am working on launching a test cluster with Trino 402 (our clusters are running Trino 365). Other than Java 17, there are plugin-related interface changes and some defunct properties. I am very close to having a test cluster ready and will work on the code again (after almost a year).

@dzhi-lyft
Author

🔨

Member

@losipiuk losipiuk left a comment


some more comments, questions (partial review)

Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/metadata/DiscoveryNodeManager.java Outdated
Member


I am not convinced we need this one. Can we make it simpler and assume requests to decommission/recommission nodes are done by talking to nodes directly?

Author


The idea is to leverage and respect the coordinator being the authoritative component that manages worker state. Compared to talking directly to individual workers, the benefits are:

  1. Request once to the coordinator with the set of nodes to be decommissioned; the coordinator will immediately stop scheduling new tasks to those nodes.
  2. The coordinator already has a communication channel with the worker nodes, so the external component (autoscaler, cluster manager, etc.) does not need to talk directly to individual worker endpoints, rely on each worker to notify the coordinator of its decommissioning status, and then have the coordinator stop scheduling tasks to the worker. The latter approach is overall less efficient and may have split-brain issues by design.
  3. Ask the coordinator once to retrieve the list of nodes and their authoritative states.
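To make point 1 concrete, a single coordinator call might carry a body like the one built below. The endpoint path and JSON shape are purely assumed for illustration; they are not necessarily the PR's actual API.

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical sketch: build the body of a single coordinator request
// naming the workers to decommission; the coordinator then stops
// scheduling new tasks on them immediately.
final class DecommissionRequestSketch
{
    // e.g. PUT /v1/autoscale/refreshNodes with this JSON body (assumed path)
    static String refreshNodesBody(Set<String> nodesToDecommission)
    {
        return nodesToDecommission.stream()
                .sorted() // deterministic order for logging/diffing
                .map(id -> "\"" + id + "\"")
                .collect(Collectors.joining(", ", "{\"decommission\": [", "]}"));
    }
}
```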

Member


I do not feel strongly against it, but I would love another opinion on this one. @findepi / @electrum ?
It is not in line with the interface for scheduling node shutdown - we can do that only by directly talking to the node.

Author


The idea is similar to the graceful decommission support in Hadoop (https://hadoop.apache.org/docs/stable/hadoop-yarn/hadoop-yarn-site/GracefulDecommission.html), where the ResourceManager is like the coordinator. It offers an example, though not necessarily a justification, of the design, as YARN does not have Trino's current concept of shutting down a worker node directly.

Member


So maybe we can get by without it. I do not see a huge benefit in proxying those requests to worker nodes via the coordinator.
@findepi please share your opinion here.

Member


Can we make it simpler and assume requests to decommission/recommission nodes are done by talking to nodes directly?

That's more in line with what we do today.
Also, such a design is more sustainable from a cloud/k8s perspective, where it's way easier to just let the node manager interrupt/kill a node, and much harder to configure it so that it talks to the coordinator in a Trino-specific manner.

The idea is to leverage and respect coordinator being the authoritative component that manages worker state.

That would be nice to have from a programmer's perspective, but it's not possible.
For example, the coordinator doesn't start new nodes. The coordinator does not -- and will not -- know how to provision new VM/EC2 instances, for example.
Also, the coordinator still needs to account for workers shutting down.
"The coordinator being the authoritative component that manages worker state" is a simplification we will never be able to leverage, so it's a complication.

Author


The new ability to decommission/recommission a set of nodes through refreshNodes() against the coordinator endpoint is not in conflict with the existing shutdown request made directly to the worker node. Users of the current shutdown (e.g., from a pod's pre-stop hook) can still use it as is. That said, letting k8s delete a pod before decommission and relying on the pre-stop hook to wait for task completion has limitations, so we don't do it in our clusters. It is also compatible with the way that workers start and register with the coordinator. The whole idea is almost identical to the Hadoop way of graceful decommission (so it is not bizarre or abnormal, at least).

The whole point of this PR is to facilitate automatic and more efficient/intelligent auto-scaling of the cluster from an external component (the autoscaler in our case). The autoscaler periodically refreshes the status of all nodes and recent queries, calculates cluster-wide utilization scores, decides the desired number of pods based on recent usage scores, selects workers to decommission/recommission as necessary, and deletes decommissioned pods as fit. The decisions are based on the status of the whole cluster, including all workers, rather than being local decisions made by individual workers.

The refreshNodes() way appears more natural and efficient for decommissioning/recommissioning a set of nodes than talking to each worker individually. It is also one of the key features this PR provides, and our logic relies on it. I don't necessarily expect to convince people, but hopefully this at least makes my opinion clear.

Member


Thanks for the clarification. I do not think it is bizarre or abnormal, FWIW. I am just not convinced that proxying state-change requests via the coordinator buys us much versus directly talking to worker nodes, and it adds a bit of complexity to Trino which needs to be maintained afterward. If you want to build an autoscaler solution, you still need an external process that talks to the Trino API and decides whether to scale up or down and what to kill. It feels to me like it does not change much for that external process whether it talks just to the coordinator or also to worker nodes. The process will know the worker nodes' endpoints anyway, as it polls for worker information.

I would vote for simplifying the coordinator logic and just exposing the read-only information needed to make scale-up/down decisions, and assuming actuation is done by an external process talking directly to workers.

Author


The status of all nodes is pulled periodically in a single request to the coordinator's v1/nodes endpoint, instead of polling all workers (200 in our clusters) individually.
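For illustration, the consumer side of that single poll could summarize the cluster in one pass. The record shape and field names below are assumptions about the v1/nodes response, not its actual schema.

```java
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch: summarize one v1/nodes response from the
// coordinator instead of polling ~200 workers individually.
final class NodeStatesSketch
{
    record NodeStatus(String uri, String state) {}

    // Count nodes per state, e.g. how many are ACTIVE vs DECOMMISSIONING.
    static Map<String, Long> countByState(List<NodeStatus> nodes)
    {
        return nodes.stream()
                .collect(Collectors.groupingBy(NodeStatus::state, Collectors.counting()));
    }
}
```

The autoscaler would feed these counts into its sizing decision before selecting specific workers to decommission or recommission.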

Member

@jerryleooo jerryleooo Aug 30, 2024


We had one case where talking to a worker failed (even ssh failed, which might have been a hardware issue).
The worker kept receiving tasks and causing errors, while we could not do anything. In such a case, stopping the worker via the coordinator would be very useful @losipiuk @dzhi-lyft @findepi

Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
Comment thread core/trino-main/src/main/java/io/trino/server/QueriesResource.java Outdated
dain and others added 16 commits December 11, 2022 17:32
When a refresh token is retrieved for the UI, we were sending
HTTP status 303, assuming that every request would just be repeated
against the Location header. While this works for the GET/PUT verbs, it
does not for non-idempotent ones like POST, as every JS HTTP client will
do a GET on the Location after a 303 on POST. Due to that, I changed it
to 307, which should force every client to repeat exactly the same
request, no matter the verb.

Co-authored-by: s2lomon <s2lomon@gmail.com>
Actual work is done in the `pageProjectWork.process()` call, while
`projection.project` only performs setup of the projection.
So both `expressionProfiler` and `metrics.recordProjectionTime`
needed to be around that method.
Removes outdated comments and unnecessary methods in local exchange
PartitioningExchanger since the operator is no longer implemented
in a way that attempts to be thread-safe.
- Change ColumnHandle to BigQueryColumnHandle in BigQueryTableHandle
- Extract buildColumnHandles in BigQueryClient
ebyhr and others added 7 commits December 13, 2022 15:00
The new field allows the table function to declare during
analysis which columns from the input tables are necessary to
execute the function.

The required columns can then be validated by the analyzer.
This declaration can also be used by the optimizer to prune
any input columns that are not used by the table function.
@dzhi-lyft dzhi-lyft force-pushed the graceful-decomrecom-20221103 branch from 34312ba to 6b35179 Compare December 14, 2022 21:08
@dzhi-lyft
Author

Rebased the PR to latest Trino code as of 2022-12-14.

@dzhi-lyft
Author

🔨

@findepi
Member

findepi commented Dec 15, 2022

Rebased the PR to latest Trino code as of 2022-12-14.

You don't need a rebase for that, since CI runs on a merge between the PR and master anyway (this is also why CI won't run at all if there are merge conflicts).

To force CI re-run you can just add an empty commit (git commit --only --allow-empty -m empty).

@dzhi-lyft
Author

Thanks for the tip. It's nice to see all checks PASS again! (I also re-verified the feature in our cluster with my private build based on the latest Trino code.)

@dzhi-lyft dzhi-lyft closed this Dec 15, 2022
@dzhi-lyft dzhi-lyft reopened this Dec 15, 2022
@dzhi-lyft
Author

Accidentally clicked "Close with comment" instead of "Comment", feel dumb.

Comment on lines 232 to 240
Member


Honestly, I am not a big fan of this dichotomy. I would rather keep what the workers report as the source of truth, and report that.

Also, the question here may become obsolete if we decide to drop NodesResource.refreshNodes, as discussed in https://github.com/trinodb/trino/pull/14876/files#r1051663508

Support logic in the coordinator for graceful decommission and recommission of workers.
  1. Add an endpoint to list nodes and their rich states (to be used by the autoscaler).
  2. Add an endpoint to refresh nodes with the list of nodes to exclude/decommission.
  3. Add logic in the coordinator to track and exclude nodes being decommissioned.
  4. Add DECOMMISSIONING and DECOMMISSIONED to NodeState.
  5. Add handling of decommission and recommission requests in workers.

reference: trinodb#9976
@dzhi-lyft dzhi-lyft force-pushed the graceful-decomrecom-20221103 branch from 163d292 to bcef885 Compare December 22, 2022 18:35
@dzhi-lyft
Author

@losipiuk @findepi Thanks for reviewing the PR. It helps us to have a better and more aligned PR once we move to a recent Trino release (from Trino 365). On the other hand, there are different opinions on the key idea of coordinator-driven decommission. I am not convinced by the idea of talking directly to individual workers, nor am I able to convince reviewers of the coordinator-driven way in this PR. There may be different opinions or more creative ideas in the future to solve this puzzle. For now, I won't actively maintain this PR to keep it current and free of CI errors until there is motivation to do so.

@losipiuk
Member

@losipiuk @findepi Thanks for reviewing the PR. It helps us to have a better and more aligned PR once we move to a recent Trino release (from Trino 365). On the other hand, there are different opinions on the key idea of coordinator-driven decommission. I am not convinced by the idea of talking directly to individual workers, nor am I able to convince reviewers of the coordinator-driven way in this PR. There may be different opinions or more creative ideas in the future to solve this puzzle. For now, I won't actively maintain this PR to keep it current and free of CI errors until there is motivation to do so.

Thanks, @dzhi-lyft, for working on this. It is a shame that it does not get merged in the end, but I do not think we should merge if we do not believe it matches the general Trino picture. I very much believe it is useful for your org, and I hope (as you suggest) that the work is not wasted for you.
The question is: should we keep the PR open if we have no intention of maintaining or merging it? @bitsondatadev ?

@bitsondatadev
Member

Thanks for your work @dzhi-lyft, and for all the work of the maintainers here, especially @losipiuk. This will definitely be a good PR to reference if this topic comes up again. As Daniel suggests, he will not be maintaining this PR, so let's close it. If anyone wants to continue this work to find a solution, I recommend having a conversation with the maintainers first. There is a design discussion that must be resolved before any further time is spent coding.

Thanks all!

