diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index 4199ed2338c..b2d181d28c1 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -4,6 +4,7 @@ import ( "context" "database/sql" "fmt" + "sync" "time" "github.com/blang/semver/v4" @@ -112,6 +113,7 @@ type QuerySamples struct { running *atomic.Bool ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup timerBookmark float64 lastUptime float64 @@ -158,16 +160,17 @@ func (c *QuerySamples) Start(ctx context.Context) error { // Start setup_consumers check goroutine if enabled if c.autoEnableSetupConsumers { + c.wg.Add(1) go c.runSetupConsumersCheck() } + c.wg.Add(1) go func() { - defer func() { - c.Stop() - c.running.Store(false) - }() + defer c.wg.Done() + defer c.running.Store(false) ticker := time.NewTicker(c.collectInterval) + defer ticker.Stop() for { if err := c.fetchQuerySamples(c.ctx); err != nil { @@ -192,11 +195,17 @@ func (c *QuerySamples) Stopped() bool { // Stop should be kept idempotent func (c *QuerySamples) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + c.wg.Wait() } func (c *QuerySamples) runSetupConsumersCheck() { + defer c.wg.Done() + ticker := time.NewTicker(c.setupConsumersCheckInterval) + defer ticker.Stop() for { if err := c.updateSetupConsumersSettings(c.ctx); err != nil { diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 59d0368be4b..ad57cc07d7a 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -5,6 +5,7 @@ import ( "database/sql" "fmt" "slices" + "sync" "time" "github.com/go-kit/log" @@ -123,6 +124,7 @@ type QuerySamples struct { running *atomic.Bool ctx context.Context cancel context.CancelFunc + wg sync.WaitGroup // in-memory state of running samples samples map[SampleKey]*SampleState @@ -249,13 +251,13 @@ func (c *QuerySamples) Start(ctx context.Context) error { c.ctx = ctx c.cancel = cancel + c.wg.Add(1) go func() { - defer func() { - c.Stop() - c.running.Store(false) - }() + defer c.wg.Done() + defer c.running.Store(false) ticker := time.NewTicker(c.collectInterval) + defer ticker.Stop() for { if err := c.fetchQuerySample(c.ctx); err != nil { @@ -280,7 +282,10 @@ func (c *QuerySamples) Stopped() bool { // Stop should be kept idempotent func (c *QuerySamples) Stop() { - c.cancel() + if c.cancel != nil { + c.cancel() + } + c.wg.Wait() } func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { @@ -396,7 +401,7 @@ func (c *QuerySamples) processRow(sample QuerySamplesInfo) (SampleKey, error) { return key, nil } -func (c QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { +func (c *QuerySamples) validateQuerySample(sample QuerySamplesInfo) error { if c.disableQueryRedaction { if sample.Query.Valid && sample.Query.String == "" { return fmt.Errorf("insufficient privilege to access query sample set: %+v", sample)