
feat: add SyncDesiredState RPC for full state reconciliation#5624

Merged
chronark merged 7 commits into main from 04-07-feat_speed_up_deployments
Apr 7, 2026

Conversation

@chronark (Collaborator) commented Apr 7, 2026

What does this PR do?

Deployments were taking minutes to appear in k8s because the old WatchDeploymentChanges RPC handled both full sync and incremental changes on a single stream. On startup (or on every periodic reconnect), krane sent version=0, which triggered the server to paginate through every deployment, sentinel, and cilium policy before entering incremental mode — blocking real-time events for the entire duration.

This PR separates the unified deployment changes stream into two distinct endpoints to improve performance and reliability:

  1. WatchDeploymentChanges - Now handles only incremental changes from the deployment_changes outbox table. When starting with version=0, it jumps to the current max version instead of replaying all historical data.
  2. SyncDesiredState - New endpoint that streams the complete desired state for a region (all running deployments, active sentinels, and cilium policies) then closes the connection.
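The cursor decision on the incremental stream can be sketched as follows. This is a minimal illustration of the behavior described above; the function name and signature are hypothetical, not the PR's actual code:

```go
package main

import "fmt"

// decideStartVersion picks where an incremental watch begins.
// With versionLastSeen == 0 and replay == false, the stream skips
// history and starts at the outbox's current max version.
func decideStartVersion(versionLastSeen, maxVersion int64, replay bool) int64 {
	if versionLastSeen == 0 && !replay {
		return maxVersion // first connect: no replay, jump to head
	}
	return versionLastSeen // resume from cursor, or explicit replay from 0
}

func main() {
	fmt.Println(decideStartVersion(0, 42, false)) // 42: jump to head
	fmt.Println(decideStartVersion(7, 42, false)) // 7: resume from cursor
	fmt.Println(decideStartVersion(0, 42, true))  // 0: full replay
}
```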

The changes also introduce a semaphore to limit concurrent dispatches in the krane watcher, preventing the Kubernetes API from being overwhelmed during full syncs.
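A minimal sketch of that bounded-dispatch semaphore, closely following the pkg/semaphore code quoted in the review comments (the runDemo harness is illustrative, not part of the PR):

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// Semaphore caps how many dispatch goroutines run concurrently.
type Semaphore struct {
	ch chan struct{}
}

func New(n int) *Semaphore {
	if n <= 0 {
		panic("semaphore: size must be positive")
	}
	return &Semaphore{ch: make(chan struct{}, n)}
}

// Do blocks until a slot is free, then runs fn in its own goroutine,
// releasing the slot when fn returns.
func (s *Semaphore) Do(fn func()) {
	s.ch <- struct{}{} // acquire (blocks once n goroutines are in flight)
	go func() {
		defer func() { <-s.ch }() // release
		fn()
	}()
}

// runDemo dispatches `tasks` handlers through a semaphore of the given
// size and returns how many completed.
func runDemo(size, tasks int) int64 {
	sem := New(size)
	var wg sync.WaitGroup
	var done atomic.Int64
	for i := 0; i < tasks; i++ {
		wg.Add(1)
		sem.Do(func() {
			defer wg.Done()
			done.Add(1)
		})
	}
	wg.Wait()
	return done.Load()
}

func main() {
	fmt.Println(runDemo(4, 20)) // 20
}
```

At most 4 handlers run at once, so a burst of full-sync events cannot flood the Kubernetes API with concurrent requests.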

Key improvements:

  • Faster startup times for krane agents (no more full replay on first connect)
  • Independent full sync operations that don't block real-time event delivery
  • Better resource management with bounded concurrency
  • More reliable state reconciliation through dedicated sync endpoint

Type of change

  • Enhancement (small improvements)
  • New feature (non-breaking change which adds functionality)

How should this be tested?

  • Deploy krane agents and verify they connect successfully to the new endpoints
  • Test that incremental changes are delivered in real-time via WatchDeploymentChanges
  • Verify that SyncDesiredState returns complete state and closes the stream
  • Monitor that concurrent dispatches are properly limited during full syncs
  • Confirm that agents can recover gracefully from connection failures

Checklist

Required

  • Filled out the "How to test" section in this PR
  • Read Contributing Guide
  • Self-reviewed my own code
  • Commented on my code in hard-to-understand areas
  • Ran pnpm build
  • Ran pnpm fmt
  • Ran make fmt on /go directory
  • Checked for warnings, there are none
  • Removed all console.logs
  • Merged the latest changes from main onto my branch with git pull origin main
  • My changes don't cause any responsiveness issues

Appreciated

  • Updated the Unkey Docs if changes were necessary

vercel bot commented Apr 7, 2026

The latest updates on your projects. Learn more about Vercel for GitHub.

Project: dashboard · Deployment: Ready · Actions: Preview, Comment · Updated (UTC): Apr 7, 2026 4:43pm


coderabbitai bot commented Apr 7, 2026

📝 Walkthrough

The changes introduce a new SyncDesiredState RPC to the controller service that streams a full snapshot of desired state (deployments, sentinels, cilium policies) and refactors the watcher to use separate periodic full-sync and incremental change-watch loops. Query filters now restrict deployment topologies to running status only. New metrics track sync operations and event processing across both paths.

Changes

Cohort / File(s) Summary
Database Query Layer
pkg/db/queries/deployment_topology_list_all_by_region.sql, pkg/db/deployment_topology_list_all_by_region.sql_generated.go, pkg/db/querier_generated.go
Updated ListAllDeploymentTopologiesByRegion query to filter for desired_status = 'running' in addition to region and pagination filters. Updated method documentation to reference SyncDesiredState reconciliation instead of full-sync bootstrapping.
gRPC Protocol Definitions
svc/ctrl/proto/ctrl/v1/cluster.proto
Added new SyncDesiredState RPC endpoint and SyncDesiredStateRequest message. Added replay field to WatchDeploymentChangesRequest for controlling replay behavior on initial connection. Updated service documentation to clarify outbox-driven incremental polling semantics.
Controller Service Implementation
svc/ctrl/services/cluster/rpc_sync_desired_state.go, svc/ctrl/services/cluster/BUILD.bazel
Implemented new SyncDesiredState RPC that streams full desired-state snapshot via three paginated helpers (syncDeployments, syncSentinels, syncCiliumPolicies). Validates required headers, measures full-sync duration, and records success/error metrics.
Controller Watch Stream Refactoring
svc/ctrl/services/cluster/rpc_watch_deployment_changes.go
Removed full-sync path from incremental watch stream. When version_last_seen == 0 and replay is false, stream now starts from current max version. Updated state materialization to use primary DB for point lookups. Changed error handling to distinguish db.IsNotFound errors from other failures. Updated documentation to reflect incremental outbox-driven behavior.
Watcher Architecture Refactor
svc/krane/internal/watcher/watcher.go, svc/krane/internal/watcher/BUILD.bazel
Refactored watcher to run separate runPeriodicFullSync (via SyncDesiredState RPC) and runStream (incremental changes) loops concurrently. Added semaphore to cap concurrent event dispatches. Increased full-sync interval from 5 to 10 minutes. Changed stream reconnect to resume from versionLastSeen without triggering full sync. Dispatch now occurs in goroutines to prevent blocking receive loop.
Controller Service Metrics
svc/ctrl/pkg/metrics/prometheus.go
Added SyncDesiredStateTotal (status-labeled counter), SyncDesiredStateEventsSentTotal (resource-type-labeled counter), and DeploymentChangesProcessedTotal (resource-type and status-labeled counter). Updated FullSyncDurationSeconds documentation to reference SyncDesiredState RPC.
Watcher Metrics
svc/krane/pkg/metrics/prometheus.go
Added StreamConnectionsTotal, StreamEventsReceivedTotal, DispatchTotal, FullSyncEventsReceivedTotal, and FullSyncDurationSeconds metrics. Updated WatcherFullSyncsTotal help text to reflect 10-minute full-sync interval triggered via SyncDesiredState.
Testing Support
svc/krane/internal/testutil/mock_cluster_client.go
Added SyncDesiredStateFunc hook and SyncDesiredState method implementation to MockClusterClient for testing the new RPC.
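One hedged sketch of the mock hook pattern, with types and signatures simplified for illustration (the real client returns a connect server stream), where an unstubbed call fails loudly instead of returning (nil, nil):

```go
package main

import (
	"errors"
	"fmt"
)

// SyncStream stands in for the connect server-stream type; illustrative only.
type SyncStream struct{}

// MockClusterClient is a simplified test double: callers stub behavior by
// assigning SyncDesiredStateFunc; unstubbed calls return an explicit error
// so tests fail with a clear message rather than a nil-pointer panic.
type MockClusterClient struct {
	SyncDesiredStateFunc func(region string) (*SyncStream, error)
}

func (m *MockClusterClient) SyncDesiredState(region string) (*SyncStream, error) {
	if m.SyncDesiredStateFunc == nil {
		return nil, errors.New("unstubbed MockClusterClient.SyncDesiredState called")
	}
	return m.SyncDesiredStateFunc(region)
}

func main() {
	m := &MockClusterClient{}
	_, err := m.SyncDesiredState("us-east-1")
	fmt.Println(err != nil) // true: unstubbed call fails loudly
}
```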

Sequence Diagram(s)

sequenceDiagram
    actor Client
    participant Watcher
    participant Controller
    participant DB

    rect rgba(100, 150, 200, 0.5)
        Note over Watcher,DB: New SyncDesiredState Full-Sync Flow (Periodic)
        Watcher->>Watcher: runPeriodicFullSync() every 10min
        Watcher->>Controller: SyncDesiredState(region)
        activate Controller
        Controller->>DB: FindRegionByNameAndPlatform()
        DB-->>Controller: region
        Controller->>DB: ListDeploymentTopologies(region, pageSize)
        DB-->>Controller: deployments batch
        Controller->>Controller: Convert to DeploymentChangeEvents
        Controller-->>Watcher: Stream events
        Controller->>DB: ListSentinels(region, pageSize)
        DB-->>Controller: sentinels batch
        Controller->>Controller: Convert to ChangeEvents
        Controller-->>Watcher: Stream events
        Controller->>DB: ListCiliumPolicies(region, pageSize)
        DB-->>Controller: policies batch
        Controller-->>Watcher: Stream events
        deactivate Controller
        Watcher->>Watcher: Dispatch events via semaphore
        Watcher->>Client: Handle reconciliation
    end

    rect rgba(150, 200, 100, 0.5)
        Note over Watcher,DB: Incremental Change-Watch Flow (Continuous)
        Watcher->>Controller: WatchDeploymentChanges(version=0, replay=false)
        activate Controller
        Controller->>DB: GetDeploymentChangesMaxVersion()
        DB-->>Controller: current_version
        Controller->>Controller: Start polling from current_version
        loop Every poll interval
            Controller->>DB: FetchDeploymentChangeEvents(version, pageSize)
            DB-->>Controller: change events
            Controller-->>Watcher: Stream events
        end
        deactivate Controller
        Watcher->>Watcher: Acquire semaphore
        Watcher->>Watcher: Dispatch in goroutine
        Watcher->>Client: Event received
    end

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

  • Docstring Coverage — ⚠️ Warning: docstring coverage is 50.00%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions missing them.

✅ Passed checks (2 passed)

  • Title check — ✅ Passed: the title accurately and concisely describes the main addition, a new SyncDesiredState RPC endpoint for full state reconciliation.
  • Description check — ✅ Passed: the description covers the purpose of the changes, the type of change, the testing approach, and all required checklist items marked as completed.



Warning

There were issues while running some tools. Please review the errors and either fix the tool's configuration or disable the tool if it's a critical failure.

🔧 golangci-lint (2.11.4)

Command failed



chronark (Collaborator, Author) commented Apr 7, 2026

This stack of pull requests is managed by Graphite. Learn more about stacking.

coderabbitai bot left a comment

Actionable comments posted: 6

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
svc/ctrl/services/cluster/rpc_watch_deployment_changes.go (1)

118-134: ⚠️ Potential issue | 🔴 Critical

NotFound is not safe to treat as a no-op.

If the backing topology/policy row was deleted before this poller resolved it, the cursor advances with no Delete* event. Krane then keeps the stale resource forever. Persist enough tombstone data in deployment_changes, or emit deletes before removing the source row.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@svc/ctrl/services/cluster/rpc_watch_deployment_changes.go` around lines 118 -
134, The current handler treats db.IsNotFound(err) as a benign skip and advances
the cursor, which loses delete semantics and leaves stale resources; change the
db.IsNotFound branch in rpc_watch_deployment_changes.go (the code that appends
to events and uses ctrlv1.DeploymentChangeEvent, change, events, change.Pk) to
emit an explicit delete/tombstone event instead of a no-op: populate a
DeploymentChangeEvent that denotes deletion (include ResourceType, ResourceID,
RegionID, Version/Pk and a tombstone/delete kind or set fields required by
downstream to remove the resource) and append that to events so the cursor still
advances but downstream will remove the stale resource; alternatively ensure the
producer persists tombstone data into deployment_changes so the existing code
path can produce delete events. Ensure the delete event schema matches what
downstream consumers expect.
🧹 Nitpick comments (3)
pkg/semaphore/semaphore_test.go (1)

43-49: Consider using t.Run() for panic test scenarios.

Per coding guidelines, tests should use t.Run() for scenarios. These two panic tests could be grouped:

♻️ Optional: organize with t.Run()
-func TestNew_PanicsOnZero(t *testing.T) {
-	require.Panics(t, func() { New(0) })
-}
-
-func TestNew_PanicsOnNegative(t *testing.T) {
-	require.Panics(t, func() { New(-1) })
+func TestNew_InvalidInput(t *testing.T) {
+	t.Run("panics on zero", func(t *testing.T) {
+		require.Panics(t, func() { New(0) })
+	})
+	t.Run("panics on negative", func(t *testing.T) {
+		require.Panics(t, func() { New(-1) })
+	})
 }

As per coding guidelines: **/*_test.go: Organize tests using t.Run() for scenarios.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/semaphore/semaphore_test.go` around lines 43 - 49, Combine the two panic
tests into a single TestNew function that uses t.Run subtests for the scenarios;
replace TestNew_PanicsOnZero and TestNew_PanicsOnNegative with a TestNew that
calls t.Run("zero", func(t *testing.T) { require.Panics(t, func() { New(0) }) })
and t.Run("negative", func(t *testing.T) { require.Panics(t, func() { New(-1) })
}), keeping the same assertions and referencing the New constructor for clarity.
pkg/semaphore/semaphore.go (1)

19-25: Consider panic recovery for observability.

If fn panics, the slot is correctly released via defer, but the panic goes unobserved (silently crashes the goroutine). Consider recovering and logging:

♻️ Optional: add panic recovery
 func (s *Semaphore) Do(fn func()) {
 	s.ch <- struct{}{}
 	go func() {
-		defer func() { <-s.ch }()
+		defer func() {
+			<-s.ch
+			if r := recover(); r != nil {
+				// Log or handle panic as appropriate
+			}
+		}()
 		fn()
 	}()
 }
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/semaphore/semaphore.go` around lines 19 - 25, The Do method on Semaphore
currently launches fn in a goroutine and releases the slot via defer but does
not observe panics; update Semaphore.Do to wrap the goroutine body with a defer
that recovers from panic, logs the recovered error (and stack if available) via
your logger, and then continues to ensure the slot is still released;
specifically modify the anonymous goroutine in Semaphore.Do to call defer
recover logic before calling fn() so any panic from fn is captured and logged
while still using the existing defer <-s.ch to free the slot.
pkg/db/queries/deployment_topology_list_all_by_region.sql (1)

17-17: Add composite index for efficient filtered pagination.

The new desired_status = 'running' filter changes query selectivity. Currently, only a single-column status_idx exists on desired_status. For efficient pagination with region-based filtering, consider adding a composite index on (region_id, desired_status, pk) to support the multi-column WHERE conditions and pk > ? range scan used for pagination.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@pkg/db/queries/deployment_topology_list_all_by_region.sql` at line 17, The
query in deployment_topology_list_all_by_region.sql now filters on r.id
(region_id), dt.desired_status = 'running' and dt.pk > after_pk, but only a
single-column status_idx exists; add a composite index to support the
multi-column WHERE and range scan by creating an index on (region_id,
desired_status, pk) (or the equivalent columns on the deployment_topology table)
so the planner can use the index for region+status filtering and pk > ?
pagination; update migration/DDL to add this composite index and remove or keep
status_idx as appropriate.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@svc/ctrl/proto/ctrl/v1/cluster.proto`:
- Around line 40-44: SyncDesiredState currently reuses the existing
DeploymentChangeEvent response (which WatchDeploymentChanges also uses) and
breaks Buf's RPC_REQUEST_RESPONSE_UNIQUE rule and naming conventions; fix by
introducing a new message named SyncDesiredStateResponse that contains/wraps the
existing DeploymentChangeEvent payload (e.g., a field holding the event message)
and update the rpc signature for SyncDesiredState to return stream
SyncDesiredStateResponse instead of stream DeploymentChangeEvent; ensure no
other RPCs are changed and keep the original DeploymentChangeEvent type for
WatchDeploymentChanges.

In `@svc/ctrl/services/cluster/rpc_sync_desired_state.go`:
- Around line 80-101: The loop currently advances the page cursor (afterPk) only
after a successful conversion/send, which can cause an infinite loop when rows
fail conversion or send; move the assignment of afterPk =
row.DeploymentTopology.Pk to occur immediately at the top of each iteration
(before calling deploymentRowToState and before stream.Send) in the loop that
iterates rows (and likewise in the other similar loop around lines 125-136), so
the cursor always advances even for skipped/errored rows; keep the existing
error handling and continue/return logic unchanged.
- Around line 64-76: The periodic full sync (syncDeployments and likewise
syncCiliumPolicies) only iterates "running" rows so it never emits tombstones
for missed stop/delete outbox entries; update syncDeployments (and the logic
used by syncCiliumPolicies) to include delete states or otherwise detect
absences: either change the DB call (db.ListAllDeploymentTopologiesByRegion /
similar list for policies) to return rows with deleted/stopped state as well and
emit explicit delete/tombstone DeploymentChangeEvent messages for those rows, or
implement a diff/prune pass that lists current cluster objects versus the DB
results and emits delete events for items missing from the DB. Ensure emitted
events use the same event shape the consumers expect so stale
Deployments/Policies are removed.

In `@svc/krane/internal/testutil/mock_cluster_client.go`:
- Around line 84-88: The mock method MockClusterClient.SyncDesiredState
currently returns (nil, nil) when SyncDesiredStateFunc is not set which leads to
panics; change it to return a clear error instead (e.g., return nil,
fmt.Errorf("unstubbed MockClusterClient.SyncDesiredState called for %T", req) or
errors.New("unstubbed SyncDesiredState")) so tests fail predictably; update the
implementation in MockClusterClient.SyncDesiredState to check
SyncDesiredStateFunc and return the explicit error when nil, referencing
SyncDesiredStateFunc, SyncDesiredStateRequest, and
connect.ServerStreamForClient[ctrlv1.DeploymentChangeEvent] in your change.

In `@svc/krane/internal/watcher/watcher.go`:
- Around line 105-114: The code advances versionLastSeen and updates metrics
regardless of dispatch success; change it so the cursor only advances after a
successful s.dispatch. Specifically, inside the s.sem.Do closure (the call site
using s.sem.Do and s.dispatch), check the dispatch error and only set
versionLastSeen and call metrics.WatcherVersionLastSeen when err == nil; if
dispatch returns an error, keep versionLastSeen unchanged and ensure the error
path (the logger.Error call) triggers whatever reconnect/requeue logic the
watcher uses (or returns the error to the caller) so the event will be retried
rather than acknowledged.
- Around line 68-69: Currently runPeriodicFullSync is started immediately and
runStream after jitter, which can miss changes; instead sample a high‑water mark
(current max revision/version) before doing anything, pass that snapshot into
the full sync and then start the watch from that same high‑water mark so no
changes between sampling and watch startup are lost. Concretely: add a pre-start
call that reads the current version (use your store/manager method that returns
the latest revision), then have runPeriodicFullSync perform the full sync up to
that sampled version and have runStream start watching from that sampled
version; update s.runPeriodicFullSync and s.runStream call sites to accept the
sampled high‑water mark and use it to bound the sync and the watch start point.


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 81ef17e3-e3fa-4976-93db-3411ef714680

📥 Commits

Reviewing files that changed from the base of the PR and between 8a88f70 and 6bd5d81.

⛔ Files ignored due to path filters (4)
  • gen/proto/ctrl/v1/cluster.pb.go is excluded by !**/*.pb.go, !**/gen/**
  • gen/proto/ctrl/v1/ctrlv1connect/cluster.connect.go is excluded by !**/gen/**
  • gen/rpc/ctrl/cluster_generated.go is excluded by !**/gen/**
  • web/apps/dashboard/gen/proto/ctrl/v1/cluster_pb.ts is excluded by !**/gen/**
📒 Files selected for processing (14)
  • pkg/db/deployment_topology_list_all_by_region.sql_generated.go
  • pkg/db/querier_generated.go
  • pkg/db/queries/deployment_topology_list_all_by_region.sql
  • pkg/semaphore/BUILD.bazel
  • pkg/semaphore/doc.go
  • pkg/semaphore/semaphore.go
  • pkg/semaphore/semaphore_test.go
  • svc/ctrl/proto/ctrl/v1/cluster.proto
  • svc/ctrl/services/cluster/BUILD.bazel
  • svc/ctrl/services/cluster/rpc_sync_desired_state.go
  • svc/ctrl/services/cluster/rpc_watch_deployment_changes.go
  • svc/krane/internal/testutil/mock_cluster_client.go
  • svc/krane/internal/watcher/BUILD.bazel
  • svc/krane/internal/watcher/watcher.go


Quoted from the new replay field on WatchDeploymentChangesRequest in cluster.proto:

// When true and version_last_seen is 0, replay all changes from the
// beginning instead of jumping to the current max version. Default false.
bool replay = 3;
Member

is this even being used?

Collaborator Author

no, I wasn't quite sure whether to keep it or not tbh

Comment on lines 38 to +44
rpc WatchDeploymentChanges(WatchDeploymentChangesRequest) returns (stream DeploymentChangeEvent);

// SyncDesiredState streams the full desired state for a region: all running
// deployments, active sentinels, and cilium policies. The server closes the
// stream after all state has been sent. Krane calls this on startup and
// periodically as a safety net to reconcile any drift.
rpc SyncDesiredState(SyncDesiredStateRequest) returns (stream DeploymentChangeEvent);
Member

nit: in WatchDeploymentChangesRequest we specify the region in the request message, while in SyncDesiredStateRequest we have no params and take the region from the headers.

We should just do it the same way in both requests.

Collaborator Author

yeah this is fucked, I wanted to overhaul that entirely
@coderabbitai create a ticket for me

Contributor

🧩 Analysis chain

🏁 Script executed:

#!/bin/bash
gh issue create \
  --repo unkeyed/unkey \
  --title "proto: unify region passing in ClusterService RPCs (WatchDeploymentChanges vs SyncDesiredState)" \
  --body "## Problem

There is an inconsistency in how region is passed across \`ClusterService\` RPCs in \`svc/ctrl/proto/ctrl/v1/cluster.proto\`:

- \`WatchDeploymentChangesRequest\` includes a \`region\` field in the protobuf message itself.
- \`SyncDesiredStateRequest\` has no parameters and instead reads the region from HTTP headers (\`X-Krane-Region\` / \`X-Krane-Platform\`).

This inconsistency makes the API harder to reason about and maintain — callers have to know which transport mechanism to use for each RPC.

## Desired Outcome

Decide on a single, consistent approach for passing region across all \`ClusterService\` RPCs and update both the proto definitions and server-side implementations accordingly. Options include:

1. Add a \`region\` field to \`SyncDesiredStateRequest\` (match \`WatchDeploymentChanges\`).
2. Remove the \`region\` field from \`WatchDeploymentChangesRequest\` and rely on headers for both (match \`SyncDesiredState\`).

## Context

- Flagged during code review of PR `#5624` (feat: add SyncDesiredState RPC for full state reconciliation).
- Review comment: https://github.com/unkeyed/unkey/pull/5624#discussion_r3046408363

/cc `@chronark`" \
  --assignee chronark



@chronark, done! I've opened #5628 to track the overhaul. It captures the inconsistency between WatchDeploymentChangesRequest (region in proto field) vs SyncDesiredStateRequest (region from headers), and outlines the two options for unification.

coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (1)
svc/ctrl/services/cluster/rpc_sync_desired_state.go (1)

40-43: Consider CodeNotFound for missing region.

If FindRegionByNameAndPlatform returns sql.ErrNoRows, returning CodeInternal obscures the root cause. Distinguishing CodeNotFound helps clients understand whether to retry vs. fix config.

Suggested fix
 	if err != nil {
+		if errors.Is(err, sql.ErrNoRows) {
+			return connect.NewError(connect.CodeNotFound, fmt.Errorf("region %s/%s not found", platform, regionName))
+		}
 		logger.Error("failed to find region for SyncDesiredState", "error", err, "region_name", regionName, "platform", platform)
 		return connect.NewError(connect.CodeInternal, err)
 	}
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@svc/ctrl/services/cluster/rpc_sync_desired_state.go` around lines 40 - 43,
When FindRegionByNameAndPlatform returns sql.ErrNoRows in the SyncDesiredState
flow, don't map it to connect.CodeInternal; detect the not-found case using
errors.Is(err, sql.ErrNoRows) (or compare to sql.ErrNoRows) and return
connect.NewError(connect.CodeNotFound, err) while logging a clear "region not
found" message (keep the existing logger.Error call but adjust text/level as
appropriate); for all other errors continue returning connect.CodeInternal.
Ensure the code references the FindRegionByNameAndPlatform call and the error
handling block around it (and add an import for database/sql or use errors.Is as
needed).
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@svc/ctrl/services/cluster/rpc_sync_desired_state.go`:
- Around line 159-168: The loop updates afterPk after calling stream.Send which
is inconsistent with syncDeployments and syncSentinels; move the assignment so
afterPk = policy.Pk happens before calling stream.Send (inside the same for _,
policy := range rows loop) so the cursor advances prior to sending each
ctrlv1.DeploymentChangeEvent (the code using afterPk, ciliumPolicyToState and
stream.Send should be unchanged otherwise).


ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: d61b73b1-cb93-4751-995e-cdf0fa6631c3

📥 Commits

Reviewing files that changed from the base of the PR and between 6bd5d81 and ea6f501.

📒 Files selected for processing (1)
  • svc/ctrl/services/cluster/rpc_sync_desired_state.go

coderabbitai bot left a comment

🧹 Nitpick comments (3)
svc/krane/pkg/metrics/prometheus.go (1)

114-122: Consider wider tail buckets.
If large-region syncs exceed 60s, add 120/300s buckets for better SLO signal.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@svc/krane/pkg/metrics/prometheus.go` around lines 114 - 122, The
FullSyncDurationSeconds histogram created via promauto.NewHistogram
(prometheus.HistogramOpts) currently caps buckets at 60s; extend the Buckets
slice for better tail coverage by adding larger bucket values (e.g., 120 and
300) so FullSyncDurationSeconds can capture long-running full syncs and improve
SLO signal.
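For illustration, here is how observations would land in such a bucket layout. The lower bound values are assumptions; only the 60s cap and the proposed 120/300s additions come from the comment.

```go
package main

import (
	"fmt"
	"sort"
)

// buckets mirrors a FullSyncDurationSeconds layout with the proposed 120s and
// 300s tail buckets appended; the smaller bounds are illustrative.
var buckets = []float64{1, 5, 10, 30, 60, 120, 300}

// bucketFor returns the upper bound of the first bucket an observation fits
// in (Prometheus buckets are cumulative "le" bounds), or -1 when it exceeds
// every explicit bound and lands only in the implicit +Inf bucket.
func bucketFor(v float64) float64 {
	i := sort.SearchFloat64s(buckets, v)
	if i == len(buckets) {
		return -1 // only the implicit +Inf bucket catches it
	}
	return buckets[i]
}

func main() {
	fmt.Println(bucketFor(45))  // 60
	fmt.Println(bucketFor(180)) // 300: visible with the wider tail
	fmt.Println(bucketFor(400)) // -1: still beyond all explicit buckets
}
```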
svc/ctrl/pkg/metrics/prometheus.go (1)

9-19: Consider renaming the metric to match the new RPC.

The metric name deployment_changes_full_sync_duration_seconds references the old location but is now used exclusively by SyncDesiredState. Consider renaming it to sync_desired_state_duration_seconds for clarity, though this would be a breaking change for existing dashboards and alerts.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@svc/ctrl/pkg/metrics/prometheus.go` around lines 9 - 19, The metric name
deployment_changes_full_sync_duration_seconds used by FullSyncDurationSeconds is
outdated and should be renamed to reflect the new RPC; update the HistogramOpts
Name to sync_desired_state_duration_seconds (and adjust Help text if desired) in
the FullSyncDurationSeconds declaration to match SyncDesiredState usage, and
note this is a breaking rename so update any dashboards/alerts that reference
the old metric name.
svc/ctrl/services/cluster/rpc_watch_deployment_changes.go (1)

137-146: RW connection for reads increases primary load.

Using s.db.RW() for point lookups is justified by replica lag concerns (per comment), but shifts read load to the primary. Monitor primary connection pool utilization; consider read-your-writes consistency patterns if this becomes a bottleneck.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@svc/ctrl/services/cluster/rpc_watch_deployment_changes.go` around lines 137 -
146, The current loadChangeEvent uses s.db.RW() for point reads which shifts
read load to primary; change the FindDeploymentTopologyByDeploymentAndRegion
call (and similar point-lookup calls in loadChangeEvent) to use the read-replica
connection s.db.RO() and implement a safe fallback: if the RO query returns no
row (or a not-found error) retry once against s.db.RW() to handle replica lag.
Keep the query function names
(db.Query.FindDeploymentTopologyByDeploymentAndRegion and other db.Query.* calls
in loadChangeEvent) and ensure the fallback only triggers on
not-found/replica-miss cases to avoid unnecessary primary load.

ℹ️ Review info
⚙️ Run configuration

Configuration used: Repository UI

Review profile: CHILL

Plan: Pro

Run ID: 61c411a3-86a5-4ec3-924e-0d06fc012675

📥 Commits

Reviewing files that changed from the base of the PR and between ea6f501 and eacd556.

📒 Files selected for processing (6)
  • svc/ctrl/pkg/metrics/prometheus.go
  • svc/ctrl/services/cluster/rpc_sync_desired_state.go
  • svc/ctrl/services/cluster/rpc_watch_deployment_changes.go
  • svc/krane/internal/watcher/BUILD.bazel
  • svc/krane/internal/watcher/watcher.go
  • svc/krane/pkg/metrics/prometheus.go
✅ Files skipped from review due to trivial changes (1)
  • svc/krane/internal/watcher/BUILD.bazel
🚧 Files skipped from review as they are similar to previous changes (1)
  • svc/krane/internal/watcher/watcher.go

@chronark merged commit 0587420 into main on Apr 7, 2026
14 checks passed
@chronark deleted the 04-07-feat_speed_up_deployments branch April 7, 2026 19:20