Don't patch httproute status if it hasn't changed #12215

Merged · 7 commits merged into main from alex/status-leak on Mar 18, 2024

Conversation

@adleong (Member) commented Mar 7, 2024

Fixes #12104

The leader of the policy controllers (as determined by a lease resource) is in charge of keeping the HttpRoute resource statuses up to date. To accomplish this, it performs reconciliation every 10 seconds where it computes the desired status for each HttpRoute and patches the resource. When there are many HttpRoute resources in the cluster, most of which change infrequently, this results in a large number of redundant no-op patches: one per HttpRoute every 10 seconds.

These patches are stored in an in-memory, unbounded queue while waiting to be dispatched to the Kube API. If the rate at which these patches are generated exceeds the rate at which the Kube API can process them (patches are applied synchronously), this queue grows without bound, resulting in a memory leak.

To address this we do two things:

  • Use a bounded queue of size 10000. If the queue fills up, patches will be dropped. Statuses will be reconciled in a later iteration of the reconciliation loop once there is queue capacity.
  • Only generate a patch when the desired status differs from the current status. This greatly reduces the number of patches generated (a rough sketch of this check follows below).
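
As a rough illustration of the second point, here is a minimal, self-contained sketch of the "compare before patching" check. The RouteStatus, Update, and reconcile_route names are hypothetical stand-ins for illustration only; the actual controller computes the desired status from its index and sends patches to the Kubernetes API.

use tokio::sync::mpsc;

// Hypothetical stand-in for an HTTPRoute's status.
#[derive(Clone, Debug, PartialEq, Eq)]
struct RouteStatus {
    accepted: bool,
}

#[derive(Debug)]
struct Update {
    id: String,
    patch: RouteStatus,
}

/// Enqueue a patch only when the desired status differs from the current one.
/// Returns true if a patch was enqueued.
fn reconcile_route(
    id: &str,
    current: &RouteStatus,
    desired: &RouteStatus,
    updates: &mpsc::Sender<Update>,
) -> bool {
    if current == desired {
        // Nothing to do; avoid generating a redundant no-op patch.
        return false;
    }
    // try_send never blocks; if the bounded queue is full the patch is dropped
    // and will be retried on a later reconciliation pass.
    updates
        .try_send(Update {
            id: id.to_string(),
            patch: desired.clone(),
        })
        .is_ok()
}

fn main() {
    let (tx, mut rx) = mpsc::channel(10_000);
    let current = RouteStatus { accepted: false };
    let desired = RouteStatus { accepted: true };
    assert!(reconcile_route("default/my-route", &current, &desired, &tx));
    // Statuses already match: no patch is generated.
    assert!(!reconcile_route("default/my-route", &desired, &desired, &tx));
    assert!(rx.try_recv().is_ok());
}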

TODO in follow up PRs:

  • Add timeout and metrics to the Kube API patch calls
  • Add queue metrics for enqueues, dequeues, dropped patches, and avg queue depth

@adleong requested a review from a team as a code owner March 7, 2024 00:33
Signed-off-by: Alex Leong <[email protected]>
@@ -159,8 +164,9 @@ async fn main() -> Result<()> {

    // Build the status index which will maintain information necessary for
    // updating the status field of policy resources.
-   let (updates_tx, updates_rx) = mpsc::unbounded_channel();
+   let (updates_tx, updates_rx) = mpsc::channel(10000);
    let status_index = status::Index::shared(hostname.clone(), claims.clone(), updates_tx);
Member

Better to make this a constant to make it more visible? Also can you share the reasoning behind the number? As a point of comparison, in the endpoint translator the bounded queue size is just 100.

Member Author

The queue size was picked mostly arbitrarily. I wanted it to be big enough to hold the most patches we could possibly process in a 10-second interval (the frequency of reconciles). If it took 1ms to apply a patch, that would be 10,000, which is a very conservative estimate.

Member

I do think that we should document this relationship. I'd also really like it if we could extract this hardcoded 10s timeout so that it's configured from the main function. That way we can specify both of these constants in the same place, ideally up at the top of the file with the rest of the consts.
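
What that could look like, as a hedged sketch: the constant names (RECONCILIATION_PERIOD, UPDATE_QUEUE_CAPACITY) are assumptions rather than the code as merged, but defining both at the top of the file keeps the sizing relationship visible in one place.

use std::time::Duration;

/// How often the leader recomputes and reconciles HTTPRoute statuses.
const RECONCILIATION_PERIOD: Duration = Duration::from_secs(10);

/// Upper bound on queued status patches. At a very conservative ~1ms per
/// patch, one reconciliation period can drain at most 10s / 1ms = 10,000
/// patches, so the queue never needs to hold more than that.
const UPDATE_QUEUE_CAPACITY: usize = 10_000;

fn main() {
    // Both values are defined above so their relationship stays documented
    // in one place.
    let (updates_tx, _updates_rx) = tokio::sync::mpsc::channel::<()>(UPDATE_QUEUE_CAPACITY);
    assert_eq!(RECONCILIATION_PERIOD.as_millis() as usize, UPDATE_QUEUE_CAPACITY);
    drop(updates_tx);
}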

Signed-off-by: Alex Leong <[email protected]>
Comment on lines 135 to 142
match (self.leader, claim.is_current_for(&self.name)) {
    (true, false) =>
        tracing::debug!("Lease holder has changed; no longer the leader"),
    (false, true) =>
        tracing::debug!("Lease holder has changed; now the leader"),
    _ => {}
}
self.leader = claim.is_current_for(&self.name);
Member

Suggested change
-match (self.leader, claim.is_current_for(&self.name)) {
-    (true, false) =>
-        tracing::debug!("Lease holder has changed; no longer the leader"),
-    (false, true) =>
-        tracing::debug!("Lease holder has changed; now the leader"),
-    _ => {}
-}
-self.leader = claim.is_current_for(&self.name);
+let was_leader = self.leader;
+self.leader = claim.is_current_for(&self.name);
+if was_leader != self.leader {
+    tracing::debug!(leader = %self.leader, "Leadership changed");
+} else {
+    tracing::trace!(leader = %self.leader, "Leadership unchanged");
+}

Member Author

Even at trace level, logging that leadership is unchanged every 10s does not seem particularly useful.

Comment on lines +352 to +358
if let Some(patch) = self.make_http_route_patch(id, route) {
    if let Err(error) = self.updates.try_send(Update {
        id: id.clone(),
        patch,
    }) {
        tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send HTTPRoute patch")
    }
Member

I don't think it's useful to log the error message here. There are specifically two kinds of errors we can hit:

  • The receiver has been dropped, in which case we have no reason to log or do any further work.
  • The channel is at capacity.

If the channel is at capacity, what should we do? I assume we'll typically continue to log errors for each and every route. Is that desirable?

We could also use Sender::reserve() to eagerly ensure the channel has capacity.

Suggested change
-if let Some(patch) = self.make_http_route_patch(id, route) {
-    if let Err(error) = self.updates.try_send(Update {
-        id: id.clone(),
-        patch,
-    }) {
-        tracing::error!(%id.namespace, route = ?id.gkn, %error, "Failed to send HTTPRoute patch")
-    }
+let Ok(tx) = self.updates.reserve() else {
+    tracing::error!("Cannot process additional status updates.");
+    return;
+};
+if let Some(patch) = self.make_http_route_patch(id, route) {
+    tx.send(Update {
+        id: id.clone(),
+        patch,
+    });

But should we go further? If we can't actually process status updates, should we relinquish our claim on the lease?

Member Author

With the changes in this PR, we do not expect this queue to ever fill up under normal conditions. It should only run out of capacity in pathological cases: an extremely high rate of real status changes, the Kube API becoming extremely slow or unreachable, or some other condition that prevents us from processing patches. In any of those cases, I think the log messages are helpful so that we can see that we're in such a state. If the channel is at capacity, we will log an error for each route that does not match its desired state, every 10 seconds, until capacity becomes available.

Member Author

What is the benefit to using reserve? Is this to be able to skip doing the work of building the patch if capacity is not available? Not sure, but this seems like a premature optimization.

Member Author

Relinquishing the claim if we can't process status updates sounds like a more robust behavior but it's also a larger change that isn't necessary to fix the immediate issue. I'd like to punt on that and say that pathological cases where we're unable to process status updates need to be remediated manually.

Signed-off-by: Alex Leong <[email protected]>
Signed-off-by: Alex Leong <[email protected]>
@olix0r (Member) left a comment

I've refreshed myself on the current state of the status manager, and a few
(pre-existing) issues stand out that I think we should take into consideration
with this change:

  1. Nothing constrains how long a patch operation may take. You've noted that the
    Kubernetes API being very slow is pathological and can cause this controller
    to back up. This raises a few important issues:
    • We need to bound this with a timeout so that we can properly size the queue (see the sketch after this comment).
    • We should measure a histogram of patch durations.
    • We should count the number of failures (including timeouts).
  2. The receiver only processes updates while it is the leader. This means that
    if a process loses leadership while patches are enqueued, they will remain in
    the queue until the process regains leadership, at which point the very old
    patches will be applied.
    • We need to continue processing patches, even if we lose leadership.
    • We should count the number of patches that are dropped.
  3. I'm wary of getting into an unhealthy state where the controller does
    nothing but emit error messages for every resource it encounters. If we fail
    fast, we can communicate more generally that reconciliation has been halted
    because the buffer is at capacity.
  4. The queue size metric, as implemented here, is of limited use: (1) the value
    is only updated before reconciliation starts, and (2) it will then be sampled
    by readers of the /metrics endpoint. The first issue is easy to avoid, but I
    also think this would be an excellent use of a Peak-EWMA measurement to avoid
    the latter as well.

I recognize that we'll probably want to merge parts of this in discrete PRs. Perhaps we split this PR itself into one that only changes the mpsc type, and we can address these other (required, imo) fixes, including metrics, in separate changes?
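
To make the timeout in point 1 concrete, here is a hedged sketch using tokio::time::timeout. The apply_patch stub stands in for the controller's real Kubernetes patch request, and the 15-second bound is an assumed value; a timed-out call would be recorded as a failure in the proposed counter.

use std::time::Duration;
use tokio::time::{sleep, timeout};

// Assumed bound on a single patch round trip; not a value from this PR.
const PATCH_TIMEOUT: Duration = Duration::from_secs(15);

async fn apply_patch() -> Result<(), &'static str> {
    // Stand-in for the Kubernetes API round trip.
    sleep(Duration::from_millis(5)).await;
    Ok(())
}

#[tokio::main]
async fn main() {
    match timeout(PATCH_TIMEOUT, apply_patch()).await {
        Ok(Ok(())) => tracing::debug!("patched"),
        Ok(Err(error)) => tracing::warn!(%error, "patch failed"),
        // A timeout counts as a failure and would be recorded in the
        // failure counter the review asks for.
        Err(_elapsed) => tracing::warn!("patch timed out"),
    }
}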

Comment on lines 208 to 209
let queue_size = index.updates.max_capacity() - index.updates.capacity();
index.metrics.update_queue_size.set(queue_size as i64);
Member

This will only update this metric on reconciliation or leadership change, right? And it will do so before calling reconcile, so we don't get the up-to-date information after the queue has been filled. We're really measuring the low-point here. We also aren't counting overflow events, so we have no way to understand if they are occurring in between reconciliations.

I think it would be preferable to track the queue size at the insertion and removal points, rather than snapshotted periodically.

Even then, this instantaneous value is going to be of limited utility due to the nature of metrics sampling. We should consider some more robust ways to measure this.

We can keep an EWMA load average of the queue depth, giving us the average depth over roughly the past decay period. This allows us to understand queue health without being misled by sampling bias.
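
A hedged sketch of such a measurement: the QueueDepthEwma type, the decay constant, and the call sites for observe() are assumptions, not an existing API. The idea is to update the average at every enqueue and dequeue and export average() as the gauge rather than the instantaneous depth.

use std::time::{Duration, Instant};

struct QueueDepthEwma {
    decay: Duration,
    value: f64,
    last_update: Instant,
}

impl QueueDepthEwma {
    fn new(decay: Duration) -> Self {
        Self { decay, value: 0.0, last_update: Instant::now() }
    }

    /// Record the current queue depth, weighting the new observation more
    /// heavily the longer it has been since the last update.
    fn observe(&mut self, depth: usize) {
        let now = Instant::now();
        let dt = now.duration_since(self.last_update).as_secs_f64();
        let alpha = 1.0 - (-dt / self.decay.as_secs_f64()).exp();
        self.value += alpha * (depth as f64 - self.value);
        self.last_update = now;
    }

    /// Export this as the gauge value instead of the instantaneous depth.
    fn average(&self) -> f64 {
        self.value
    }
}

fn main() {
    let mut ewma = QueueDepthEwma::new(Duration::from_secs(60));
    // Called wherever the channel is pushed to or drained.
    ewma.observe(3);
    ewma.observe(250);
    println!("avg queue depth ≈ {:.1}", ewma.average());
}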

    self.leader = claim.is_current_for(&self.name);
    if was_leader != self.leader {
        tracing::debug!(leader = %self.leader, "Leadership changed");
    }
}
// If this policy controller is not the leader, it should
// process through the updates queue but not actually patch
Member

I can't quite comment on the line I want, but this comment here does not accurately reflect what this code does, and there is a subtle bug lurking:

                Some(Update { id, patch}) = self.updates.recv(), if self.leader => {

This condition means that we will only ever check the channel if we are currently the leader. So if there is a leadership change (and we lose leadership), the channel can end up holding patches indefinitely, and when leadership is resumed we'll process very old patches. This if condition must be moved within the body of the block, and we should drop updates while we're not the leader.
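
A self-contained sketch of that suggestion, with illustrative types (an Update struct, a watch channel for leadership, an apply_patch stub) that are assumptions rather than the controller's real code. The key point is that the receive arm carries no `if self.leader` guard, so the queue keeps draining and stale patches are dropped instead of lingering across leadership changes.

use tokio::sync::{mpsc, watch};

#[derive(Debug)]
struct Update {
    id: String,
    patch: String,
}

async fn apply_patch(update: Update) {
    // Stand-in for the Kubernetes API patch call.
    println!("patching {} with {}", update.id, update.patch);
}

async fn run(mut updates: mpsc::Receiver<Update>, mut leadership: watch::Receiver<bool>) {
    let mut leader = *leadership.borrow();
    loop {
        tokio::select! {
            // No `if leader` guard on this arm: the channel is drained even
            // when this process is not the leader, so stale patches never
            // accumulate across leadership changes.
            update = updates.recv() => {
                let Some(update) = update else { return };
                if leader {
                    apply_patch(update).await;
                } else {
                    // Drop (and ideally count) patches while not the leader.
                    tracing::debug!(id = %update.id, "Not the leader; dropping patch");
                }
            }
            changed = leadership.changed() => {
                if changed.is_err() { return; }
                leader = *leadership.borrow();
            }
        }
    }
}

#[tokio::main]
async fn main() {
    let (tx, rx) = mpsc::channel(10_000);
    let (_lead_tx, lead_rx) = watch::channel(true);
    tx.try_send(Update { id: "default/my-route".into(), patch: "{}".into() }).unwrap();
    drop(tx); // close the channel so `run` exits after draining it
    run(rx, lead_rx).await;
}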

// If this policy controller is not the leader, it should
// process through the updates queue but not actually patch
// any resources.
if self.leader {
Member

It may be desirable to log debug messages when we drop events.

@adleong merged commit 66efb61 into main on Mar 18, 2024
35 checks passed
@adleong deleted the alex/status-leak branch March 18, 2024 21:12
Successfully merging this pull request may close these issues.

Policy Controller Memory Leak (#12104)