Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 1 addition & 6 deletions experimental/stats/metricregistry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ func (s) TestNumerousIntCounts(t *testing.T) {
}

type fakeMetricsRecorder struct {
UnimplementedMetricsRecorder
t *testing.T

intValues map[*MetricDescriptor]int64
Expand Down Expand Up @@ -315,9 +316,3 @@ func (r *fakeMetricsRecorder) RecordInt64AsyncGauge(handle *Int64AsyncGaugeHandl
// the current state of the world every cycle, they do not accumulate deltas.
r.intValues[handle.Descriptor()] = val
}

// RegisterAsyncReporter is noop implementation, this might be changed at a
// later stage.
func (r *fakeMetricsRecorder) RegisterAsyncReporter(AsyncMetricReporter, ...AsyncMetric) func() {
return func() {}
}
41 changes: 40 additions & 1 deletion experimental/stats/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,13 @@
// Package stats contains experimental metrics/stats API's.
package stats

import "google.golang.org/grpc/stats"
import (
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
)

// MetricsRecorder records on metrics derived from metric registry.
// Implementors must embed UnimplementedMetricsRecorder.
type MetricsRecorder interface {
// RecordInt64Count records the measurement alongside labels on the int
// count associated with the provided handle.
Expand All @@ -46,6 +50,11 @@ type MetricsRecorder interface {
// the metrics are no longer needed, which will remove the reporter. The
// returned method needs to be idempotent and concurrent safe.
RegisterAsyncReporter(reporter AsyncMetricReporter, descriptors ...AsyncMetric) func()

// EnforceMetricsRecorderEmbedding is included to force implementers to embed
// another implementation of this interface, allowing gRPC to add methods
// without breaking users.
internal.EnforceMetricsRecorderEmbedding
}

// AsyncMetricReporter is an interface for types that record metrics asynchronously
Expand Down Expand Up @@ -90,3 +99,33 @@ type Metric = string
func NewMetrics(metrics ...Metric) *Metrics {
return stats.NewMetricSet(metrics...)
}

// UnimplementedMetricsRecorder must be embedded to have forward compatible implementations.
type UnimplementedMetricsRecorder struct {
internal.EnforceMetricsRecorderEmbedding
}

// RecordInt64Count provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordInt64Count(*Int64CountHandle, int64, ...string) {}

// RecordFloat64Count provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordFloat64Count(*Float64CountHandle, float64, ...string) {}

// RecordInt64Histo provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordInt64Histo(*Int64HistoHandle, int64, ...string) {}

// RecordFloat64Histo provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordFloat64Histo(*Float64HistoHandle, float64, ...string) {}

// RecordInt64Gauge provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordInt64Gauge(*Int64GaugeHandle, int64, ...string) {}

// RecordInt64UpDownCount provides a no-op implementation.
func (UnimplementedMetricsRecorder) RecordInt64UpDownCount(*Int64UpDownCountHandle, int64, ...string) {
}

// RegisterAsyncReporter provides a no-op implementation.
func (UnimplementedMetricsRecorder) RegisterAsyncReporter(AsyncMetricReporter, ...AsyncMetric) func() {
// No-op: Return an empty function to ensure caller doesn't panic on nil function call
return func() {}
}
5 changes: 5 additions & 0 deletions internal/grpctest/grpctest.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func (Tester) Setup(t *testing.T) {
// fixed.
leakcheck.SetTrackingBufferPool(logger{t: t})
leakcheck.TrackTimers()
leakcheck.TrackAsyncReporters()
}

// Teardown performs a leak check.
Expand All @@ -75,6 +76,10 @@ func (Tester) Teardown(t *testing.T) {
if atomic.LoadUint32(&lcFailed) == 1 {
t.Log("Goroutine leak check disabled for future tests")
}
leakcheck.CheckAsyncReporters(logger{t: t})
if atomic.LoadUint32(&lcFailed) == 1 {
return
}
tLogr.endTest(t)
}

Expand Down
14 changes: 14 additions & 0 deletions internal/internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,14 @@ var (
// AddressToTelemetryLabels is an xDS-provided function to extract telemetry
// labels from a resolver.Address. Callers must assert its type before calling.
AddressToTelemetryLabels any // func(addr resolver.Address) map[string]string

// AsyncReporterCleanupDelegate is initialized to a pass-through function by
// default (production behavior), allowing tests to swap it with an
// implementation which tracks registration of async reporter and its
// corresponding cleanup.
AsyncReporterCleanupDelegate = func(cleanup func()) func() {
return cleanup
}
)

// HealthChecker defines the signature of the client-side LB channel health
Expand Down Expand Up @@ -295,3 +303,9 @@ type EnforceClientConnEmbedding interface {
type Timer interface {
Stop() bool
}

// EnforceMetricsRecorderEmbedding is used to enforce proper MetricsRecorder
// implementation embedding.
type EnforceMetricsRecorderEmbedding interface {
enforceMetricsRecorderEmbedding()
}
93 changes: 93 additions & 0 deletions internal/leakcheck/leakcheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -394,3 +394,96 @@ func traceToString(stack []uintptr) string {
}
return trace.String()
}

// Async Reporter Leak Checking

var asyncReporterTracker *reporterTracker

type reporterTracker struct {
mu sync.Mutex
allocations map[*int][]uintptr
}

func newReporterTracker() *reporterTracker {
return &reporterTracker{
allocations: make(map[*int][]uintptr),
}
}

// register records the stack trace.
func (rt *reporterTracker) register() *int {
rt.mu.Lock()
defer rt.mu.Unlock()

id := new(int)
// Skip 4 frames: register -> internal.Delegate -> stats.RegisterAsyncReporter -> Caller
rt.allocations[id] = currentStack(4)
return id
}

// unregister removes the ID.
func (rt *reporterTracker) unregister(id *int) {
rt.mu.Lock()
defer rt.mu.Unlock()
delete(rt.allocations, id)
}

// leakedStackTraces returns formatted stack traces for all currently registered
// reporters.
func (rt *reporterTracker) leakedStackTraces() []string {
rt.mu.Lock()
defer rt.mu.Unlock()

var traces []string
for _, pcs := range rt.allocations {
msg := "\n--- Leaked Async Reporter Registration ---\n" + traceToString(pcs)
traces = append(traces, msg)
}
return traces
}

// TrackAsyncReporters installs the tracking delegate.
func TrackAsyncReporters() {
asyncReporterTracker = newReporterTracker()

// Swap the delegate: Replace the default pass-through with tracking logic.
internal.AsyncReporterCleanupDelegate = func(originalCleanup func()) func() {
// 1. Capture Stack Trace (happens during Registration)
token := asyncReporterTracker.register()

// 2. Return Wrapped Cleanup
return func() {
// Defer unregister to ensure we stop tracking even if the original cleanup panics.
defer asyncReporterTracker.unregister(token)

if originalCleanup != nil {
originalCleanup()
}
}
}
}

// CheckAsyncReporters verifies that no leaks exist and restores the default delegate.
func CheckAsyncReporters(logger Logger) {
// Restore the delegate: Reset to the default pass-through behavior.
internal.AsyncReporterCleanupDelegate = func(cleanup func()) func() {
return cleanup
}

if asyncReporterTracker == nil {
return
}

leaks := asyncReporterTracker.leakedStackTraces()
if len(leaks) > 0 {
// Join all stack traces into one message
allTraces := ""
for _, trace := range leaks {
allTraces += trace
}
logger.Errorf("Found %d leaked async reporters:%s", len(leaks), allTraces)
}

// Clean up global state
asyncReporterTracker = nil
}
55 changes: 55 additions & 0 deletions internal/leakcheck/leakcheck_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,3 +132,58 @@ func TestTrackTimers(t *testing.T) {
defer cancel()
CheckTimers(ctx, t)
}

func TestLeakChecker_DetectsLeak(t *testing.T) {
// 1. Setup the tracker (swaps the delegate in internal).
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Please avoid putting numbers in the comments. The step numbers may go out of sync with the code when it's changed.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Will be doing this in follow up PR.

TrackAsyncReporters()

// Safety defer: ensure we restore the default delegate even if the test crashes
// before CheckAsyncReporters is called.
defer func() {
internal.AsyncReporterCleanupDelegate = func(f func()) func() { return f }
}()

// 2. Simulate a registration using the swapped delegate.
// We utilize the internal delegate directly to simulate stats.RegisterAsyncReporter behavior.
noOpCleanup := func() {}
wrappedCleanup := internal.AsyncReporterCleanupDelegate(noOpCleanup)

// 3. Create a leak: We discard 'wrappedCleanup' without calling it.
_ = wrappedCleanup

// 4. Check for leaks.
tl := &testLogger{}
CheckAsyncReporters(tl)

// 5. Assertions.
if tl.errorCount == 0 {
t.Error("Expected leak checker to report a leak, but it succeeded silently.")
}
if asyncReporterTracker != nil {
t.Error("Expected CheckAsyncReporters to cleanup global tracker, but it was not nil.")
}
}

func TestLeakChecker_PassesOnCleanup(t *testing.T) {
// 1. Setup.
TrackAsyncReporters()
defer func() {
internal.AsyncReporterCleanupDelegate = func(f func()) func() { return f }
}()

// 2. Simulate registration.
noOpCleanup := func() {}
wrappedCleanup := internal.AsyncReporterCleanupDelegate(noOpCleanup)

// 3. Behave correctly: Call the cleanup.
wrappedCleanup()

// 4. Check for leaks.
tl := &testLogger{}
CheckAsyncReporters(tl)

// 5. Assertions.
if tl.errorCount > 0 {
t.Errorf("Expected no leaks, but got errors: %v", tl.errors)
}
}
10 changes: 10 additions & 0 deletions internal/stats/metrics_recorder_list.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"

estats "google.golang.org/grpc/experimental/stats"
"google.golang.org/grpc/internal"
"google.golang.org/grpc/stats"
)

Expand All @@ -28,6 +29,7 @@ import (
// It eats any record calls where the label values provided do not match the
// number of label keys.
type MetricsRecorderList struct {
internal.EnforceMetricsRecorderEmbedding
// metricsRecorders are the metrics recorders this list will forward to.
metricsRecorders []estats.MetricsRecorder
}
Expand Down Expand Up @@ -138,6 +140,14 @@ func (l *MetricsRecorderList) RegisterAsyncReporter(reporter estats.AsyncMetricR
}
unregisterFns = append(unregisterFns, mr.RegisterAsyncReporter(estats.AsyncMetricReporterFunc(wrappedCallback), metrics...))
}

// Wrap the cleanup function using the internal delegate.
// In production, this returns realCleanup as-is.
// In tests, the leak checker can swap this to track the registration lifetime.
return internal.AsyncReporterCleanupDelegate(defaultCleanUp(unregisterFns))
}

