Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
package database_observability

import (
"context"
"database/sql"
)

// ConnectionInfoPingThreshold is the number of consecutive ping failures before
// the connection_info metric is unregistered, and the number of consecutive
// ping successes before it is re-registered.
const ConnectionInfoPingThreshold = 3

// ConnectionInfoToggler is implemented by the ConnectionInfo collector in each
// database engine package. It allows the component to toggle metric registration
// without importing a concrete collector type.
type ConnectionInfoToggler interface {
IsRegistered() bool
Unregister()
Reregister()
}

// CIPingState tracks consecutive ping results for the connection_info metric
// toggle. It is intended to be goroutine-local (owned by Run()'s ticker loop)
// and requires no external locking.
type CIPingState struct {
failures int
successes int
lastCI ConnectionInfoToggler
}

// PingConnectionInfo pings db and toggles the connection_info metric via
// toggler based on consecutive failure or success counts in state. It should
// be called once per ticker tick from the component's Run() loop.
//
// After ConnectionInfoPingThreshold consecutive failures, toggler.Unregister()
// is called. After ConnectionInfoPingThreshold consecutive successes (while
// unregistered), toggler.Reregister() is called. When toggler changes (i.e.
// the component reconnected and created a new collector), state resets.
func PingConnectionInfo(ctx context.Context, db *sql.DB, toggler ConnectionInfoToggler, state *CIPingState) {
if toggler != state.lastCI {
state.failures = 0
state.successes = 0
state.lastCI = toggler
}

if db == nil || toggler == nil {
return
}

if err := db.PingContext(ctx); err != nil {
state.successes = 0
if toggler.IsRegistered() {
state.failures++
if state.failures >= ConnectionInfoPingThreshold {
toggler.Unregister()
state.failures = 0
}
}
} else {
state.failures = 0
if !toggler.IsRegistered() {
state.successes++
if state.successes >= ConnectionInfoPingThreshold {
toggler.Reregister()
state.successes = 0
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package database_observability

import (
"context"
"errors"
"testing"

sqlmock "github.com/DATA-DOG/go-sqlmock"
"github.com/stretchr/testify/require"
)

type mockToggler struct {
registered bool
}

func (m *mockToggler) IsRegistered() bool { return m.registered }
func (m *mockToggler) Unregister() { m.registered = false }
func (m *mockToggler) Reregister() { m.registered = true }

func TestPingConnectionInfo_UnregistersAfterThresholdFailures(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
require.NoError(t, err)
defer db.Close()

pingErr := errors.New("connection refused")
for i := 0; i < ConnectionInfoPingThreshold; i++ {
mock.ExpectPing().WillReturnError(pingErr)
}

toggler := &mockToggler{registered: true}
state := &CIPingState{}

for i := 0; i < ConnectionInfoPingThreshold; i++ {
PingConnectionInfo(context.Background(), db, toggler, state)
}

require.False(t, toggler.IsRegistered(), "metric should be unregistered after %d consecutive failures", ConnectionInfoPingThreshold)
require.NoError(t, mock.ExpectationsWereMet())
}

func TestPingConnectionInfo_ReregistersAfterThresholdSuccesses(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
require.NoError(t, err)
defer db.Close()

pingErr := errors.New("connection refused")
for i := 0; i < ConnectionInfoPingThreshold; i++ {
mock.ExpectPing().WillReturnError(pingErr)
}
for i := 0; i < ConnectionInfoPingThreshold; i++ {
mock.ExpectPing()
}

toggler := &mockToggler{registered: true}
state := &CIPingState{}

for i := 0; i < ConnectionInfoPingThreshold*2; i++ {
PingConnectionInfo(context.Background(), db, toggler, state)
}

require.True(t, toggler.IsRegistered(), "metric should be re-registered after %d consecutive successes", ConnectionInfoPingThreshold)
require.NoError(t, mock.ExpectationsWereMet())
}

func TestPingConnectionInfo_RemainsRegisteredWhilePingsSucceed(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
require.NoError(t, err)
defer db.Close()

const pings = 5
for i := 0; i < pings; i++ {
mock.ExpectPing()
}

toggler := &mockToggler{registered: true}
state := &CIPingState{}

for i := 0; i < pings; i++ {
PingConnectionInfo(context.Background(), db, toggler, state)
}

require.True(t, toggler.IsRegistered(), "metric should remain registered while pings succeed")
require.NoError(t, mock.ExpectationsWereMet())
}

func TestPingConnectionInfo_ResetsStateWhenTogglerChanges(t *testing.T) {
db, mock, err := sqlmock.New(sqlmock.MonitorPingsOption(true))
require.NoError(t, err)
defer db.Close()

pingErr := errors.New("connection refused")
for i := 0; i < ConnectionInfoPingThreshold-1; i++ {
mock.ExpectPing().WillReturnError(pingErr)
}
mock.ExpectPing() // first ping with new toggler

toggler1 := &mockToggler{registered: true}
state := &CIPingState{}

for i := 0; i < ConnectionInfoPingThreshold-1; i++ {
PingConnectionInfo(context.Background(), db, toggler1, state)
}
require.True(t, toggler1.IsRegistered(), "should not have unregistered yet")
require.Equal(t, ConnectionInfoPingThreshold-1, state.failures, "failures should have accumulated")

toggler2 := &mockToggler{registered: true}
PingConnectionInfo(context.Background(), db, toggler2, state)

require.Equal(t, 0, state.failures, "failures should reset when toggler changes")
require.True(t, toggler1.IsRegistered(), "old toggler should be unaffected")
require.NoError(t, mock.ExpectationsWereMet())
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"net"
"strings"
"sync"

"github.com/go-sql-driver/mysql"
"github.com/grafana/alloy/internal/component/database_observability"
Expand All @@ -27,7 +28,10 @@ type ConnectionInfo struct {
InfoMetric *prometheus.GaugeVec
CloudProvider *database_observability.CloudProvider

running *atomic.Bool
mu sync.Mutex
metricRegistered bool
labelValues []string
running *atomic.Bool
}

func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
Expand All @@ -40,12 +44,13 @@ func NewConnectionInfo(args ConnectionInfoArguments) (*ConnectionInfo, error) {
args.Registry.MustRegister(infoMetric)

return &ConnectionInfo{
DSN: args.DSN,
Registry: args.Registry,
EngineVersion: args.EngineVersion,
InfoMetric: infoMetric,
CloudProvider: args.CloudProvider,
running: &atomic.Bool{},
DSN: args.DSN,
Registry: args.Registry,
EngineVersion: args.EngineVersion,
InfoMetric: infoMetric,
CloudProvider: args.CloudProvider,
metricRegistered: true,
running: &atomic.Bool{},
}, nil
}

Expand Down Expand Up @@ -103,17 +108,59 @@ func (c *ConnectionInfo) Start(ctx context.Context) error {
}
}
}

c.labelValues = []string{providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, c.EngineVersion}
c.InfoMetric.WithLabelValues(c.labelValues...).Set(1)
c.running.Store(true)

c.InfoMetric.WithLabelValues(providerName, providerRegion, providerAccount, dbInstanceIdentifier, engine, c.EngineVersion).Set(1)
return nil
}

// IsRegistered reports whether the connection_info metric is currently registered
// in the Prometheus registry.
func (c *ConnectionInfo) IsRegistered() bool {
c.mu.Lock()
defer c.mu.Unlock()
return c.metricRegistered
}

// Unregister removes the connection_info metric from the Prometheus registry.
// Called by the component when consecutive DB ping failures indicate the
// instance is unreachable.
func (c *ConnectionInfo) Unregister() {
c.mu.Lock()
defer c.mu.Unlock()
if c.metricRegistered {
c.Registry.Unregister(c.InfoMetric)
c.metricRegistered = false
}
}

// Reregister adds the connection_info metric back to the Prometheus registry
// and restores its value with the label values captured during Start.
// Called by the component when the DB becomes reachable again.
func (c *ConnectionInfo) Reregister() {
c.mu.Lock()
defer c.mu.Unlock()
if !c.metricRegistered {
c.Registry.MustRegister(c.InfoMetric)
if len(c.labelValues) > 0 {
c.InfoMetric.WithLabelValues(c.labelValues...).Set(1)
}
c.metricRegistered = true
}
}

func (c *ConnectionInfo) Stopped() bool {
return !c.running.Load()
}

func (c *ConnectionInfo) Stop() {
c.Registry.Unregister(c.InfoMetric)
c.mu.Lock()
if c.metricRegistered {
c.Registry.Unregister(c.InfoMetric)
c.metricRegistered = false
}
c.mu.Unlock()
c.running.Store(false)
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,57 @@ import (
"github.com/grafana/alloy/internal/component/database_observability"
)

func TestConnectionInfo_Unregister(t *testing.T) {
defer goleak.VerifyNone(t)

reg := prometheus.NewRegistry()
c, err := NewConnectionInfo(ConnectionInfoArguments{
DSN: "user:pass@tcp(localhost:3306)/schema",
Registry: reg,
EngineVersion: "8.0.32",
})
require.NoError(t, err)
require.NoError(t, c.Start(t.Context()))

mfs, err := reg.Gather()
require.NoError(t, err)
require.Len(t, mfs, 1, "metric should be present before Unregister")

c.Unregister()

mfs, err = reg.Gather()
require.NoError(t, err)
require.Empty(t, mfs, "metric should be absent after Unregister")
require.False(t, c.IsRegistered())
}

func TestConnectionInfo_Reregister(t *testing.T) {
defer goleak.VerifyNone(t)

reg := prometheus.NewRegistry()
c, err := NewConnectionInfo(ConnectionInfoArguments{
DSN: "user:pass@tcp(products-db.abc123xyz.us-east-1.rds.amazonaws.com:3306)/schema",
Registry: reg,
EngineVersion: "8.0.32",
})
require.NoError(t, err)
require.NoError(t, c.Start(t.Context()))

c.Unregister()
require.False(t, c.IsRegistered())

c.Reregister()
require.True(t, c.IsRegistered())

const expected = `
# HELP database_observability_connection_info Information about the connection
# TYPE database_observability_connection_info gauge
database_observability_connection_info{db_instance_identifier="products-db",engine="mysql",engine_version="8.0.32",provider_account="unknown",provider_name="aws",provider_region="us-east-1"} 1
`
err = testutil.GatherAndCompare(reg, strings.NewReader(expected))
require.NoError(t, err, "metric should be restored with original label values after Reregister")
}

func TestConnectionInfo(t *testing.T) {
defer goleak.VerifyNone(t)

Expand Down
10 changes: 10 additions & 0 deletions internal/component/database_observability/mysql/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ type Component struct {
collectors []Collector
instanceKey string
dbConnection *sql.DB
ciCollector *collector.ConnectionInfo
healthErr *atomic.String
openSQL func(driverName, dataSourceName string) (*sql.DB, error)
}
Expand Down Expand Up @@ -277,6 +278,7 @@ func (c *Component) Run(ctx context.Context) error {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()

var ciState database_observability.CIPingState
for {
select {
case <-ctx.Done():
Expand All @@ -287,10 +289,17 @@ func (c *Component) Run(ctx context.Context) error {
c.mut.RUnlock()

if !hasCollectors {
ciState = database_observability.CIPingState{}
level.Debug(c.opts.Logger).Log("msg", "attempting to reconnect to database")
if err := c.tryReconnect(ctx); err != nil {
level.Error(c.opts.Logger).Log("msg", "reconnection attempt failed", "err", err)
}
} else {
c.mut.RLock()
db := c.dbConnection
ci := c.ciCollector
c.mut.RUnlock()
database_observability.PingConnectionInfo(ctx, db, ci, &ciState)
}
}
}
Expand Down Expand Up @@ -645,6 +654,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse
if err := ciCollector.Start(context.Background()); err != nil {
logStartError(collector.ConnectionInfoName, "start", err)
}
c.ciCollector = ciCollector
c.collectors = append(c.collectors, ciCollector)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -557,3 +557,4 @@
}
})
}

Check failure on line 560 in internal/component/database_observability/mysql/component_test.go

View workflow job for this annotation

GitHub Actions / lint / Lint Go

File is not properly formatted (gofmt)
Loading
Loading