Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
119 changes: 100 additions & 19 deletions internal/component/database_observability/postgres/component.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import (
"crypto/sha256"
"database/sql"
"fmt"
"log/slog"
"net/http"
"path"
"strings"
"sync"
"time"

"github.com/lib/pq"
pg_collector "github.com/prometheus-community/postgres_exporter/collector"
pg_exporter "github.com/prometheus-community/postgres_exporter/exporter"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/prometheus/common/model"
Expand All @@ -23,7 +26,9 @@ import (
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/component/database_observability/postgres/collector"
"github.com/grafana/alloy/internal/component/discovery"
exporter_postgres "github.com/grafana/alloy/internal/component/prometheus/exporter/postgres"
"github.com/grafana/alloy/internal/featuregate"
"github.com/grafana/alloy/internal/runtime/logging"
"github.com/grafana/alloy/internal/runtime/logging/level"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/syntax"
Expand Down Expand Up @@ -68,12 +73,13 @@ type Arguments struct {
ExcludeDatabases []string `alloy:"exclude_databases,attr,optional"`
ExcludeUsers []string `alloy:"exclude_users,attr,optional"`

CloudProvider *CloudProvider `alloy:"cloud_provider,block,optional"`
QuerySampleArguments QuerySampleArguments `alloy:"query_samples,block,optional"`
QueryDetailsArguments QueryDetailsArguments `alloy:"query_details,block,optional"`
SchemaDetailsArguments SchemaDetailsArguments `alloy:"schema_details,block,optional"`
ExplainPlansArguments ExplainPlansArguments `alloy:"explain_plans,block,optional"`
HealthCheckArguments HealthCheckArguments `alloy:"health_check,block,optional"`
CloudProvider *CloudProvider `alloy:"cloud_provider,block,optional"`
QuerySampleArguments QuerySampleArguments `alloy:"query_samples,block,optional"`
QueryDetailsArguments QueryDetailsArguments `alloy:"query_details,block,optional"`
SchemaDetailsArguments SchemaDetailsArguments `alloy:"schema_details,block,optional"`
ExplainPlansArguments ExplainPlansArguments `alloy:"explain_plans,block,optional"`
HealthCheckArguments HealthCheckArguments `alloy:"health_check,block,optional"`
PrometheusExporter *PrometheusExporterArguments `alloy:"prometheus_exporter,block,optional"`
}

type CloudProvider struct {
Expand Down Expand Up @@ -145,6 +151,24 @@ type HealthCheckArguments struct {
CollectInterval time.Duration `alloy:"collect_interval,attr,optional"`
}

// PrometheusExporterArguments configures the embedded postgres_exporter scrapers.
// When this block is present, postgres_exporter metrics are served alongside the
// component's own metrics at the same /metrics endpoint.
//
// It is a distinct type (not an embedded struct) because the Alloy syntax
// system does not support anonymous/embedded fields.
// Note: data_source_names is ignored; the component's data_source_name is always used.
type PrometheusExporterArguments exporter_postgres.Arguments

// SetToDefault implements syntax.Defaulter by copying the exporter
// component's default arguments.
func (a *PrometheusExporterArguments) SetToDefault() {
	defaults := exporter_postgres.DefaultArguments
	*a = PrometheusExporterArguments(defaults)
}

// Validate implements syntax.Validator by delegating to the underlying
// exporter arguments type.
func (a *PrometheusExporterArguments) Validate() error {
	return (*exporter_postgres.Arguments)(a).Validate()
}

// SetToDefault implements syntax.Defaulter, resetting the arguments to the
// package-level DefaultArguments before unmarshaling user config over them.
func (a *Arguments) SetToDefault() {
	*a = DefaultArguments
}
Expand All @@ -154,6 +178,9 @@ func (a *Arguments) Validate() error {
if err != nil {
return err
}
if a.PrometheusExporter != nil && len(a.Targets) > 0 {
return fmt.Errorf("prometheus_exporter and targets are mutually exclusive: use prometheus_exporter to embed the exporter, or targets to scrape an external one")
}
return nil
}

Expand All @@ -176,19 +203,20 @@ type Collector interface {
}

// Component is the runtime state of the database_observability postgres
// component: its configuration, log fan-out, metrics registry, database
// connection, and the set of running collectors.
type Component struct {
	opts    component.Options
	args    Arguments
	handler loki.LogsReceiver // receiver that collectors write log entries to
	fanout  *loki.Fanout      // forwards entries on to the configured downstream receivers
	mut     sync.RWMutex      // NOTE(review): presumably guards the mutable fields below — confirm against Run/Update
	// registry serves this component's metrics (and, when configured, the
	// embedded postgres_exporter collectors) over /metrics.
	registry   *prometheus.Registry
	baseTarget discovery.Target
	collectors []Collector
	instanceKey  string
	dbConnection *sql.DB
	healthErr    *atomic.String
	// openSQL opens the database connection; a function field so it can be
	// swapped out (e.g. in tests) — defaults assumed to be sql.Open, verify.
	openSQL      func(driverName, dataSourceName string) (*sql.DB, error)
	logsReceiver loki.LogsReceiver
	// exporterCollectors tracks collectors registered on behalf of the
	// prometheus_exporter block so they can be unregistered and re-created
	// on reconnect (see connectAndStartCollectors).
	exporterCollectors []prometheus.Collector
}

func New(opts component.Options, args Arguments) (*Component, error) {
Expand Down Expand Up @@ -367,6 +395,59 @@ func (c *Component) connectAndStartCollectors(ctx context.Context) error {
cp = cloudProvider
}

for _, col := range c.exporterCollectors {
c.registry.Unregister(col)
}
c.exporterCollectors = nil

if c.args.PrometheusExporter != nil {
exporterArgs := exporter_postgres.Arguments(*c.args.PrometheusExporter)
slogLogger := slog.New(logging.NewSlogGoKitHandler(c.opts.Logger))
dsn := string(c.args.DataSourceName)

e := pg_exporter.NewExporter(
[]string{dsn},
slogLogger,
pg_exporter.DisableDefaultMetrics(exporterArgs.DisableDefaultMetrics),
pg_exporter.WithUserQueriesPath(exporterArgs.CustomQueriesConfigPath),
pg_exporter.DisableSettingsMetrics(exporterArgs.DisableSettingsMetrics),
pg_exporter.AutoDiscoverDatabases(true),
pg_exporter.ExcludeDatabases(c.args.ExcludeDatabases),
pg_exporter.WithMetricPrefix("pg"),
)
if err := c.registry.Register(e); err != nil {
return fmt.Errorf("failed to register prometheus_exporter: %w", err)
}
c.exporterCollectors = append(c.exporterCollectors, e)

if !exporterArgs.DisableDefaultMetrics {
collectorOpts := []pg_collector.Option{pg_collector.WithCollectionTimeout("10s")}
if exporterArgs.StatStatementFlags != nil {
collectorOpts = append(collectorOpts, pg_collector.WithStatStatementsConfig(pg_collector.StatStatementsConfig{
IncludeQuery: exporterArgs.StatStatementFlags.IncludeQuery,
QueryLength: exporterArgs.StatStatementFlags.QueryLength,
Limit: exporterArgs.StatStatementFlags.Limit,
ExcludeDatabases: exporterArgs.StatStatementFlags.ExcludeDatabases,
ExcludeUsers: exporterArgs.StatStatementFlags.ExcludeUsers,
}))
}
col, err := pg_collector.NewPostgresCollector(
slogLogger,
c.args.ExcludeDatabases,
dsn,
exporterArgs.EnabledCollectors,
collectorOpts...,
)
if err != nil {
return fmt.Errorf("failed to create postgres collector: %w", err)
}
if err := c.registry.Register(col); err != nil {
return fmt.Errorf("failed to register postgres collector: %w", err)
}
c.exporterCollectors = append(c.exporterCollectors, col)
}
}

allTargets := append([]discovery.Target{c.baseTarget}, c.args.Targets...)
targets := make([]discovery.Target, 0, len(allTargets))
for _, t := range allTargets {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/grafana/alloy/internal/component/database_observability"
"github.com/grafana/alloy/internal/component/database_observability/postgres/collector"
"github.com/grafana/alloy/internal/component/discovery"
exporter_postgres "github.com/grafana/alloy/internal/component/prometheus/exporter/postgres"
http_service "github.com/grafana/alloy/internal/service/http"
"github.com/grafana/alloy/syntax"
"github.com/grafana/alloy/syntax/alloytypes"
Expand Down Expand Up @@ -734,3 +735,62 @@ func TestPostgres_Reconnection(t *testing.T) {
}
})
}

// Test_PrometheusExporterBlock verifies parsing and validation of the optional
// prometheus_exporter block in the component's Arguments.
func Test_PrometheusExporterBlock(t *testing.T) {
	// parse unmarshals an Alloy config snippet into Arguments.
	parse := func(cfg string) (Arguments, error) {
		var args Arguments
		err := syntax.Unmarshal([]byte(cfg), &args)
		return args, err
	}

	t.Run("absent when not specified", func(t *testing.T) {
		args, err := parse(`
		data_source_name = "postgresql://user:pass@localhost:5432/db"
		forward_to = []
		targets = []
		`)
		require.NoError(t, err)
		assert.Nil(t, args.PrometheusExporter)
	})

	t.Run("present with defaults when empty block", func(t *testing.T) {
		args, err := parse(`
		data_source_name = "postgresql://user:pass@localhost:5432/db"
		forward_to = []
		targets = []
		prometheus_exporter {}
		`)
		require.NoError(t, err)
		require.NotNil(t, args.PrometheusExporter)
		got := exporter_postgres.Arguments(*args.PrometheusExporter)
		assert.False(t, got.DisableDefaultMetrics)
		assert.False(t, got.DisableSettingsMetrics)
	})

	t.Run("present with explicit config", func(t *testing.T) {
		args, err := parse(`
		data_source_name = "postgresql://user:pass@localhost:5432/db"
		forward_to = []
		targets = []
		prometheus_exporter {
			disable_settings_metrics = true
		}
		`)
		require.NoError(t, err)
		require.NotNil(t, args.PrometheusExporter)
		got := exporter_postgres.Arguments(*args.PrometheusExporter)
		assert.True(t, got.DisableSettingsMetrics)
	})

	t.Run("error when both prometheus_exporter and targets are set", func(t *testing.T) {
		_, err := parse(`
		data_source_name = "postgresql://user:pass@localhost:5432/db"
		forward_to = []
		targets = [{"__address__" = "localhost:9187"}]
		prometheus_exporter {}
		`)
		require.ErrorContains(t, err, "prometheus_exporter and targets are mutually exclusive")
	})
}
Loading