diff --git a/docs/sources/reference/components/database_observability/database_observability.mysql.md b/docs/sources/reference/components/database_observability/database_observability.mysql.md index 9691571d000..655670bc177 100644 --- a/docs/sources/reference/components/database_observability/database_observability.mysql.md +++ b/docs/sources/reference/components/database_observability/database_observability.mysql.md @@ -62,6 +62,7 @@ You can use the following blocks with `database_observability.mysql`: | [`explain_plans`][explain_plans] | Configure the explain plans collector. | no | | [`locks`][locks] | Configure the locks collector. | no | | [`query_samples`][query_samples] | Configure the query samples collector. | no | +| [`health_check`][health_check] | Configure the health check collector. | no | The > symbol indicates deeper levels of nesting. For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an `cloud_provider` block. @@ -75,6 +76,7 @@ For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an [locks]: #locks [query_samples]: #query_samples [setup_actors]: #setup_actors +[health_check]: #health_check ### `cloud_provider` @@ -146,6 +148,13 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu | `collect_interval` | `duration` | How frequently to check if `setup_actors` are configured correctly. | `"1h"` | no | +### `health_check` + +| Name | Type | Description | Default | Required | +| -------------------------- | ---------- | ---------------------------------------------------------------------- | ------- | -------- | +| `collect_interval` | `duration` | How frequently to run health checks. 
| `"1h"` | no | + + ## Example ```alloy diff --git a/internal/component/database_observability/mysql/collector/health_check.go b/internal/component/database_observability/mysql/collector/health_check.go new file mode 100644 index 00000000000..3b9150be605 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/health_check.go @@ -0,0 +1,204 @@ +package collector + +import ( + "context" + "database/sql" + "fmt" + "strings" + "time" + + "github.com/go-kit/log" + "go.uber.org/atomic" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/database_observability" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + HealthCheckCollector = "health_check" + OP_HEALTH_STATUS = "health_status" +) + +type HealthCheckArguments struct { + DB *sql.DB + CollectInterval time.Duration + EntryHandler loki.EntryHandler + + Logger log.Logger +} + +type HealthCheck struct { + dbConnection *sql.DB + collectInterval time.Duration + entryHandler loki.EntryHandler + logger log.Logger + + running *atomic.Bool + ctx context.Context + cancel context.CancelFunc +} + +func NewHealthCheck(args HealthCheckArguments) (*HealthCheck, error) { + h := &HealthCheck{ + dbConnection: args.DB, + collectInterval: args.CollectInterval, + entryHandler: args.EntryHandler, + logger: log.With(args.Logger, "collector", HealthCheckCollector), + running: &atomic.Bool{}, + } + return h, nil +} + +func (c *HealthCheck) Name() string { + return HealthCheckCollector +} + +func (c *HealthCheck) Start(ctx context.Context) error { + level.Debug(c.logger).Log("msg", "collector started") + + c.running.Store(true) + ctx, cancel := context.WithCancel(ctx) + c.ctx = ctx + c.cancel = cancel + + go func() { + defer func() { + c.Stop() + c.running.Store(false) + }() + + ticker := time.NewTicker(c.collectInterval) + + for { + 
c.fetchHealthChecks(c.ctx) + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + // continue loop + } + } + }() + + return nil +} + +func (c *HealthCheck) Stopped() bool { + return !c.running.Load() +} + +// Stop should be kept idempotent +func (c *HealthCheck) Stop() { + if c.cancel != nil { + c.cancel() + } +} + +type healthCheckResult struct { + name string + result bool + value string + err error +} + +func (c *HealthCheck) fetchHealthChecks(ctx context.Context) { + checks := []func(context.Context, *sql.DB) healthCheckResult{ + checkAlloyVersion, + checkRequiredGrants, + checkEventsStatementsDigestHasRows, + } + + for _, checkFn := range checks { + result := checkFn(ctx, c.dbConnection) + if result.err != nil { + level.Error(c.logger).Log("msg", "health check failed", "check", result.name, "err", result.err) + continue + } + msg := fmt.Sprintf(`check="%s" result="%v" value="%s"`, result.name, result.result, result.value) + c.entryHandler.Chan() <- database_observability.BuildLokiEntry( + logging.LevelInfo, + OP_HEALTH_STATUS, + msg, + ) + } +} + +// checkAlloyVersion reports the running Alloy version. +func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "AlloyVersion"} + // Always succeeds; returns the version string embedded at build time. + r.result = true + r.value = build.Version + return r +} + +// checkRequiredGrants verifies required privileges are present. 
+func checkRequiredGrants(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "RequiredGrantsPresent"} + req := map[string]bool{ + "PROCESS": false, + "REPLICATION CLIENT": false, + "SELECT": false, + "SHOW VIEW": false, + } + + rows, err := db.QueryContext(ctx, "SHOW GRANTS") + if err != nil { + r.err = fmt.Errorf("SHOW GRANTS: %w", err) + return r + } + defer rows.Close() + + for rows.Next() { + var grantLine string + if err := rows.Scan(&grantLine); err != nil { + r.err = fmt.Errorf("scan SHOW GRANTS: %w", err) + return r + } + up := strings.ToUpper(grantLine) + + // Mark individual privileges if present on *.* scope. + for k := range req { + if strings.Contains(up, " ON *.*") && strings.Contains(up, k) { + req[k] = true + } + } + } + if err := rows.Err(); err != nil { + r.err = fmt.Errorf("iterate SHOW GRANTS: %w", err) + return r + } + + r.result = true + for k, found := range req { + if !found { + r.result = false + if r.value == "" { + r.value = "missing: " + k + } else { + r.value += "," + k + } + } + } + + return r +} + +// checkEventsStatementsDigestHasRows ensures performance_schema.events_statements_summary_by_digest has rows. 
+func checkEventsStatementsDigestHasRows(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "PerformanceSchemaHasRows"} + const q = `SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest` + var rowCount int64 + if err := db.QueryRowContext(ctx, q).Scan(&rowCount); err != nil { + r.err = err + return r + } + if rowCount == 0 { + return r + } + r.result = true + return r +} diff --git a/internal/component/database_observability/mysql/collector/health_check_test.go b/internal/component/database_observability/mysql/collector/health_check_test.go new file mode 100644 index 00000000000..50bdcee3faf --- /dev/null +++ b/internal/component/database_observability/mysql/collector/health_check_test.go @@ -0,0 +1,199 @@ +package collector + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestHealthCheck(t *testing.T) { + defer goleak.VerifyNone(t) + + t.Run("all checks pass", func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + require.NotNil(t, collector) + + // Setup all checks to pass (no custom expectation) + setupExpectQueryAssertions("", mock, nil) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 3 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + 
require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + require.GreaterOrEqual(t, len(lokiEntries), 3) + + for _, entry := range lokiEntries[:3] { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, `result="true"`) + } + }) + + t.Run("individual check failures", func(t *testing.T) { + testCases := []struct { + name string + failingCheckName string + customSetup func(mock sqlmock.Sqlmock) + expectedResult string + }{ + { + name: "missing grants", + failingCheckName: "RequiredGrantsPresent", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW GRANTS`). + WillReturnRows( + sqlmock.NewRows([]string{"Grants"}). + AddRow("GRANT SELECT, SHOW VIEW ON *.* TO 'user'@'host'"), + ) + }, + expectedResult: `result="false"`, + }, + { + name: "no rows in events statements digest", + failingCheckName: "PerformanceSchemaHasRows", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest`). + WillReturnRows( + sqlmock.NewRows([]string{"COUNT(*)"}). 
+ AddRow(0), + ) + }, + expectedResult: `result="false"`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + setupExpectQueryAssertions(tc.failingCheckName, mock, tc.customSetup) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 3 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + + found := false + for _, entry := range lokiEntries { + if strings.Contains(entry.Line, tc.failingCheckName) { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, tc.expectedResult) + found = true + break + } + } + require.True(t, found) + }) + } + }) +} + +func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSetup func(mock sqlmock.Sqlmock)) { + type checkSetup struct { + name string + setup func(mock sqlmock.Sqlmock) + } + + checks := []checkSetup{ + { + name: "RequiredGrantsPresent", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SHOW GRANTS`). + WillReturnRows( + sqlmock.NewRows([]string{"Grants"}). 
+ AddRow("GRANT PROCESS, REPLICATION CLIENT, SELECT, SHOW VIEW ON *.* TO 'user'@'host'"), + ) + }, + }, + { + name: "PerformanceSchemaHasRows", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT COUNT(*) FROM performance_schema.events_statements_summary_by_digest`). + WillReturnRows( + sqlmock.NewRows([]string{"COUNT(*)"}). + AddRow(100), + ) + }, + }, + } + + for _, check := range checks { + if check.name == checkName { + customSetup(mock) + continue + } + check.setup(mock) + } +} diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index 2a4ee73b2b1..0bbb96f4045 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -70,6 +70,7 @@ type Arguments struct { ExplainPlansArguments ExplainPlansArguments `alloy:"explain_plans,block,optional"` LocksArguments LocksArguments `alloy:"locks,block,optional"` QuerySamplesArguments QuerySamplesArguments `alloy:"query_samples,block,optional"` + HealthCheckArguments HealthCheckArguments `alloy:"health_check,block,optional"` } type CloudProvider struct { @@ -119,6 +120,10 @@ type QuerySamplesArguments struct { SetupConsumersCheckInterval time.Duration `alloy:"setup_consumers_check_interval,attr,optional"` } +type HealthCheckArguments struct { + CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` +} + var DefaultArguments = Arguments{ AllowUpdatePerfSchemaSettings: false, @@ -159,6 +164,9 @@ var DefaultArguments = Arguments{ AutoEnableSetupConsumers: false, SetupConsumersCheckInterval: 1 * time.Hour, }, + HealthCheckArguments: HealthCheckArguments{ + CollectInterval: 1 * time.Hour, + }, } func (a *Arguments) SetToDefault() { @@ -573,6 +581,22 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse c.collectors = append(c.collectors, ciCollector) } + // HealthCheck collector is always enabled + 
hcCollector, err := collector.NewHealthCheck(collector.HealthCheckArguments{ + DB: c.dbConnection, + CollectInterval: c.args.HealthCheckArguments.CollectInterval, + EntryHandler: entryHandler, + Logger: c.opts.Logger, + }) + if err != nil { + logStartError(collector.HealthCheckCollector, "create", err) + } else { + if err := hcCollector.Start(context.Background()); err != nil { + logStartError(collector.HealthCheckCollector, "start", err) + } + c.collectors = append(c.collectors, hcCollector) + } + if len(startErrors) > 0 { return fmt.Errorf("failed to start some collectors: %s", strings.Join(startErrors, ", ")) } diff --git a/internal/component/database_observability/mysql/component_test.go b/internal/component/database_observability/mysql/component_test.go index 95193f9bfb1..bc711fe5eb3 100644 --- a/internal/component/database_observability/mysql/component_test.go +++ b/internal/component/database_observability/mysql/component_test.go @@ -324,6 +324,9 @@ func TestMySQL_StartCollectors_ReportsUnhealthy_StackedErrors(t *testing.T) { CollectInterval: time.Second, Threshold: time.Second, }, + HealthCheckArguments: HealthCheckArguments{ + CollectInterval: 1 * time.Hour, + }, } var gotExports cmp.Exports opts := cmp.Options{