[management] aggregate grpc metrics by accountID#5486
Conversation
📝 Walkthrough

Adds AccountDurationAggregator (new telemetry component) and integrates it into GRPCMetrics to record per-account sync/login durations, compute per-account P95s from histograms, and periodically flush those P95s into per-account P95 histograms.
Sequence Diagram

```mermaid
sequenceDiagram
    participant Client as gRPC Client
    participant Metrics as GRPCMetrics
    participant Agg as AccountDurationAggregator
    participant Reader as ManualReader
    participant P95H as P95 Histogram
    Client->>Metrics: Request (sync/login)
    Metrics->>Agg: Record(accountID, duration)
    Agg->>Agg: create/update per-account histogram & record value
    loop Every FlushInterval
        Agg->>Reader: Collect()
        Reader-->>Agg: histogram data points
        Agg->>Agg: extract account_id, compute P95, prune stale
        Agg->>P95H: Record computed P95 per account
    end
    Client->>Metrics: Shutdown
    Metrics->>Agg: Shutdown()
```
Estimated code review effort: 🎯 3 (Moderate) | ⏱️ ~20 minutes
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@management/server/telemetry/account_aggregator.go`:
- Around line 133-139: The p95 target-rank calculation using
uint64(float64(dp.Count) * 0.95) can truncate to 0 for small dp.Count (e.g., 1);
change calculation of targetCount in the logic around dp.Count/targetCount so
you compute targetRank = uint64(math.Ceil(0.95 * float64(dp.Count))) and then
enforce at least 1 (if targetRank == 0 { targetRank = 1 }) before using it in
the cumulative loop over dp.BucketCounts and dp.Bounds; also add import "math".
- Around line 108-113: The current logic only applies MaxAge eviction when
accHist exists but still proceeds to append P95 for non-tracked accounts,
allowing evicted accounts to be re-emitted; update the control flow around
a.accounts lookup (the accHist/exists check) so that if an accountID is not
present in a.accounts you skip any P95 emission (continue), and if it is present
you still apply the now.Sub(accHist.lastUpdate) > a.MaxAge staleness check and
delete+continue on staleness; locate references to a.accounts, accHist,
accountID and the P95 emission path in account_aggregator.go and ensure P95 is
only appended for tracked, non-stale accHist entries.
ℹ️ Review info
Configuration used: defaults
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- management/server/telemetry/account_aggregator.go
- management/server/telemetry/grpc_metrics.go
♻️ Duplicate comments (2)
management/server/telemetry/account_aggregator.go (2)
150-156: ⚠️ Potential issue | 🟠 Major — Use ceil-based rank for P95 target count.
Line 150 truncates the rank. For small samples (dp.Count == 1), this yields 0, which can select the wrong bucket.
🔧 Proposed fix

```diff
 import (
 	"context"
+	"math"
 	"sync"
 	"time"
@@
-	targetCount := uint64(float64(dp.Count) * 0.95)
+	targetCount := uint64(math.Ceil(float64(dp.Count) * 0.95))
+	if targetCount == 0 {
+		targetCount = 1
+	}
 	var cumulativeCount uint64
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 150 - 156, The P95 rank calculation currently truncates the target rank by using uint64(float64(dp.Count) * 0.95), which yields 0 for small samples; change the computation of targetCount to use a ceiling-based rank (e.g., uint64(math.Ceil(float64(dp.Count) * 0.95))) so the rank for dp.Count==1 becomes 1, then keep the existing cumulative loop over dp.BucketCounts and dp.Bounds to find the bucket; update imports to include math if necessary and ensure targetCount is at least 1 when dp.Count > 0.
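The effect of the ceil-based rank can be demonstrated standalone. The function below is a hypothetical, simplified version of the bucket walk (`counts` has one overflow bucket beyond `bounds`, matching OTel explicit-bucket histogram layout); with truncation, a single-sample histogram would get rank 0, while the ceil-based rank correctly selects the first bucket:

```go
package main

import (
	"fmt"
	"math"
)

// p95FromBuckets is a hypothetical standalone version of the bucket walk.
// The ceil-based rank fixes the truncation bug: for a single sample, the
// target rank becomes 1 instead of 0.
func p95FromBuckets(counts []uint64, bounds []float64) float64 {
	var total uint64
	for _, c := range counts {
		total += c
	}
	if total == 0 {
		return 0
	}
	targetRank := uint64(math.Ceil(0.95 * float64(total)))
	if targetRank == 0 {
		targetRank = 1
	}
	var cumulative uint64
	for i, c := range counts {
		cumulative += c
		if cumulative >= targetRank {
			if i < len(bounds) {
				return bounds[i] // upper bound of the selected bucket
			}
			return bounds[len(bounds)-1] // overflow bucket: clamp to last bound
		}
	}
	return 0
}

func main() {
	// With truncation, one sample gives rank 0 and the wrong bucket; with
	// ceil it gives rank 1 and the first bucket's bound.
	fmt.Println(p95FromBuckets([]uint64{1, 0, 0, 0}, []float64{5, 10, 25}))   // 5
	fmt.Println(p95FromBuckets([]uint64{10, 80, 5, 5}, []float64{5, 10, 25})) // 25
}
```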
115-121: ⚠️ Potential issue | 🟠 Major — Skip P95 emission for non-tracked accounts.
Line 139 returns false when the account is missing, so the flow can still append a P95 at Line 120 for accounts already evicted from a.accounts.
🔧 Proposed fix

```diff
 func (a *AccountDurationAggregator) processDataPoint(dataPoint metricdata.HistogramDataPoint[int64], now time.Time, p95s *[]int64) {
 	accountID := extractAccountID(dataPoint)
 	if accountID == "" {
 		return
 	}
-	if a.isStaleAccount(accountID, now) {
+	accHist, exists := a.accounts[accountID]
+	if !exists {
+		return
+	}
+	if now.Sub(accHist.lastUpdate) > a.MaxAge {
 		delete(a.accounts, accountID)
 		return
 	}
 	if p95 := calculateP95FromHistogram(dataPoint); p95 > 0 {
 		*p95s = append(*p95s, p95)
 	}
 }

 // isStaleAccount checks if an account hasn't been updated recently
 func (a *AccountDurationAggregator) isStaleAccount(accountID string, now time.Time) bool {
 	accHist, exists := a.accounts[accountID]
 	if !exists {
-		return false
+		return true
 	}
 	return now.Sub(accHist.lastUpdate) > a.MaxAge
 }
```

Also applies to: 136-140
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 115 - 121, The code may append P95 for accounts that are no longer tracked because isStaleAccount can return false for a missing account; fix by explicitly checking that the account is present in a.accounts before calculating/appending p95. In the method containing the snippet, after the stale check and delete, add a presence guard like "if _, ok := a.accounts[accountID]; !ok { return }" (or call a dedicated isTracked method) and apply the same presence guard around the other similar blocks (uses of calculateP95FromHistogram, p95s append) so P95 is only emitted for currently tracked accounts.
♻️ Duplicate comments (1)
management/server/telemetry/account_aggregator.go (1)
137-141: ⚠️ Potential issue | 🟠 Major — Non-tracked accounts can still emit P95 values.
isStaleAccount returns false when an account is missing (Lines 139-140), so processDataPoint can still append a P95 for accounts already removed from a.accounts.
🔧 Suggested fix

```diff
 func (a *AccountDurationAggregator) isStaleAccount(accountID string, now time.Time) bool {
 	accHist, exists := a.accounts[accountID]
 	if !exists {
-		return false
+		return true
 	}
 	return now.Sub(accHist.lastUpdate) > a.MaxAge
 }
```

Also applies to: 116-123
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 137 - 141, The isStaleAccount method on AccountDurationAggregator currently returns false when an accountID is not found, allowing processDataPoint to treat non-tracked accounts as active; change isStaleAccount(accountID string, now time.Time) to return true when the account is missing (i.e., if !exists { return true }) so missing/removed accounts are considered stale and won’t have P95 values appended; update any equivalent missing-account checks (the duplicate block around lines 116-123) to the same behavior to prevent non-tracked accounts emitting metrics.
🧹 Nitpick comments (2)
management/server/telemetry/account_aggregator.go (2)
64-66: Telemetry failures are silently dropped.
On Lines 64-66 and 85-88, errors are swallowed (return / return nil) with no visibility. This makes production observability failures hard to detect.
Consider returning errors from Record/FlushAndGetP95s, or at minimum logging and incrementing an internal error metric.
Also applies to: 85-88
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 64 - 66, The Record and FlushAndGetP95s callers currently swallow errors (e.g., the early `if err != nil { return }` and `return nil` paths), so modify these functions to surface failures: change signatures to return error (e.g., func (a *AccountAggregator) Record(...) error and func (a *AccountAggregator) FlushAndGetP95s(...) (map[string]float64, error)) or, if changing signatures is too invasive, at minimum log the error via the existing logger and increment an internal telemetry error metric before returning; update the error-handling blocks in Record (the `if err != nil` path) and in FlushAndGetP95s (the `return nil` path) to either return the error to the caller or log + increment metric so telemetry failures are not silently dropped.
61-63: Make histogram name configurable instead of sync-specific.
Line 61 hardcodes "sync_duration_per_account" inside a generic AccountDurationAggregator. Since this aggregator is also reused for login durations, this name is misleading and harder to operate/debug.
♻️ Suggested refactor

```diff
 type AccountDurationAggregator struct {
 	mu            sync.RWMutex
 	accounts      map[string]*accountHistogram
 	meterProvider *sdkmetric.MeterProvider
 	manualReader  *sdkmetric.ManualReader
+	metricName    string
@@
-func NewAccountDurationAggregator(ctx context.Context, flushInterval, maxAge time.Duration) *AccountDurationAggregator {
+func NewAccountDurationAggregator(ctx context.Context, flushInterval, maxAge time.Duration, metricName string) *AccountDurationAggregator {
@@
 	return &AccountDurationAggregator{
 		accounts:      make(map[string]*accountHistogram),
 		meterProvider: meterProvider,
 		manualReader:  manualReader,
+		metricName:    metricName,
@@
-	histogram, err := meter.Int64Histogram(
-		"sync_duration_per_account",
+	histogram, err := meter.Int64Histogram(
+		a.metricName,
 		metric.WithUnit("milliseconds"),
 	)
```

Also applies to: 34-49
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 61 - 63, The histogram name is hardcoded ("sync_duration_per_account") inside AccountDurationAggregator; change the constructor/factory (e.g., NewAccountDurationAggregator or AccountDurationAggregator type) to accept the metric name as a parameter (or an options struct) and use that value when creating the metric instead of the hardcoded string, then update all call sites (including the login-duration usage) to pass an appropriate name (e.g., "login_duration_per_account" or "sync_duration_per_account") so the aggregator can be reused with a configurable metric name; ensure tests and initialization paths are updated accordingly.
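The refactor boils down to threading the metric name through the constructor. A stripped-down sketch with the OTel wiring elided (names follow the proposed diff, everything else is assumed):

```go
package main

import "fmt"

// AccountDurationAggregator here is reduced to the part the refactor
// touches: the metric name moves from a hardcoded string into the
// constructor, so one type serves both sync and login durations.
type AccountDurationAggregator struct {
	metricName string
}

func NewAccountDurationAggregator(metricName string) *AccountDurationAggregator {
	// In the real code this name would be passed to meter.Int64Histogram.
	return &AccountDurationAggregator{metricName: metricName}
}

func (a *AccountDurationAggregator) MetricName() string { return a.metricName }

func main() {
	syncAgg := NewAccountDurationAggregator("sync_duration_per_account")
	loginAgg := NewAccountDurationAggregator("login_duration_per_account")
	fmt.Println(syncAgg.MetricName())  // sync_duration_per_account
	fmt.Println(loginAgg.MetricName()) // login_duration_per_account
}
```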
Actionable comments posted: 1
♻️ Duplicate comments (1)
management/server/telemetry/account_aggregator.go (1)
140-144: ⚠️ Potential issue | 🟠 Major — Skip emission for non-tracked accounts before stale checks.
Line 142 currently treats missing accounts as "not stale," which allows processDataPoint to still append a P95 for entries no longer tracked in a.accounts.
🔧 Proposed fix

```diff
 func (a *AccountDurationAggregator) processDataPoint(dataPoint metricdata.HistogramDataPoint[int64], now time.Time, p95s *[]int64) {
 	accountID := extractAccountID(dataPoint)
 	if accountID == "" {
 		return
 	}
+	if _, exists := a.accounts[accountID]; !exists {
+		return
+	}
 	if a.isStaleAccount(accountID, now) {
 		delete(a.accounts, accountID)
 		return
 	}
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 140 - 144, The isStaleAccount method on AccountDurationAggregator currently returns false for missing accounts which allows processDataPoint to emit metrics for accounts no longer tracked; change isStaleAccount(accountID string, now time.Time) to treat a non-existent entry in a.accounts as stale (return true) so processDataPoint will skip appending P95 for non-tracked accounts; ensure the logic in AccountDurationAggregator.isStaleAccount and any callers like processDataPoint rely on this boolean to prevent emission.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.
Inline comments:
In `@management/server/telemetry/account_aggregator.go`:
- Around line 93-107: The staleness eviction currently only runs inside
processDataPoint while iterating incoming datapoints, so accounts that emit no
datapoints are never rechecked; add a separate pass after the datapoint loop
that iterates over a.accounts and calls the same staleness logic (or
a.processDataPoint-equivalent check) using now to remove stale entries, ensuring
accounts are evicted even if they had no datapoints this cycle (refer to
a.accounts and processDataPoint/p95s to locate the relevant state and logic).
♻️ Duplicate comments (1)
management/server/telemetry/account_aggregator.go (1)
115-123: ⚠️ Potential issue | 🟠 Major — Gate P95 emission by tracked + non-stale account state.
At Line 121, P95 is appended before validating whether the accountID is still tracked and fresh. Because cleanup runs later, stale/non-tracked accounts can still leak one emission in this cycle.
🔧 Proposed fix

```diff
 func (a *AccountDurationAggregator) processDataPoint(dataPoint metricdata.HistogramDataPoint[int64], now time.Time, p95s *[]int64) {
 	accountID := extractAccountID(dataPoint)
 	if accountID == "" {
 		return
 	}
+
+	accHist, exists := a.accounts[accountID]
+	if !exists {
+		return
+	}
+	if now.Sub(accHist.lastUpdate) > a.MaxAge {
+		delete(a.accounts, accountID)
+		return
+	}
 	if p95 := calculateP95FromHistogram(dataPoint); p95 > 0 {
 		*p95s = append(*p95s, p95)
 	}
 }
```

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed. In `@management/server/telemetry/account_aggregator.go` around lines 115 - 123, In AccountDurationAggregator.processDataPoint, don't append the computed p95 to p95s until you've verified the accountID is currently tracked and not stale; after calling calculateP95FromHistogram(dataPoint) and before "*p95s = append(...)", check the aggregator's tracked-account state (e.g., look up a.trackedAccounts[accountID] or call a.isAccountTracked/isAccountFresh) and only append when that check passes (or add a small helper like isTrackedAndFresh if none exists); this prevents emitting a P95 for accounts that will be cleaned up later in the cleanup pass.
ℹ️ Review info
⚙️ Run configuration
Configuration used: defaults
Review profile: CHILL
Plan: Pro
Run ID: 80d84bdd-4e32-4e3f-adac-8bb97aea3aa9
📒 Files selected for processing (1)
management/server/telemetry/account_aggregator.go
Describe your changes
Issue ticket number and link
Stack
Checklist
Documentation
Select exactly one:
Docs PR URL (required if "docs added" is checked)
Paste the PR link from https://github.com/netbirdio/docs here:
https://github.com/netbirdio/docs/pull/__
Summary by CodeRabbit