func defaultCleanUp(unregisterFns []func()) func() {
return func() {
for _, unregister := range unregisterFns {
unregister()
Expand Down
2 changes: 1 addition & 1 deletion internal/stats/metrics_recorder_list_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -259,7 +259,7 @@ func (s) TestMetricRecorderListPanic(t *testing.T) {
// TestMetricsRecorderList_RegisterAsyncReporter verifies that the list implementation
// correctly fans out registration calls to all underlying recorders and
// aggregates the cleanup calls.
func TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) {
func (s) TestMetricsRecorderList_RegisterAsyncReporter(t *testing.T) {
spy1 := &spyMetricsRecorder{name: "spy1"}
spy2 := &spyMetricsRecorder{name: "spy2"}
spy3 := &spyMetricsRecorder{name: "spy3"}
Expand Down
33 changes: 3 additions & 30 deletions internal/testutils/stats/test_metrics_recorder.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
// have taken place. It also persists metrics data keyed on the metrics
// descriptor.
type TestMetricsRecorder struct {
estats.UnimplementedMetricsRecorder
intCountCh *testutils.Channel
floatCountCh *testutils.Channel
intHistoCh *testutils.Channel
Expand Down Expand Up @@ -276,12 +277,6 @@ func (r *TestMetricsRecorder) RecordInt64Gauge(handle *estats.Int64GaugeHandle,
r.data[handle.Name] = float64(incr)
}

// RegisterAsyncReporter is noop implementation, async gauge test recorders should
// provide their own implementation
func (r *TestMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
return func() {}
}

// To implement a stats.Handler, which allows it to be set as a dial option:

// TagRPC is TestMetricsRecorder's implementation of TagRPC.
Expand All @@ -302,28 +297,6 @@ func (r *TestMetricsRecorder) HandleConn(context.Context, stats.ConnStats) {}

// NoopMetricsRecorder is a noop MetricsRecorder to be used in tests to prevent
// nil panics.
type NoopMetricsRecorder struct{}

// RecordInt64Count is a noop implementation of RecordInt64Count.
func (r *NoopMetricsRecorder) RecordInt64Count(*estats.Int64CountHandle, int64, ...string) {}

// RecordFloat64Count is a noop implementation of RecordFloat64Count.
func (r *NoopMetricsRecorder) RecordFloat64Count(*estats.Float64CountHandle, float64, ...string) {}

// RecordInt64Histo is a noop implementation of RecordInt64Histo.
func (r *NoopMetricsRecorder) RecordInt64Histo(*estats.Int64HistoHandle, int64, ...string) {}

// RecordFloat64Histo is a noop implementation of RecordFloat64Histo.
func (r *NoopMetricsRecorder) RecordFloat64Histo(*estats.Float64HistoHandle, float64, ...string) {}

// RecordInt64Gauge is a noop implementation of RecordInt64Gauge.
func (r *NoopMetricsRecorder) RecordInt64Gauge(*estats.Int64GaugeHandle, int64, ...string) {}

// RecordInt64UpDownCount is a noop implementation of RecordInt64UpDownCount.
func (r *NoopMetricsRecorder) RecordInt64UpDownCount(*estats.Int64UpDownCountHandle, int64, ...string) {
}

// RegisterAsyncReporter is a noop implementation of RegisterAsyncReporter.
func (r *NoopMetricsRecorder) RegisterAsyncReporter(estats.AsyncMetricReporter, ...estats.AsyncMetric) func() {
return func() {}
type NoopMetricsRecorder struct {
estats.UnimplementedMetricsRecorder
}
Loading