From e32d29810209998d9c6cf8675318fd451882f4ae Mon Sep 17 00:00:00 2001 From: Matt Nolf Date: Tue, 16 Dec 2025 11:03:41 +0000 Subject: [PATCH 1/4] add health_check collector to validate configuration --- .../database_observability.mysql.md | 9 + .../mysql/collector/health_check.go | 356 ++++++++++++++++++ .../mysql/collector/health_check_test.go | 325 ++++++++++++++++ .../database_observability/mysql/component.go | 24 ++ 4 files changed, 714 insertions(+) create mode 100644 internal/component/database_observability/mysql/collector/health_check.go create mode 100644 internal/component/database_observability/mysql/collector/health_check_test.go diff --git a/docs/sources/reference/components/database_observability/database_observability.mysql.md b/docs/sources/reference/components/database_observability/database_observability.mysql.md index 9691571d000..655670bc177 100644 --- a/docs/sources/reference/components/database_observability/database_observability.mysql.md +++ b/docs/sources/reference/components/database_observability/database_observability.mysql.md @@ -62,6 +62,7 @@ You can use the following blocks with `database_observability.mysql`: | [`explain_plans`][explain_plans] | Configure the explain plans collector. | no | | [`locks`][locks] | Configure the locks collector. | no | | [`query_samples`][query_samples] | Configure the query samples collector. | no | +| [`health_check`][health_check] | Configure the health check collector. | no | The > symbol indicates deeper levels of nesting. For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an `cloud_provider` block. @@ -75,6 +76,7 @@ For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an [locks]: #locks [query_samples]: #query_samples [setup_actors]: #setup_actors +[health_check]: #health_check ### `cloud_provider` @@ -146,6 +148,13 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu | `collect_interval` | `duration` | How frequently to check if `setup_actors` are configured correctly. | `"1h"` | no | +### `health_checks` + +| Name | Type | Description | Default | Required | +| -------------------------- | ---------- | ---------------------------------------------------------------------- | ------- | -------- | +| `collect_interval` | `duration` | How frequently to run health checks. | `"1h"` | no | + + ## Example ```alloy diff --git a/internal/component/database_observability/mysql/collector/health_check.go b/internal/component/database_observability/mysql/collector/health_check.go new file mode 100644 index 00000000000..8e24876ea5b --- /dev/null +++ b/internal/component/database_observability/mysql/collector/health_check.go @@ -0,0 +1,356 @@ +package collector + +import ( + "context" + "database/sql" + "fmt" + "regexp" + "strings" + "time" + + "github.com/blang/semver/v4" + "github.com/go-kit/log" + "go.uber.org/atomic" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/database_observability" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +// Extracts leading major.minor.patch from a version string (e.g., "8.0.36" from "8.0.36-28.1"). +var mysqlVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)(\.\d+))`) + +const ( + HealthCheckCollector = "health_check" + OP_HEALTH_STATUS = "health_status" +) + +type HealthCheckArguments struct { + DB *sql.DB + CollectInterval time.Duration + EntryHandler loki.EntryHandler + + Logger log.Logger +} + +type HealthCheck struct { + dbConnection *sql.DB + collectInterval time.Duration + entryHandler loki.EntryHandler + logger log.Logger + + running *atomic.Bool + ctx context.Context + cancel context.CancelFunc +} + +func NewHealthCheck(args HealthCheckArguments) (*HealthCheck, error) { + h := &HealthCheck{ + dbConnection: args.DB, + collectInterval: args.CollectInterval, + entryHandler: args.EntryHandler, + logger: log.With(args.Logger, "collector", HealthCheckCollector), + running: &atomic.Bool{}, + } + return h, nil +} + +func (c *HealthCheck) Name() string { + return HealthCheckCollector +} + +func (c *HealthCheck) Start(ctx context.Context) error { + level.Debug(c.logger).Log("msg", "collector started") + + c.running.Store(true) + ctx, cancel := context.WithCancel(ctx) + c.ctx = ctx + c.cancel = cancel + + go func() { + defer func() { + c.Stop() + c.running.Store(false) + }() + + ticker := time.NewTicker(c.collectInterval) + + for { + if err := c.fetchHealthChecks(c.ctx); err != nil { + level.Error(c.logger).Log("msg", "collector error", "err", err) + } + + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + // continue loop + } + } + }() + + return nil +} + +func (c *HealthCheck) Stopped() bool { + return !c.running.Load() +} + +// Stop should be kept idempotent +func (c *HealthCheck) Stop() { + if c.cancel != nil { + c.cancel() + } +} + +type healthCheckResult struct { + name string + result bool + value string + err error +} + +func (c *HealthCheck) fetchHealthChecks(ctx context.Context) error { + checks := []func(context.Context, *sql.DB) healthCheckResult{ + checkDBConnectionValid, + checkPerformanceSchemaEnabled, + checkMySQLVersion, + checkAlloyVersion, + checkRequiredGrants, + checkDigestVariablesLength, + checkSetupConsumerCPUTimeEnabled, + checkSetupConsumersEventsWaitsEnabled, + checkEventsStatementsDigestHasRows, + } + + for _, checkFn := range checks { + result := checkFn(ctx, c.dbConnection) + if result.err != nil { + level.Error(c.logger).Log("msg", "health check failed", "check", result.name, "err", result.err) + continue + } + msg := fmt.Sprintf(`check="%s" result="%v" value="%s"`, result.name, result.result, result.value) + c.entryHandler.Chan() <- database_observability.BuildLokiEntry( + logging.LevelInfo, + OP_HEALTH_STATUS, + msg, + ) + } + + return nil +} + +// checkPerformanceSchemaEnabled validates that performance_schema is enabled. +func checkPerformanceSchemaEnabled(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "PerformaneSchemaEnabled"} + const q = `SHOW VARIABLES LIKE 'performance_schema'` + var varName, varValue string + if err := db.QueryRowContext(ctx, q).Scan(&varName, &varValue); err != nil { + r.err = fmt.Errorf("query performance_schema variable: %w", err) + return r + } + + r.result = strings.EqualFold(varValue, "ON") || varValue == "1" + r.value = varValue + return r +} + +// checkDBConnectionValid validates the database connection with a short timeout. +func checkDBConnectionValid(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "DBConnectionValid"} + subCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + if err := db.PingContext(subCtx); err != nil { + r.value = err.Error() + return r + } + r.result = true + r.value = "ok" + return r +} + +// checkMySQLVersion validates that the database the MySQL version >= 8.0. +func checkMySQLVersion(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "MySQLVersionSupported"} + const q = `SELECT VERSION()` + var version string + if err := db.QueryRowContext(ctx, q).Scan(&version); err != nil { + r.err = fmt.Errorf("query version(): %w", err) + return r + } + + matches := mysqlVersionRegex.FindStringSubmatch(version) + if len(matches) <= 1 { + r.err = fmt.Errorf("unexpected version format: %s", version) + return r + } + + parsed, err := semver.ParseTolerant(matches[1]) + if err != nil { + r.err = fmt.Errorf("parse semver: %w", err) + return r + } + + r.result = semver.MustParseRange(">=8.0.0")(parsed) + r.value = version + return r +} + +// checkAlloyVersion reports the running Alloy version. +func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "AlloyVersion"} + // Always succeeds; returns the version string embedded at build time. + r.result = true + r.value = build.Version + return r +} + +// checkRequiredGrants verifies required privileges are present. +func checkRequiredGrants(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "RequiredGrantsPresent"} + req := map[string]bool{ + "PROCESS": false, + "REPLICATION CLIENT": false, + "SELECT": false, + "SHOW VIEW": false, + } + + rows, err := db.QueryContext(ctx, "SHOW GRANTS") + if err != nil { + r.err = fmt.Errorf("SHOW GRANTS: %w", err) + return r + } + defer rows.Close() + + for rows.Next() { + var grantLine string + if err := rows.Scan(&grantLine); err != nil { + r.err = fmt.Errorf("scan SHOW GRANTS: %w", err) + return r + } + up := strings.ToUpper(grantLine) + + // Mark individual privileges if present on *.* scope. + for k := range req { + if strings.Contains(up, " ON *.*") && strings.Contains(up, k) { + req[k] = true + } + } + } + if err := rows.Err(); err != nil { + r.err = fmt.Errorf("iterate SHOW GRANTS: %w", err) + return r + } + + r.result = true + for k, found := range req { + if !found { + r.result = false + if r.value == "" { + r.value = "missing: " + k + } else { + r.value += "," + k + } + } + } + + return r +} + +// checkDigestVariablesLength ensures text/digest length variables are >= 4096. +func checkDigestVariablesLength(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "DigestVariablesLengthCheck"} + const q = ` +SELECT + @@performance_schema_max_sql_text_length, + @@performance_schema_max_digest_length, + @@max_digest_length` + + var sqlTextLen, digestLen, maxDigestLen sql.NullInt64 + if err := db.QueryRowContext(ctx, q).Scan(&sqlTextLen, &digestLen, &maxDigestLen); err != nil { + r.err = fmt.Errorf("query perf schema length vars: %w", err) + return r + } + + r.result = true + if sqlTextLen.Int64 < 4096 { + r.result = false + r.value += fmt.Sprintf("performance_schema_max_sql_text_length=%d < 4096", sqlTextLen.Int64) + } + if digestLen.Int64 < 4096 { + r.result = false + r.value += fmt.Sprintf(" performance_schema_max_digest_length=%d < 4096", digestLen.Int64) + } + if maxDigestLen.Int64 < 4096 { + r.result = false + r.value += fmt.Sprintf(" max_digest_length=%d < 4096", maxDigestLen.Int64) + } + + return r +} + +// checkEventsStatementsDigestHasRows ensures performance_schema.events_statements_summary_by_digest has rows. +func checkEventsStatementsDigestHasRows(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "PerformanceSchemaHasRows"} + const q = `SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest` + var rowCount int64 + if err := db.QueryRowContext(ctx, q).Scan(&rowCount); err != nil { + r.err = err + return r + } + if rowCount == 0 { + return r + } + r.result = true + return r +} + +// checkSetupConsumerCPUTimeEnabled validates events_statements_cpu consumer is enabled. +func checkSetupConsumerCPUTimeEnabled(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "SetupConsumerCPUTimeEnabled"} + const q = `SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'` + var enabled string + if err := db.QueryRowContext(ctx, q).Scan(&enabled); err != nil { + r.err = fmt.Errorf("query setup_consumers: %w", err) + return r + } + r.result = enabled == "YES" + r.value = enabled + return r +} + +// checkSetupConsumersEventsWaitsEnabled validates events_waits_current and events_waits_history consumers are enabled. +func checkSetupConsumersEventsWaitsEnabled(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "SetupConsumersEventsWaitsEnabled"} + const q = `SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')` + + rows, err := db.QueryContext(ctx, q) + if err != nil { + r.err = fmt.Errorf("query setup_consumers: %w", err) + return r + } + defer rows.Close() + + r.result = true + + for rows.Next() { + var consumerName, enabled string + if err := rows.Scan(&consumerName, &enabled); err != nil { + r.err = fmt.Errorf("scan setup_consumers: %w", err) + return r + } + if enabled != "YES" { + r.result = false + r.value += fmt.Sprintf(" %v=%v", consumerName, enabled) + } + } + if err := rows.Err(); err != nil { + r.err = fmt.Errorf("iterate setup_consumers: %w", err) + return r + } + + return r +} diff --git a/internal/component/database_observability/mysql/collector/health_check_test.go b/internal/component/database_observability/mysql/collector/health_check_test.go new file mode 100644 index 00000000000..f43ee136685 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/health_check_test.go @@ -0,0 +1,325 @@ +package collector + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestHealthCheck(t *testing.T) { + defer goleak.VerifyNone(t) + + t.Run("all checks pass", func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + require.NotNil(t, collector) + + // Setup all checks to pass (no custom expectation) + setupExpectQueryAssertions("", mock, nil) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 9 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + require.GreaterOrEqual(t, len(lokiEntries), 9) + + for _, entry := range lokiEntries[:9] { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, `result="true"`) + } + }) + + t.Run("individual check failures", func(t *testing.T) { + testCases := []struct { + name string + failingCheckName string + customSetup func(mock sqlmock.Sqlmock) + expectedResult string + }{ + { + name: "performance schema disabled", + failingCheckName: "PerformaneSchemaEnabled", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW VARIABLES LIKE 'performance_schema'`). + WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("performance_schema", "OFF"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "mysql version too old", + failingCheckName: "MySQLVersionSupported", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT VERSION()`). + WillReturnRows( + sqlmock.NewRows([]string{"VERSION()"}). + AddRow("5.7.44"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "missing grants", + failingCheckName: "RequiredGrantsPresent", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW GRANTS`). + WillReturnRows( + sqlmock.NewRows([]string{"Grants"}). + AddRow("GRANT SELECT, SHOW VIEW ON *.* TO 'user'@'host'"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "digest variables too short", + failingCheckName: "DigestVariablesLengthCheck", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(` +SELECT + @@performance_schema_max_sql_text_length, + @@performance_schema_max_digest_length, + @@max_digest_length`). + WillReturnRows( + sqlmock.NewRows([]string{"@@performance_schema_max_sql_text_length", "@@performance_schema_max_digest_length", "@@max_digest_length"}). + AddRow(1024, 2048, 1024), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "setup consumer cpu time disabled", + failingCheckName: "SetupConsumerCPUTimeEnabled", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'`). + WillReturnRows( + sqlmock.NewRows([]string{"enabled"}). + AddRow("NO"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "events waits consumer partially disabled", + failingCheckName: "SetupConsumersEventsWaitsEnabled", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')`). + WillReturnRows( + sqlmock.NewRows([]string{"name", "enabled"}). + AddRow("events_waits_current", "YES"). + AddRow("events_waits_history", "NO"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "no rows in events statements digest", + failingCheckName: "PerformanceSchemaHasRows", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest`). + WillReturnRows( + sqlmock.NewRows([]string{"COUNT(*)"}). + AddRow(0), + ) + }, + expectedResult: `result="false"`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + setupExpectQueryAssertions(tc.failingCheckName, mock, tc.customSetup) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 9 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + + found := false + for _, entry := range lokiEntries { + if strings.Contains(entry.Line, tc.failingCheckName) { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, tc.expectedResult) + found = true + break + } + } + require.True(t, found) + }) + } + }) +} + +func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSetup func(mock sqlmock.Sqlmock)) { + type checkSetup struct { + name string + setup func(mock sqlmock.Sqlmock) + } + + checks := []checkSetup{ + { + name: "DBConnectionValid", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectPing().WillDelayFor(10 * time.Millisecond) + }, + }, + { + name: "PerformaneSchemaEnabled", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW VARIABLES LIKE 'performance_schema'`). + WillReturnRows( + sqlmock.NewRows([]string{"Variable_name", "Value"}). + AddRow("performance_schema", "ON"), + ) + }, + }, + { + name: "MySQLVersionSupported", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT VERSION()`). + WillReturnRows( + sqlmock.NewRows([]string{"VERSION()"}). + AddRow("8.0.36"), + ) + }, + }, + { + name: "RequiredGrantsPresent", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW GRANTS`). + WillReturnRows( + sqlmock.NewRows([]string{"Grants"}). + AddRow("GRANT PROCESS, REPLICATION CLIENT, SELECT, SHOW VIEW ON *.* TO 'user'@'host'"), + ) + }, + }, + { + name: "DigestVariablesLengthCheck", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(` +SELECT + @@performance_schema_max_sql_text_length, + @@performance_schema_max_digest_length, + @@max_digest_length`). + WillReturnRows( + sqlmock.NewRows([]string{"@@performance_schema_max_sql_text_length", "@@performance_schema_max_digest_length", "@@max_digest_length"}). + AddRow(4096, 4096, 4096), + ) + }, + }, + { + name: "SetupConsumerCPUTimeEnabled", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'`). + WillReturnRows( + sqlmock.NewRows([]string{"enabled"}). + AddRow("YES"), + ) + }, + }, + { + name: "SetupConsumersEventsWaitsEnabled", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')`). + WillReturnRows( + sqlmock.NewRows([]string{"name", "enabled"}). + AddRow("events_waits_current", "YES"). + AddRow("events_waits_history", "YES"), + ) + }, + }, + { + name: "PerformanceSchemaHasRows", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest`). + WillReturnRows( + sqlmock.NewRows([]string{"COUNT(*)"}). + AddRow(100), + ) + }, + }, + } + + for _, check := range checks { + if check.name == checkName { + customSetup(mock) + continue + } + check.setup(mock) + } +} diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index 2a4ee73b2b1..0bbb96f4045 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -70,6 +70,7 @@ type Arguments struct { ExplainPlansArguments ExplainPlansArguments `alloy:"explain_plans,block,optional"` LocksArguments LocksArguments `alloy:"locks,block,optional"` QuerySamplesArguments QuerySamplesArguments `alloy:"query_samples,block,optional"` + HealthCheckArguments HealthCheckArguments `alloy:"health_check,block,optional"` } type CloudProvider struct { @@ -119,6 +120,10 @@ type QuerySamplesArguments struct { SetupConsumersCheckInterval time.Duration `alloy:"setup_consumers_check_interval,attr,optional"` } +type HealthCheckArguments struct { + CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` +} + var DefaultArguments = Arguments{ AllowUpdatePerfSchemaSettings: false, @@ -159,6 +164,9 @@ var DefaultArguments = Arguments{ AutoEnableSetupConsumers: false, SetupConsumersCheckInterval: 1 * time.Hour, }, + HealthCheckArguments: HealthCheckArguments{ + CollectInterval: 1 * time.Hour, + }, } func (a *Arguments) SetToDefault() { @@ -573,6 +581,22 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse c.collectors = append(c.collectors, ciCollector) } + // HealthCheck collector is always enabled + hcCollector, err := collector.NewHealthCheck(collector.HealthCheckArguments{ + DB: c.dbConnection, + CollectInterval: c.args.HealthCheckArguments.CollectInterval, + EntryHandler: entryHandler, + Logger: c.opts.Logger, + }) + if err != nil { + logStartError(collector.HealthCheckCollector, "create", err) + } else { + if err := hcCollector.Start(context.Background()); err != nil { + logStartError(collector.HealthCheckCollector, "start", err) + } + c.collectors = append(c.collectors, hcCollector) + } + if len(startErrors) > 0 { return fmt.Errorf("failed to start some collectors: %s", strings.Join(startErrors, ", ")) } From b1421556feda18ef9b078adfd8d0314fbcf98dfc Mon Sep 17 00:00:00 2001 From: Matt Nolf Date: Tue, 6 Jan 2026 12:15:40 +0000 Subject: [PATCH 2/4] remove checks which can be computed from other data --- .../mysql/collector/health_check.go | 131 ------------------ .../mysql/collector/health_check_test.go | 112 +-------------- 2 files changed, 4 insertions(+), 239 deletions(-) diff --git a/internal/component/database_observability/mysql/collector/health_check.go b/internal/component/database_observability/mysql/collector/health_check.go index 8e24876ea5b..66e4fe3ab52 100644 --- a/internal/component/database_observability/mysql/collector/health_check.go +++ b/internal/component/database_observability/mysql/collector/health_check.go @@ -4,11 +4,9 @@ import ( "context" "database/sql" "fmt" - "regexp" "strings" "time" - "github.com/blang/semver/v4" "github.com/go-kit/log" "go.uber.org/atomic" @@ -19,9 +17,6 @@ import ( "github.com/grafana/alloy/internal/runtime/logging/level" ) -// Extracts leading major.minor.patch from a version string (e.g., "8.0.36" from "8.0.36-28.1"). -var mysqlVersionRegex = regexp.MustCompile(`^((\d+)(\.\d+)(\.\d+))`) - const ( HealthCheckCollector = "health_check" OP_HEALTH_STATUS = "health_status" @@ -114,14 +109,9 @@ type healthCheckResult struct { func (c *HealthCheck) fetchHealthChecks(ctx context.Context) error { checks := []func(context.Context, *sql.DB) healthCheckResult{ - checkDBConnectionValid, checkPerformanceSchemaEnabled, - checkMySQLVersion, checkAlloyVersion, checkRequiredGrants, - checkDigestVariablesLength, - checkSetupConsumerCPUTimeEnabled, - checkSetupConsumersEventsWaitsEnabled, checkEventsStatementsDigestHasRows, } @@ -157,48 +147,6 @@ func checkPerformanceSchemaEnabled(ctx context.Context, db *sql.DB) healthCheckR return r } -// checkDBConnectionValid validates the database connection with a short timeout. -func checkDBConnectionValid(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "DBConnectionValid"} - subCtx, cancel := context.WithTimeout(ctx, 5*time.Second) - defer cancel() - - if err := db.PingContext(subCtx); err != nil { - r.value = err.Error() - return r - } - r.result = true - r.value = "ok" - return r -} - -// checkMySQLVersion validates that the database the MySQL version >= 8.0. -func checkMySQLVersion(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "MySQLVersionSupported"} - const q = `SELECT VERSION()` - var version string - if err := db.QueryRowContext(ctx, q).Scan(&version); err != nil { - r.err = fmt.Errorf("query version(): %w", err) - return r - } - - matches := mysqlVersionRegex.FindStringSubmatch(version) - if len(matches) <= 1 { - r.err = fmt.Errorf("unexpected version format: %s", version) - return r - } - - parsed, err := semver.ParseTolerant(matches[1]) - if err != nil { - r.err = fmt.Errorf("parse semver: %w", err) - return r - } - - r.result = semver.MustParseRange(">=8.0.0")(parsed) - r.value = version - return r -} - // checkAlloyVersion reports the running Alloy version. func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult { r := healthCheckResult{name: "AlloyVersion"} @@ -260,38 +208,6 @@ func checkRequiredGrants(ctx context.Context, db *sql.DB) healthCheckResult { return r } -// checkDigestVariablesLength ensures text/digest length variables are >= 4096. -func checkDigestVariablesLength(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "DigestVariablesLengthCheck"} - const q = ` -SELECT - @@performance_schema_max_sql_text_length, - @@performance_schema_max_digest_length, - @@max_digest_length` - - var sqlTextLen, digestLen, maxDigestLen sql.NullInt64 - if err := db.QueryRowContext(ctx, q).Scan(&sqlTextLen, &digestLen, &maxDigestLen); err != nil { - r.err = fmt.Errorf("query perf schema length vars: %w", err) - return r - } - - r.result = true - if sqlTextLen.Int64 < 4096 { - r.result = false - r.value += fmt.Sprintf("performance_schema_max_sql_text_length=%d < 4096", sqlTextLen.Int64) - } - if digestLen.Int64 < 4096 { - r.result = false - r.value += fmt.Sprintf(" performance_schema_max_digest_length=%d < 4096", digestLen.Int64) - } - if maxDigestLen.Int64 < 4096 { - r.result = false - r.value += fmt.Sprintf(" max_digest_length=%d < 4096", maxDigestLen.Int64) - } - - return r -} - // checkEventsStatementsDigestHasRows ensures performance_schema.events_statements_summary_by_digest has rows. func checkEventsStatementsDigestHasRows(ctx context.Context, db *sql.DB) healthCheckResult { r := healthCheckResult{name: "PerformanceSchemaHasRows"} @@ -307,50 +223,3 @@ func checkEventsStatementsDigestHasRows(ctx context.Context, db *sql.DB) healthC r.result = true return r } - -// checkSetupConsumerCPUTimeEnabled validates events_statements_cpu consumer is enabled. -func checkSetupConsumerCPUTimeEnabled(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "SetupConsumerCPUTimeEnabled"} - const q = `SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'` - var enabled string - if err := db.QueryRowContext(ctx, q).Scan(&enabled); err != nil { - r.err = fmt.Errorf("query setup_consumers: %w", err) - return r - } - r.result = enabled == "YES" - r.value = enabled - return r -} - -// checkSetupConsumersEventsWaitsEnabled validates events_waits_current and events_waits_history consumers are enabled. -func checkSetupConsumersEventsWaitsEnabled(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "SetupConsumersEventsWaitsEnabled"} - const q = `SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')` - - rows, err := db.QueryContext(ctx, q) - if err != nil { - r.err = fmt.Errorf("query setup_consumers: %w", err) - return r - } - defer rows.Close() - - r.result = true - - for rows.Next() { - var consumerName, enabled string - if err := rows.Scan(&consumerName, &enabled); err != nil { - r.err = fmt.Errorf("scan setup_consumers: %w", err) - return r - } - if enabled != "YES" { - r.result = false - r.value += fmt.Sprintf(" %v=%v", consumerName, enabled) - } - } - if err := rows.Err(); err != nil { - r.err = fmt.Errorf("iterate setup_consumers: %w", err) - return r - } - - return r -} diff --git a/internal/component/database_observability/mysql/collector/health_check_test.go b/internal/component/database_observability/mysql/collector/health_check_test.go index f43ee136685..39141b6767b 100644 --- a/internal/component/database_observability/mysql/collector/health_check_test.go +++ b/internal/component/database_observability/mysql/collector/health_check_test.go @@ -46,7 +46,7 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - return len(lokiClient.Received()) >= 9 + return len(lokiClient.Received()) >= 4 }, 5*time.Second, 10*time.Millisecond) collector.Stop() @@ -61,9 +61,9 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - require.GreaterOrEqual(t, len(lokiEntries), 9) + require.GreaterOrEqual(t, len(lokiEntries), 4) - for _, entry := range lokiEntries[:9] { + for _, entry := range lokiEntries[:4] { require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) require.Contains(t, entry.Line, `result="true"`) } @@ -88,18 +88,6 @@ func TestHealthCheck(t *testing.T) { }, expectedResult: `result="false"`, }, - { - name: "mysql version too old", - failingCheckName: "MySQLVersionSupported", - customSetup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT VERSION()`). - WillReturnRows( - sqlmock.NewRows([]string{"VERSION()"}). - AddRow("5.7.44"), - ) - }, - expectedResult: `result="false"`, - }, { name: "missing grants", failingCheckName: "RequiredGrantsPresent", @@ -112,47 +100,6 @@ func TestHealthCheck(t *testing.T) { }, expectedResult: `result="false"`, }, - { - name: "digest variables too short", - failingCheckName: "DigestVariablesLengthCheck", - customSetup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(` -SELECT - @@performance_schema_max_sql_text_length, - @@performance_schema_max_digest_length, - @@max_digest_length`). - WillReturnRows( - sqlmock.NewRows([]string{"@@performance_schema_max_sql_text_length", "@@performance_schema_max_digest_length", "@@max_digest_length"}). - AddRow(1024, 2048, 1024), - ) - }, - expectedResult: `result="false"`, - }, - { - name: "setup consumer cpu time disabled", - failingCheckName: "SetupConsumerCPUTimeEnabled", - customSetup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'`). - WillReturnRows( - sqlmock.NewRows([]string{"enabled"}). - AddRow("NO"), - ) - }, - expectedResult: `result="false"`, - }, - { - name: "events waits consumer partially disabled", - failingCheckName: "SetupConsumersEventsWaitsEnabled", - customSetup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')`). - WillReturnRows( - sqlmock.NewRows([]string{"name", "enabled"}). - AddRow("events_waits_current", "YES"). - AddRow("events_waits_history", "NO"), - ) - }, - expectedResult: `result="false"`, - }, { name: "no rows in events statements digest", failingCheckName: "PerformanceSchemaHasRows", @@ -194,7 +141,7 @@ SELECT require.NoError(t, err) require.Eventually(t, func() bool { - return len(lokiClient.Received()) >= 9 + return len(lokiClient.Received()) >= 4 }, 5*time.Second, 10*time.Millisecond) collector.Stop() @@ -232,12 +179,6 @@ func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSe } checks := []checkSetup{ - { - name: "DBConnectionValid", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectPing().WillDelayFor(10 * time.Millisecond) - }, - }, { name: "PerformaneSchemaEnabled", setup: func(mock sqlmock.Sqlmock) { @@ -248,16 +189,6 @@ func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSe ) }, }, - { - name: "MySQLVersionSupported", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT VERSION()`). - WillReturnRows( - sqlmock.NewRows([]string{"VERSION()"}). - AddRow("8.0.36"), - ) - }, - }, { name: "RequiredGrantsPresent", setup: func(mock sqlmock.Sqlmock) { @@ -268,41 +199,6 @@ func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSe ) }, }, - { - name: "DigestVariablesLengthCheck", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(` -SELECT - @@performance_schema_max_sql_text_length, - @@performance_schema_max_digest_length, - @@max_digest_length`). - WillReturnRows( - sqlmock.NewRows([]string{"@@performance_schema_max_sql_text_length", "@@performance_schema_max_digest_length", "@@max_digest_length"}). - AddRow(4096, 4096, 4096), - ) - }, - }, - { - name: "SetupConsumerCPUTimeEnabled", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT enabled FROM performance_schema.setup_consumers WHERE NAME = 'events_statements_cpu'`). - WillReturnRows( - sqlmock.NewRows([]string{"enabled"}). - AddRow("YES"), - ) - }, - }, - { - name: "SetupConsumersEventsWaitsEnabled", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SELECT name, enabled FROM performance_schema.setup_consumers WHERE NAME IN ('events_waits_current','events_waits_history')`). - WillReturnRows( - sqlmock.NewRows([]string{"name", "enabled"}). - AddRow("events_waits_current", "YES"). - AddRow("events_waits_history", "YES"), - ) - }, - }, { name: "PerformanceSchemaHasRows", setup: func(mock sqlmock.Sqlmock) { From d95259e71d2b31fe04512971960e520689919de4 Mon Sep 17 00:00:00 2001 From: Matt Nolf Date: Tue, 6 Jan 2026 15:06:41 +0000 Subject: [PATCH 3/4] set health check collect interval in tests --- .../component/database_observability/mysql/component_test.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/internal/component/database_observability/mysql/component_test.go b/internal/component/database_observability/mysql/component_test.go index 95193f9bfb1..bc711fe5eb3 100644 --- a/internal/component/database_observability/mysql/component_test.go +++ b/internal/component/database_observability/mysql/component_test.go @@ -324,6 +324,9 @@ func TestMySQL_StartCollectors_ReportsUnhealthy_StackedErrors(t *testing.T) { CollectInterval: time.Second, Threshold: time.Second, }, + HealthCheckArguments: HealthCheckArguments{ + CollectInterval: 1 * time.Hour, + }, } var gotExports cmp.Exports opts := cmp.Options{ From 9c06c5aff5e9f3e88186f3b29206b83d37d3db99 Mon Sep 17 00:00:00 2001 From: Matt Nolf Date: Tue, 6 Jan 2026 16:15:29 +0000 Subject: [PATCH 4/4] remove perf_schema check --- .../mysql/collector/health_check.go | 16 ---------- .../mysql/collector/health_check_test.go | 30 +++---------------- 2 files changed, 4 insertions(+), 42 deletions(-) diff --git a/internal/component/database_observability/mysql/collector/health_check.go b/internal/component/database_observability/mysql/collector/health_check.go index 8e4300ba23c..3b9150be605 100644 --- a/internal/component/database_observability/mysql/collector/health_check.go +++ b/internal/component/database_observability/mysql/collector/health_check.go @@ -106,7 +106,6 @@ type healthCheckResult struct { func (c *HealthCheck) fetchHealthChecks(ctx context.Context) { checks := []func(context.Context, *sql.DB) healthCheckResult{ - checkPerformanceSchemaEnabled, checkAlloyVersion, checkRequiredGrants, checkEventsStatementsDigestHasRows, @@ -127,21 +126,6 @@ func (c *HealthCheck) fetchHealthChecks(ctx context.Context) { } } -// checkPerformanceSchemaEnabled validates that performance_schema is enabled. -func checkPerformanceSchemaEnabled(ctx context.Context, db *sql.DB) healthCheckResult { - r := healthCheckResult{name: "PerformaneSchemaEnabled"} - const q = `SHOW VARIABLES LIKE 'performance_schema'` - var varName, varValue string - if err := db.QueryRowContext(ctx, q).Scan(&varName, &varValue); err != nil { - r.err = fmt.Errorf("query performance_schema variable: %w", err) - return r - } - - r.result = strings.EqualFold(varValue, "ON") || varValue == "1" - r.value = varValue - return r -} - // checkAlloyVersion reports the running Alloy version. func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult { r := healthCheckResult{name: "AlloyVersion"} diff --git a/internal/component/database_observability/mysql/collector/health_check_test.go b/internal/component/database_observability/mysql/collector/health_check_test.go index 39141b6767b..50bdcee3faf 100644 --- a/internal/component/database_observability/mysql/collector/health_check_test.go +++ b/internal/component/database_observability/mysql/collector/health_check_test.go @@ -46,7 +46,7 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - return len(lokiClient.Received()) >= 4 + return len(lokiClient.Received()) >= 3 }, 5*time.Second, 10*time.Millisecond) collector.Stop() @@ -61,9 +61,9 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) lokiEntries := lokiClient.Received() - require.GreaterOrEqual(t, len(lokiEntries), 4) + require.GreaterOrEqual(t, len(lokiEntries), 3) - for _, entry := range lokiEntries[:4] { + for _, entry := range lokiEntries[:3] { require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) require.Contains(t, entry.Line, `result="true"`) } @@ -76,18 +76,6 @@ func TestHealthCheck(t *testing.T) { customSetup func(mock sqlmock.Sqlmock) expectedResult string }{ - { - name: "performance schema disabled", - failingCheckName: "PerformaneSchemaEnabled", - customSetup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SHOW VARIABLES LIKE 'performance_schema'`). - WillReturnRows( - sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("performance_schema", "OFF"), - ) - }, - expectedResult: `result="false"`, - }, { name: "missing grants", failingCheckName: "RequiredGrantsPresent", @@ -141,7 +129,7 @@ func TestHealthCheck(t *testing.T) { require.NoError(t, err) require.Eventually(t, func() bool { - return len(lokiClient.Received()) >= 4 + return len(lokiClient.Received()) >= 3 }, 5*time.Second, 10*time.Millisecond) collector.Stop() @@ -179,16 +167,6 @@ func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSe } checks := []checkSetup{ - { - name: "PerformaneSchemaEnabled", - setup: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(`SHOW VARIABLES LIKE 'performance_schema'`). - WillReturnRows( - sqlmock.NewRows([]string{"Variable_name", "Value"}). - AddRow("performance_schema", "ON"), - ) - }, - }, { name: "RequiredGrantsPresent", setup: func(mock sqlmock.Sqlmock) {