From 75ffb67d3332eb70eb76b1734359382d63de26d0 Mon Sep 17 00:00:00 2001 From: Gabriel Date: Fri, 20 Feb 2026 11:46:01 -0300 Subject: [PATCH] fix: prevent goroutine leak in query_samples collectors Add WaitGroup tracking to both mysql and postgres QuerySamples collectors so Stop() waits for all goroutines to finish before returning. Also adds nil-safety to cancel() and defers ticker.Stop() to properly clean up resources. Co-authored-by: Cursor --- .../mysql/collector/query_samples.go | 19 ++++++++++++++----- .../postgres/collector/query_samples.go | 17 +++++++++++------ 2 files changed, 25 insertions(+), 11 deletions(-) diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index 25f83361bd6..81e54d6dbf9 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sync" "time" "github.com/blang/semver/v4" @@ -107,6 +108,7 @@ type QuerySamples struct { running *atomic.Bool ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup timerBookmark float64 lastUptime float64 @@ -151,16 +153,17 @@ func (c *QuerySamples) Start(ctx context.Context) error { // Start setup_consumers check goroutine if enabled if c.autoEnableSetupConsumers { + c.wg.Add(1) go c.runSetupConsumersCheck() } + c.wg.Add(1) go func() { - defer func() { - c.Stop() - c.running.Store(false) - }() + defer c.wg.Done() + defer c.running.Store(false) ticker := time.NewTicker(c.collectInterval) + defer ticker.Stop() for { if err := c.fetchQuerySamples(c.ctx); err != nil { @@ -185,11 +188,17 @@ func (c *QuerySamples) Stopped() bool { // Stop should be kept idempotent func (c *QuerySamples) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + c.wg.Wait() } func (c *QuerySamples) runSetupConsumersCheck() { + defer c.wg.Done() + ticker := time.NewTicker(c.setupConsumersCheckInterval) + defer ticker.Stop() for { if err := c.updateSetupConsumersSettings(c.ctx); err != nil { diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 59d0368be4b..ad57cc07d7a 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "slices" + "sync" "time" "github.com/go-kit/log" @@ -123,6 +124,7 @@ type QuerySamples struct { running *atomic.Bool ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup // in-memory state of running samples samples map[SampleKey]*SampleState @@ -249,13 +251,13 @@ func (c *QuerySamples) Start(ctx context.Context) error { c.ctx = ctx c.cancel = cancel + c.wg.Add(1) go func() { - defer func() { - c.Stop() - c.running.Store(false) - }() + defer c.wg.Done() + defer c.running.Store(false) ticker := time.NewTicker(c.collectInterval) + defer ticker.Stop() for { if err := c.fetchQuerySample(c.ctx); err != nil { @@ -280,7 +282,10 @@ func (c *QuerySamples) Stopped() bool { // Stop should be kept idempotent func (c *QuerySamples) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + c.wg.Wait() } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { @@ -396,7 +401,7 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) { return key, nil } -func (c QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { +func (c *QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { if c.disableQueryRedaction { if sample.Query.Valid && sample.Query.String == "" { return fmt.Errorf("insufficient privilege to access query sample set: %+v", sample)