diff --git a/docs/sources/reference/components/database_observability/database_observability.postgres.md b/docs/sources/reference/components/database_observability/database_observability.postgres.md index b04904ec43b..4d46112095b 100644 --- a/docs/sources/reference/components/database_observability/database_observability.postgres.md +++ b/docs/sources/reference/components/database_observability/database_observability.postgres.md @@ -55,6 +55,7 @@ You can use the following blocks with `database_observability.postgres`: | [`query_samples`][query_samples] | Configure the query samples collector. | no | | [`schema_details`][schema_details] | Configure the schema and table details collector. | no | | [`explain_plans`][explain_plans] | Configure the explain plans collector. | no | +| [`health_check`][health_check] | Configure the health check collector. | no | The > symbol indicates deeper levels of nesting. For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an `cloud_provider` block. @@ -65,6 +66,7 @@ For example, `cloud_provider` > `aws` refers to a `aws` block defined inside an [query_samples]: #query_samples [schema_details]: #schema_details [explain_plans]: #explain_plans +[health_check]: #health_check ### `cloud_provider` @@ -114,6 +116,12 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu | `per_collect_ratio` | `float64` | The ratio of queries to collect explain plans for. | `1.0` | no | | `explain_plan_exclude_schemas` | `list(string)` | Schemas to exclude from explain plans. | `[]` | no | +### `health_check` + +| Name | Type | Description | Default | Required | +|--------------------|------------|------------------------------------------------------|---------|----------| +| `collect_interval` | `duration` | How frequently to collect information from database. | `"1h"` | no | + ## Example ```alloy diff --git a/internal/component/database_observability/postgres/collector/health_check.go b/internal/component/database_observability/postgres/collector/health_check.go new file mode 100644 index 00000000000..632c7dae6b4 --- /dev/null +++ b/internal/component/database_observability/postgres/collector/health_check.go @@ -0,0 +1,205 @@ +package collector + +import ( + "context" + "database/sql" + "fmt" + "strconv" + "time" + + "github.com/go-kit/log" + "go.uber.org/atomic" + + "github.com/grafana/alloy/internal/build" + "github.com/grafana/alloy/internal/component/common/loki" + "github.com/grafana/alloy/internal/component/database_observability" + "github.com/grafana/alloy/internal/runtime/logging" + "github.com/grafana/alloy/internal/runtime/logging/level" +) + +const ( + HealthCheckCollector = "health_check" + OP_HEALTH_STATUS = "health_status" +) + +type HealthCheckArguments struct { + DB *sql.DB + CollectInterval time.Duration + EntryHandler loki.EntryHandler + + Logger log.Logger +} + +type HealthCheck struct { + dbConnection *sql.DB + collectInterval time.Duration + entryHandler loki.EntryHandler + logger log.Logger + + running *atomic.Bool + ctx context.Context + cancel context.CancelFunc +} + +func NewHealthCheck(args HealthCheckArguments) (*HealthCheck, error) { + h := &HealthCheck{ + dbConnection: args.DB, + collectInterval: args.CollectInterval, + entryHandler: args.EntryHandler, + logger: log.With(args.Logger, "collector", HealthCheckCollector), + running: &atomic.Bool{}, + } + return h, nil +} + +func (c *HealthCheck) Name() string { + return HealthCheckCollector +} + +func (c *HealthCheck) Start(ctx context.Context) error { + level.Debug(c.logger).Log("msg", "collector started") + + c.running.Store(true) + ctx, cancel := context.WithCancel(ctx) + c.ctx = ctx + c.cancel = cancel + + go func() { + defer func() { + c.Stop() + c.running.Store(false) + }() + + ticker := time.NewTicker(c.collectInterval) + + for { + c.fetchHealthChecks(c.ctx) + select { + case <-c.ctx.Done(): + return + case <-ticker.C: + // continue loop + } + } + }() + + return nil +} + +func (c *HealthCheck) Stopped() bool { + return !c.running.Load() +} + +// Stop should be kept idempotent +func (c *HealthCheck) Stop() { + if c.cancel != nil { + c.cancel() + } +} + +type healthCheckResult struct { + name string + result bool + value string + err error +} + +func (c *HealthCheck) fetchHealthChecks(ctx context.Context) { + checks := []func(context.Context, *sql.DB) healthCheckResult{ + checkAlloyVersion, + checkPgStatStatementsEnabled, + checkTrackActivityQuerySize, + checkMonitoringUserPrivileges, + } + + for _, checkFn := range checks { + result := checkFn(ctx, c.dbConnection) + if result.err != nil { + level.Error(c.logger).Log("msg", "health check failed", "check", result.name, "err", result.err) + continue + } + msg := fmt.Sprintf(`check="%s" result="%v" value="%s"`, result.name, result.result, result.value) + c.entryHandler.Chan() <- database_observability.BuildLokiEntry( + logging.LevelInfo, + OP_HEALTH_STATUS, + msg, + ) + } +} + +// checkAlloyVersion reports the running Alloy version. +func checkAlloyVersion(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "AlloyVersion"} + // Always succeeds; returns the version string embedded at build time. + r.result = true + r.value = build.Version + return r +} + +// checkPgStatStatementsEnabled verifies that the pg_stat_statements extension is enabled. +func checkPgStatStatementsEnabled(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "PgStatStatementsEnabled"} + const q = `SELECT * FROM pg_extension WHERE extname = 'pg_stat_statements'` + + rows, err := db.QueryContext(ctx, q) + if err != nil { + r.err = fmt.Errorf("query pg_extension: %w", err) + return r + } + defer rows.Close() + + if rows.Next() { + r.result = true + } + + if err := rows.Err(); err != nil { + r.err = fmt.Errorf("iterate pg_extension: %w", err) + } + + return r +} + +func checkTrackActivityQuerySize(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "TrackActivityQuerySize"} + const q = `SELECT setting FROM pg_settings WHERE name = 'track_activity_query_size'` + const expectedSize = 4096 + + var sizeString string + if err := db.QueryRowContext(ctx, q).Scan(&sizeString); err != nil { + r.err = fmt.Errorf("query track_activity_query_size: %w", err) + return r + } + + size, err := strconv.Atoi(sizeString) + if err != nil { + r.err = fmt.Errorf("parse track_activity_query_size: %w", err) + return r + } + + r.result = size >= expectedSize + r.value = sizeString + + return r +} + +func checkMonitoringUserPrivileges(ctx context.Context, db *sql.DB) healthCheckResult { + r := healthCheckResult{name: "MonitoringUserPrivileges"} + const q = `SELECT * FROM pg_stat_statements LIMIT 1` + + rows, err := db.QueryContext(ctx, q) + if err != nil { + r.err = fmt.Errorf("query pg_stat_statements: %w", err) + return r + } + defer rows.Close() + + if rows.Next() { + r.result = true + } + + if err := rows.Err(); err != nil { + r.err = fmt.Errorf("iterate pg_stat_statements: %w", err) + } + + return r +} diff --git a/internal/component/database_observability/postgres/collector/health_check_test.go b/internal/component/database_observability/postgres/collector/health_check_test.go new file mode 100644 index 00000000000..fb5ad41a6d4 --- /dev/null +++ b/internal/component/database_observability/postgres/collector/health_check_test.go @@ -0,0 +1,198 @@ +package collector + +import ( + "os" + "strings" + "testing" + "time" + + "github.com/DATA-DOG/go-sqlmock" + "github.com/go-kit/log" + "github.com/prometheus/common/model" + "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/grafana/alloy/internal/component/common/loki" +) + +func TestHealthCheck(t *testing.T) { + defer goleak.VerifyNone(t) + + t.Run("all checks pass", func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + require.NotNil(t, collector) + + // Setup all checks to pass (no custom expectation) + setupExpectQueryAssertions("", mock, nil) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 4 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + require.GreaterOrEqual(t, len(lokiEntries), 4) + + for _, entry := range lokiEntries[:4] { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, `result="true"`) + } + }) + + t.Run("individual check failures", func(t *testing.T) { + testCases := []struct { + name string + failingCheckName string + customSetup func(mock sqlmock.Sqlmock) + expectedResult string + }{ + { + name: "pg_stat_statements not installed", + failingCheckName: "PgStatStatementsEnabled", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT * FROM pg_extension WHERE extname = 'pg_stat_statements'`). + WillReturnRows(sqlmock.NewRows([]string{"oid", "extname", "extowner", "extnamespace", "extrelocatable", "extversion"})) + }, + expectedResult: `result="false"`, + }, + { + name: "track_activity_query_size too small", + failingCheckName: "TrackActivityQuerySize", + customSetup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT setting FROM pg_settings WHERE name = 'track_activity_query_size'`). + WillReturnRows(sqlmock.NewRows([]string{"track_activity_query_size"}). + AddRow("1024")) + }, + expectedResult: `result="false"`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New( + sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual), + sqlmock.MonitorPingsOption(true), + ) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + + collector, err := NewHealthCheck(HealthCheckArguments{ + DB: db, + CollectInterval: 100 * time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + setupExpectQueryAssertions(tc.failingCheckName, mock, tc.customSetup) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 4 + }, 5*time.Second, 10*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 10*time.Millisecond) + + lokiClient.Stop() + + err = mock.ExpectationsWereMet() + require.NoError(t, err) + + lokiEntries := lokiClient.Received() + + found := false + for _, entry := range lokiEntries { + if strings.Contains(entry.Line, tc.failingCheckName) { + require.Equal(t, model.LabelSet{"op": OP_HEALTH_STATUS}, entry.Labels) + require.Contains(t, entry.Line, tc.expectedResult) + found = true + break + } + } + require.True(t, found) + }) + } + }) +} + +func setupExpectQueryAssertions(checkName string, mock sqlmock.Sqlmock, customSetup func(mock sqlmock.Sqlmock)) { + type checkSetup struct { + name string + setup func(mock sqlmock.Sqlmock) + } + + checks := []checkSetup{ + { + name: "PgStatStatementsEnabled", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT * FROM pg_extension WHERE extname = 'pg_stat_statements'`). + WillReturnRows(sqlmock.NewRows([]string{"oid", "extname", "extowner", "extnamespace", "extrelocatable", "extversion"}). + AddRow(1, "pg_stat_statements", 10, 11, false, "1.9")) + }, + }, + { + name: "TrackActivityQuerySize", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT setting FROM pg_settings WHERE name = 'track_activity_query_size'`). + WillReturnRows(sqlmock.NewRows([]string{"setting"}). + AddRow("4096")) + }, + }, + { + name: "MonitoringUserPrivileges", + setup: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(`SELECT * FROM pg_stat_statements LIMIT 1`). + WillReturnRows(sqlmock.NewRows([]string{"userid", "dbid", "queryid"}). + AddRow(1, 1, 123)) + }, + }, + } + + for _, check := range checks { + if check.name == checkName { + customSetup(mock) + continue + } + check.setup(mock) + } +} diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index de7c6eb18a9..4c1cf899ffc 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -71,6 +71,7 @@ type Arguments struct { QueryTablesArguments QueryTablesArguments `alloy:"query_details,block,optional"` SchemaDetailsArguments SchemaDetailsArguments `alloy:"schema_details,block,optional"` ExplainPlanArguments ExplainPlanArguments `alloy:"explain_plans,block,optional"` + HealthCheckArguments HealthCheckArguments `alloy:"health_check,block,optional"` } type CloudProvider struct { @@ -117,6 +118,9 @@ var DefaultArguments = Arguments{ CollectInterval: 1 * time.Minute, PerCollectRatio: 1.0, }, + HealthCheckArguments: HealthCheckArguments{ + CollectInterval: 1 * time.Hour, + }, } type ExplainPlanArguments struct { @@ -125,6 +129,10 @@ type ExplainPlanArguments struct { ExplainPlanExcludeSchemas []string `alloy:"explain_plan_exclude_schemas,attr,optional"` } +type HealthCheckArguments struct { + CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` +} + func (a *Arguments) SetToDefault() { *a = DefaultArguments } @@ -464,6 +472,22 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud c.collectors = append(c.collectors, epCollector) } + // HealthCheck collector is always enabled + hcCollector, err := collector.NewHealthCheck(collector.HealthCheckArguments{ + DB: c.dbConnection, + CollectInterval: c.args.HealthCheckArguments.CollectInterval, + EntryHandler: entryHandler, + Logger: c.opts.Logger, + }) + if err != nil { + logStartError(collector.HealthCheckCollector, "create", err) + } else { + if err := hcCollector.Start(context.Background()); err != nil { + logStartError(collector.HealthCheckCollector, "start", err) + } + c.collectors = append(c.collectors, hcCollector) + } + if len(startErrors) > 0 { return fmt.Errorf("failed to start some collectors: %s", strings.Join(startErrors, ", ")) }