diff --git a/internal/component/database_observability/connection_info_ping.go b/internal/component/database_observability/connection_info_ping.go new file mode 100644 index 00000000000..41ae08d1827 --- /dev/null +++ b/internal/component/database_observability/connection_info_ping.go @@ -0,0 +1,69 @@ +package database_observability + +import ( + "context" + "database/sql" +) + +// ConnectionInfoPingThreshold is the number of consecutive ping failures before +// the connection_info metric is unregistered, and the number of consecutive +// ping successes before it is re-registered. +const ConnectionInfoPingThreshold = 3 + +// ConnectionInfoToggler is implemented by the ConnectionInfo collector in each +// database engine package. It allows the component to toggle metric registration +// without importing a concrete collector type. +type ConnectionInfoToggler interface { + IsRegistered() bool + Unregister() + Reregister() +} + +// CIPingState tracks consecutive ping results for the connection_info metric +// toggle. It is intended to be goroutine-local (owned by Run()'s ticker loop) +// and requires no external locking. +type CIPingState struct { + failures int + successes int + lastCI ConnectionInfoToggler +} + +// PingConnectionInfo pings db and toggles the connection_info metric via +// toggler based on consecutive failure or success counts in state. It should +// be called once per ticker tick from the component's Run() loop. +// +// After ConnectionInfoPingThreshold consecutive failures, toggler.Unregister() +// is called. After ConnectionInfoPingThreshold consecutive successes (while +// unregistered), toggler.Reregister() is called. When toggler changes (i.e. +// the component reconnected and created a new collector), state resets. +func PingConnectionInfo(ctx context.Context, db *sql.DB, toggler ConnectionInfoToggler, state *CIPingState) { + if toggler != state.lastCI { + state.failures = 0 + state.successes = 0 + state.lastCI = toggler + } + + if db == nil || toggler == nil { + return + } + + if err := db.PingContext(ctx); err != nil { + state.successes = 0 + if toggler.IsRegistered() { + state.failures++ + if state.failures >= ConnectionInfoPingThreshold { + toggler.Unregister() + state.failures = 0 + } + } + } else { + state.failures = 0 + if !toggler.IsRegistered() { + state.successes++ + if state.successes >= ConnectionInfoPingThreshold { + toggler.Reregister() + state.successes = 0 + } + } + } +} diff --git a/internal/component/database_observability/connection_info_ping_test.go b/internal/component/database_observability/connection_info_ping_test.go new file mode 100644 index 00000000000..21e811cea69 --- /dev/null +++ b/internal/component/database_observability/connection_info_ping_test.go @@ -0,0 +1,112 @@ +package database_observability + +import ( + "context" + "errors" + "testing" + + sqlmock "github.com/DATA-DOG/go-sqlmock" + "github.com/stretchr/testify/require" +) + +type mockToggler struct { + registered bool +} + +func (m *mockToggler) IsRegistered() bool { return m.registered } +func (m *mockToggler) Unregister() { m.registered = false } +func (m *mockToggler) Reregister() { m.registered = true } + +func TestPingConnectionInfo_UnregistersAfterThresholdFailures(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + pingErr := errors.New("connection refused") + for i := 0; i < ConnectionInfoPingThreshold; i++ { + mock.ExpectPing().WillReturnError(pingErr) + } + + toggler := &mockToggler{registered: true} + state := &CIPingState{} + + for i := 0; i < ConnectionInfoPingThreshold; i++ { + PingConnectionInfo(context.Background(), db, toggler, state) + } + + require.False(t, toggler.IsRegistered(), "metric should be unregistered after %d consecutive failures", ConnectionInfoPingThreshold) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestPingConnectionInfo_ReregistersAfterThresholdSuccesses(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + pingErr := errors.New("connection refused") + for i := 0; i < ConnectionInfoPingThreshold; i++ { + mock.ExpectPing().WillReturnError(pingErr) + } + for i := 0; i < ConnectionInfoPingThreshold; i++ { + mock.ExpectPing() + } + + toggler := &mockToggler{registered: true} + state := &CIPingState{} + + for i := 0; i < ConnectionInfoPingThreshold*2; i++ { + PingConnectionInfo(context.Background(), db, toggler, state) + } + + require.True(t, toggler.IsRegistered(), "metric should be re-registered after %d consecutive successes", ConnectionInfoPingThreshold) + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestPingConnectionInfo_RemainsRegisteredWhilePingsSucceed(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + const pings = 5 + for i := 0; i < pings; i++ { + mock.ExpectPing() + } + + toggler := &mockToggler{registered: true} + state := &CIPingState{} + + for i := 0; i < pings; i++ { + PingConnectionInfo(context.Background(), db, toggler, state) + } + + require.True(t, toggler.IsRegistered(), "metric should remain registered while pings succeed") + require.NoError(t, mock.ExpectationsWereMet()) +} + +func TestPingConnectionInfo_ResetsStateWhenTogglerChanges(t *testing.T) { + db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true)) + require.NoError(t, err) + defer db.Close() + + pingErr := errors.New("connection refused") + for i := 0; i < ConnectionInfoPingThreshold-1; i++ { + mock.ExpectPing().WillReturnError(pingErr) + } + mock.ExpectPing() // first ping with new toggler + + toggler1 := &mockToggler{registered: true} + state := &CIPingState{} + + for i := 0; i < ConnectionInfoPingThreshold-1; i++ { + PingConnectionInfo(context.Background(), db, toggler1, state) + } + require.True(t, toggler1.IsRegistered(), "should not have unregistered yet") + require.Equal(t, ConnectionInfoPingThreshold-1, state.failures, "failures should have accumulated") + + toggler2 := &mockToggler{registered: true} + PingConnectionInfo(context.Background(), db, toggler2, state) + + require.Equal(t, 0, state.failures, "failures should reset when toggler changes") + require.True(t, toggler1.IsRegistered(), "old toggler should be unaffected") + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/component/database_observability/mysql/collector/connection_info.go b/internal/component/database_observability/mysql/collector/connection_info.go index d28e2b39050..c05e5233468 100644 --- a/internal/component/database_observability/mysql/collector/connection_info.go +++ b/internal/component/database_observability/mysql/collector/connection_info.go @@ -4,6 +4,7 @@ import ( "context" "net" "strings" + "sync" "github.com/go-sql-driver/mysql" "github.com/grafana/alloy/internal/component/database_observability" @@ -27,7 +28,10 @@ type ConnectionInfo struct { InfoMetric *prometheus.GaugeVec CloudProvider *database_observability.CloudProvider - running *atomic.Bool + mu sync.Mutex + metricRegistered bool + labelValues []string + running *atomic.Bool } func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { @@ -40,12 +44,13 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { args.Registry.MustRegister(infoMetric) return &ConnectionInfo{ - DSN: args.DSN, - Registry: args.Registry, - EngineVersion: args.EngineVersion, - InfoMetric: infoMetric, - CloudProvider: args.CloudProvider, - running: &atomic.Bool{}, + DSN: args.DSN, + Registry: args.Registry, + EngineVersion: args.EngineVersion, + InfoMetric: infoMetric, + CloudProvider: args.CloudProvider, + metricRegistered: true, + running: &atomic.Bool{}, }, nil } @@ -103,17 +108,59 @@ func (c *ConnectionInfo) Start(ctx context.Context) error { } } } + + c.labelValues = []string{providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, c.EngineVersion} + c.InfoMetric.WithLabelValues(c.labelValues...).Set(1) c.running.Store(true) - c.InfoMetric.WithLabelValues(providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, c.EngineVersion).Set(1) return nil } +// IsRegistered reports whether the connection_info metric is currently registered +// in the Prometheus registry. +func (c *ConnectionInfo) IsRegistered() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.metricRegistered +} + +// Unregister removes the connection_info metric from the Prometheus registry. +// Called by the component when consecutive DB ping failures indicate the +// instance is unreachable. +func (c *ConnectionInfo) Unregister() { + c.mu.Lock() + defer c.mu.Unlock() + if c.metricRegistered { + c.Registry.Unregister(c.InfoMetric) + c.metricRegistered = false + } +} + +// Reregister adds the connection_info metric back to the Prometheus registry +// and restores its value with the label values captured during Start. +// Called by the component when the DB becomes reachable again. +func (c *ConnectionInfo) Reregister() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.metricRegistered { + c.Registry.MustRegister(c.InfoMetric) + if len(c.labelValues) > 0 { + c.InfoMetric.WithLabelValues(c.labelValues...).Set(1) + } + c.metricRegistered = true + } +} + func (c *ConnectionInfo) Stopped() bool { return !c.running.Load() } func (c *ConnectionInfo) Stop() { - c.Registry.Unregister(c.InfoMetric) + c.mu.Lock() + if c.metricRegistered { + c.Registry.Unregister(c.InfoMetric) + c.metricRegistered = false + } + c.mu.Unlock() c.running.Store(false) } diff --git a/internal/component/database_observability/mysql/collector/connection_info_test.go b/internal/component/database_observability/mysql/collector/connection_info_test.go index a3b91ec9958..eaf97ea4351 100644 --- a/internal/component/database_observability/mysql/collector/connection_info_test.go +++ b/internal/component/database_observability/mysql/collector/connection_info_test.go @@ -14,6 +14,57 @@ import ( "github.com/grafana/alloy/internal/component/database_observability" ) +func TestConnectionInfo_Unregister(t *testing.T) { + defer goleak.VerifyNone(t) + + reg := prometheus.NewRegistry() + c, err := NewConnectionInfo(ConnectionInfoArguments{ + DSN: "user:pass@tcp(localhost:3306)/schema", + Registry: reg, + EngineVersion: "8.0.32", + }) + require.NoError(t, err) + require.NoError(t, c.Start(t.Context())) + + mfs, err := reg.Gather() + require.NoError(t, err) + require.Len(t, mfs, 1, "metric should be present before Unregister") + + c.Unregister() + + mfs, err = reg.Gather() + require.NoError(t, err) + require.Empty(t, mfs, "metric should be absent after Unregister") + require.False(t, c.IsRegistered()) +} + +func TestConnectionInfo_Reregister(t *testing.T) { + defer goleak.VerifyNone(t) + + reg := prometheus.NewRegistry() + c, err := NewConnectionInfo(ConnectionInfoArguments{ + DSN: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/schema", + Registry: reg, + EngineVersion: "8.0.32", + }) + require.NoError(t, err) + require.NoError(t, c.Start(t.Context())) + + c.Unregister() + require.False(t, c.IsRegistered()) + + c.Reregister() + require.True(t, c.IsRegistered()) + + const expected = ` + # HELP database_observability_connection_info Information about the connection + # TYPE database_observability_connection_info gauge + database_observability_connection_info{db_instance_identifier="products-db",engine="mysql",engine_version="8.0.32",provider_account="unknown",provider_name="aws",provider_region="us-east-1"} 1 +` + err = testutil.GatherAndCompare(reg, strings.NewReader(expected)) + require.NoError(t, err, "metric should be restored with original label values after Reregister") +} + func TestConnectionInfo(t *testing.T) { defer goleak.VerifyNone(t) diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index d446fc47e70..f26ff086dc7 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -219,6 +219,7 @@ type Component struct { collectors []Collector instanceKey string dbConnection *sql.DB + ciCollector *collector.ConnectionInfo healthErr *atomic.String openSQL func(driverName, dataSourceName string) (*sql.DB, error) } @@ -277,6 +278,7 @@ func (c *Component) Run(ctx context.Context) error { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() + var ciState database_observability.CIPingState for { select { case <-ctx.Done(): @@ -287,10 +289,17 @@ func (c *Component) Run(ctx context.Context) error { c.mut.RUnlock() if !hasCollectors { + ciState = database_observability.CIPingState{} level.Debug(c.opts.Logger).Log("msg", "attempting to reconnect to database") if err := c.tryReconnect(ctx); err != nil { level.Error(c.opts.Logger).Log("msg", "reconnection attempt failed", "err", err) } + } else { + c.mut.RLock() + db := c.dbConnection + ci := c.ciCollector + c.mut.RUnlock() + database_observability.PingConnectionInfo(ctx, db, ci, &ciState) } } } @@ -645,6 +654,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse if err := ciCollector.Start(context.Background()); err != nil { logStartError(collector.ConnectionInfoName, "start", err) } + c.ciCollector = ciCollector c.collectors = append(c.collectors, ciCollector) } diff --git a/internal/component/database_observability/mysql/component_test.go b/internal/component/database_observability/mysql/component_test.go index 90d05e11cc5..d8bba76d9fc 100644 --- a/internal/component/database_observability/mysql/component_test.go +++ b/internal/component/database_observability/mysql/component_test.go @@ -557,3 +557,4 @@ func TestMySQL_Reconnection(t *testing.T) { } }) } + diff --git a/internal/component/database_observability/postgres/collector/connection_info.go b/internal/component/database_observability/postgres/collector/connection_info.go index b64fa3aafe0..e460a1cb905 100644 --- a/internal/component/database_observability/postgres/collector/connection_info.go +++ b/internal/component/database_observability/postgres/collector/connection_info.go @@ -4,6 +4,7 @@ import ( "context" "regexp" "strings" + "sync" "github.com/grafana/alloy/internal/component/database_observability" "github.com/prometheus/client_golang/prometheus" @@ -28,7 +29,10 @@ type ConnectionInfo struct { InfoMetric *prometheus.GaugeVec CloudProvider *database_observability.CloudProvider - running *atomic.Bool + mu sync.Mutex + metricRegistered bool + labelValues []string + running *atomic.Bool } func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { @@ -41,12 +45,13 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) { args.Registry.MustRegister(infoMetric) return &ConnectionInfo{ - DSN: args.DSN, - Registry: args.Registry, - EngineVersion: args.EngineVersion, - InfoMetric: infoMetric, - CloudProvider: args.CloudProvider, - running: &atomic.Bool{}, + DSN: args.DSN, + Registry: args.Registry, + EngineVersion: args.EngineVersion, + InfoMetric: infoMetric, + CloudProvider: args.CloudProvider, + metricRegistered: true, + running: &atomic.Bool{}, }, nil } @@ -109,17 +114,58 @@ func (c *ConnectionInfo) Start(ctx context.Context) error { engineVersion = matches[1] } + c.labelValues = []string{providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, engineVersion} + c.InfoMetric.WithLabelValues(c.labelValues...).Set(1) c.running.Store(true) - c.InfoMetric.WithLabelValues(providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, engineVersion).Set(1) return nil } +// IsRegistered reports whether the connection_info metric is currently registered +// in the Prometheus registry. +func (c *ConnectionInfo) IsRegistered() bool { + c.mu.Lock() + defer c.mu.Unlock() + return c.metricRegistered +} + +// Unregister removes the connection_info metric from the Prometheus registry. +// Called by the component when consecutive DB ping failures indicate the +// instance is unreachable. +func (c *ConnectionInfo) Unregister() { + c.mu.Lock() + defer c.mu.Unlock() + if c.metricRegistered { + c.Registry.Unregister(c.InfoMetric) + c.metricRegistered = false + } +} + +// Reregister adds the connection_info metric back to the Prometheus registry +// and restores its value with the label values captured during Start. +// Called by the component when the DB becomes reachable again. +func (c *ConnectionInfo) Reregister() { + c.mu.Lock() + defer c.mu.Unlock() + if !c.metricRegistered { + c.Registry.MustRegister(c.InfoMetric) + if len(c.labelValues) > 0 { + c.InfoMetric.WithLabelValues(c.labelValues...).Set(1) + } + c.metricRegistered = true + } +} + func (c *ConnectionInfo) Stopped() bool { return !c.running.Load() } func (c *ConnectionInfo) Stop() { - c.Registry.Unregister(c.InfoMetric) + c.mu.Lock() + if c.metricRegistered { + c.Registry.Unregister(c.InfoMetric) + c.metricRegistered = false + } + c.mu.Unlock() c.running.Store(false) } diff --git a/internal/component/database_observability/postgres/collector/connection_info_test.go b/internal/component/database_observability/postgres/collector/connection_info_test.go index 4cd76a02608..14dea0ac598 100644 --- a/internal/component/database_observability/postgres/collector/connection_info_test.go +++ b/internal/component/database_observability/postgres/collector/connection_info_test.go @@ -14,6 +14,57 @@ import ( "github.com/grafana/alloy/internal/component/database_observability" ) +func TestConnectionInfo_Unregister(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + reg := prometheus.NewRegistry() + c, err := NewConnectionInfo(ConnectionInfoArguments{ + DSN: "postgres://user:pass@localhost:5432/mydb", + Registry: reg, + EngineVersion: "15.4", + }) + require.NoError(t, err) + require.NoError(t, c.Start(t.Context())) + + mfs, err := reg.Gather() + require.NoError(t, err) + require.Len(t, mfs, 1, "metric should be present before Unregister") + + c.Unregister() + + mfs, err = reg.Gather() + require.NoError(t, err) + require.Empty(t, mfs, "metric should be absent after Unregister") + require.False(t, c.IsRegistered()) +} + +func TestConnectionInfo_Reregister(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + reg := prometheus.NewRegistry() + c, err := NewConnectionInfo(ConnectionInfoArguments{ + DSN: "postgres://user:pass@products-db.abc123xyz.us-east-1.rds.amazonaws.com:5432/mydb", + Registry: reg, + EngineVersion: "15.4", + }) + require.NoError(t, err) + require.NoError(t, c.Start(t.Context())) + + c.Unregister() + require.False(t, c.IsRegistered()) + + c.Reregister() + require.True(t, c.IsRegistered()) + + const expected = ` + # HELP database_observability_connection_info Information about the connection + # TYPE database_observability_connection_info gauge + database_observability_connection_info{db_instance_identifier="products-db",engine="postgres",engine_version="15.4",provider_account="unknown",provider_name="aws",provider_region="us-east-1"} 1 +` + err = testutil.GatherAndCompare(reg, strings.NewReader(expected)) + require.NoError(t, err, "metric should be restored with original label values after Reregister") +} + func TestConnectionInfo(t *testing.T) { // The goroutine which deletes expired entries runs indefinitely, // see https://github.com/hashicorp/golang-lru/blob/v2.0.7/expirable/expirable_lru.go#L79-L80 diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index 1831adc6840..e610928363e 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -184,6 +184,7 @@ type Component struct { collectors []Collector instanceKey string dbConnection *sql.DB + ciCollector *collector.ConnectionInfo healthErr *atomic.String openSQL func(driverName, dataSourceName string) (*sql.DB, error) logsReceiver loki.LogsReceiver @@ -250,6 +251,7 @@ func (c *Component) Run(ctx context.Context) error { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() + var ciState database_observability.CIPingState for { select { case <-ctx.Done(): @@ -260,10 +262,17 @@ func (c *Component) Run(ctx context.Context) error { c.mut.RUnlock() if !hasCollectors { + ciState = database_observability.CIPingState{} level.Debug(c.opts.Logger).Log("msg", "attempting to reconnect to database") if err := c.tryReconnect(ctx); err != nil { level.Error(c.opts.Logger).Log("msg", "reconnection attempt failed", "err", err) } + } else { + c.mut.RLock() + db := c.dbConnection + ci := c.ciCollector + c.mut.RUnlock() + database_observability.PingConnectionInfo(ctx, db, ci, &ciState) } } } @@ -519,7 +528,7 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud if err := ciCollector.Start(context.Background()); err != nil { logStartError(collector.ConnectionInfoName, "start", err) } - + c.ciCollector = ciCollector c.collectors = append(c.collectors, ciCollector) if collectors[collector.ExplainPlanCollector] { diff --git a/internal/component/database_observability/postgres/component_test.go b/internal/component/database_observability/postgres/component_test.go index 612dc3bef51..e1dfd87da30 100644 --- a/internal/component/database_observability/postgres/component_test.go +++ b/internal/component/database_observability/postgres/component_test.go @@ -734,3 +734,4 @@ func TestPostgres_Reconnection(t *testing.T) { } }) } +