diff --git a/internal/component/database_observability/postgres/collector/logs.go b/internal/component/database_observability/postgres/collector/logs.go index ddffc9ea33b..0239d9ea785 100644 --- a/internal/component/database_observability/postgres/collector/logs.go +++ b/internal/component/database_observability/postgres/collector/logs.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "regexp" + "slices" "strings" "sync" "time" @@ -31,25 +32,19 @@ var logFormatRegex = regexp.MustCompile( `[A-Z0-9]{5}:`, ) -var supportedSeverities = map[string]bool{ - "ERROR": true, - "FATAL": true, - "PANIC": true, -} - -type ParsedError struct { - ErrorSeverity string - SQLStateCode string - SQLStateClass string - User string - DatabaseName string +var supportedSeverities = map[string]struct{}{ + "ERROR": {}, + "FATAL": {}, + "PANIC": {}, } type LogsArguments struct { - Receiver loki.LogsReceiver - EntryHandler loki.EntryHandler - Logger log.Logger - Registry *prometheus.Registry + Receiver loki.LogsReceiver + EntryHandler loki.EntryHandler + Logger log.Logger + Registry *prometheus.Registry + ExcludeDatabases []string + ExcludeUsers []string } type Logs struct { @@ -57,7 +52,9 @@ type Logs struct { entryHandler loki.EntryHandler registry *prometheus.Registry - receiver loki.LogsReceiver + receiver loki.LogsReceiver + excludeDatabases []string + excludeUsers []string errorsBySQLState *prometheus.CounterVec parseErrors prometheus.Counter @@ -79,14 +76,16 @@ func NewLogs(args LogsArguments) (*Logs, error) { ctx, cancel := context.WithCancel(context.Background()) l := &Logs{ - logger: log.With(args.Logger, "collector", LogsCollector), - entryHandler: args.EntryHandler, - registry: args.Registry, - receiver: args.Receiver, - ctx: ctx, - cancel: cancel, - stopped: atomic.NewBool(false), - startTime: time.Now(), + logger: log.With(args.Logger, "collector", LogsCollector), + entryHandler: args.EntryHandler, + registry: args.Registry, + receiver: args.Receiver, + excludeDatabases: args.ExcludeDatabases, + excludeUsers: args.ExcludeUsers, + ctx: ctx, + cancel: cancel, + stopped: atomic.NewBool(false), + startTime: time.Now(), } l.initMetrics() @@ -220,10 +219,18 @@ func (l *Logs) parseTextLog(entry loki.Entry) error { database := strings.TrimSpace(afterAt[:pidMarkerIdx]) + if slices.Contains(l.excludeDatabases, database) { + return nil + } + beforeAt := line[:atIdx] lastColonBeforeAt := strings.LastIndex(beforeAt, ":") user := strings.TrimSpace(beforeAt[lastColonBeforeAt+1:]) + if slices.Contains(l.excludeUsers, user) { + return nil + } + // Extract SQLSTATE from format: [pid]:line_number:SQLSTATE:... pidEndIdx := strings.Index(afterAt, "]") afterPid := afterAt[pidEndIdx+1:] @@ -250,19 +257,17 @@ func (l *Logs) parseTextLog(entry loki.Entry) error { return nil } - if !supportedSeverities[severity] { + if _, ok := supportedSeverities[severity]; !ok { return nil } - parsed := &ParsedError{ - ErrorSeverity: severity, - SQLStateCode: sqlstateCode, - SQLStateClass: sqlstateClass, - User: user, - DatabaseName: database, - } - - l.updateMetrics(parsed) + l.errorsBySQLState.WithLabelValues( + severity, + sqlstateCode, + sqlstateClass, + database, + user, + ).Inc() return nil } @@ -299,16 +304,6 @@ func extractSeverity(message string) string { return "" } -func (l *Logs) updateMetrics(parsed *ParsedError) { - l.errorsBySQLState.WithLabelValues( - parsed.ErrorSeverity, - parsed.SQLStateCode, - parsed.SQLStateClass, - parsed.DatabaseName, - parsed.User, - ).Inc() -} - func truncateString(s string, maxLen int) string { if len(s) <= maxLen { return s diff --git a/internal/component/database_observability/postgres/collector/logs_test.go b/internal/component/database_observability/postgres/collector/logs_test.go index c8c8bedcd76..6ce368ad25b 100644 --- a/internal/component/database_observability/postgres/collector/logs_test.go +++ b/internal/component/database_observability/postgres/collector/logs_test.go @@ -671,3 +671,97 @@ func TestLogsCollector_SkipsOnlyHistoricalLogs(t *testing.T) { } require.Equal(t, float64(0), totalCount, "historical logs must not produce metrics") } + +func TestLogsCollector_ExcludeDatabases(t *testing.T) { + entryHandler := loki.NewEntryHandler(make(chan loki.Entry, 10), func() {}) + registry := prometheus.NewRegistry() + + collector, err := NewLogs(LogsArguments{ + Receiver: loki.NewLogsReceiver(), + EntryHandler: entryHandler, + Logger: log.NewNopLogger(), + Registry: registry, + ExcludeDatabases: []string{"excluded_db"}, + }) + require.NoError(t, err) + + startTime := collector.startTime + err = collector.Start(context.Background()) + require.NoError(t, err) + defer collector.Stop() + + ts := startTime.Add(10 * time.Second).UTC() + ts1 := ts.Format("2006-01-02 15:04:05.000 MST") + ts2 := ts.Add(-1 * time.Second).Format("2006-01-02 15:04:05 MST") + + excludedLog := ts1 + ":10.0.1.5(12345):app-user@excluded_db:[9112]:4:57014:" + ts2 + ":25/112:0:693c34cb.2398::psqlERROR: canceling statement" + allowedLog := ts1 + ":10.0.1.5(12345):app-user@allowed_db:[9113]:5:57014:" + ts2 + ":25/113:0:693c34cb.2399::psqlERROR: canceling statement" + + collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: excludedLog, Timestamp: time.Now()}} + collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: allowedLog, Timestamp: time.Now()}} + + time.Sleep(200 * time.Millisecond) + + mfs, _ := registry.Gather() + var totalCount float64 + for _, mf := range mfs { + if mf.GetName() == "database_observability_pg_errors_total" { + for _, metric := range mf.GetMetric() { + labels := make(map[string]string) + for _, label := range metric.GetLabel() { + labels[label.GetName()] = label.GetValue() + } + totalCount += metric.GetCounter().GetValue() + require.Equal(t, "allowed_db", labels["datname"], "only allowed_db should produce metrics") + } + } + } + require.Equal(t, float64(1), totalCount, "only the non-excluded database log should be counted") +} + +func TestLogsCollector_ExcludeUsers(t *testing.T) { + entryHandler := loki.NewEntryHandler(make(chan loki.Entry, 10), func() {}) + registry := prometheus.NewRegistry() + + collector, err := NewLogs(LogsArguments{ + Receiver: loki.NewLogsReceiver(), + EntryHandler: entryHandler, + Logger: log.NewNopLogger(), + Registry: registry, + ExcludeUsers: []string{"excluded_user"}, + }) + require.NoError(t, err) + + startTime := collector.startTime + err = collector.Start(context.Background()) + require.NoError(t, err) + defer collector.Stop() + + ts := startTime.Add(10 * time.Second).UTC() + ts1 := ts.Format("2006-01-02 15:04:05.000 MST") + ts2 := ts.Add(-1 * time.Second).Format("2006-01-02 15:04:05 MST") + + excludedLog := ts1 + ":10.0.1.5(12345):excluded_user@testdb:[9112]:4:57014:" + ts2 + ":25/112:0:693c34cb.2398::psqlERROR: canceling statement" + allowedLog := ts1 + ":10.0.1.5(12345):allowed_user@testdb:[9113]:5:57014:" + ts2 + ":25/113:0:693c34cb.2399::psqlERROR: canceling statement" + + collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: excludedLog, Timestamp: time.Now()}} + collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: allowedLog, Timestamp: time.Now()}} + + time.Sleep(200 * time.Millisecond) + + mfs, _ := registry.Gather() + var totalCount float64 + for _, mf := range mfs { + if mf.GetName() == "database_observability_pg_errors_total" { + for _, metric := range mf.GetMetric() { + labels := make(map[string]string) + for _, label := range metric.GetLabel() { + labels[label.GetName()] = label.GetValue() + } + totalCount += metric.GetCounter().GetValue() + require.Equal(t, "allowed_user", labels["user"], "only allowed_user should produce metrics") + } + } + } + require.Equal(t, float64(1), totalCount, "only the non-excluded user log should be counted") +} diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index 91c7b47492f..1831adc6840 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -561,10 +561,12 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud // Logs collector is always enabled logsCollector, err := collector.NewLogs(collector.LogsArguments{ - Receiver: c.logsReceiver, - EntryHandler: loki.NewEntryHandler(c.logsReceiver.Chan(), func() {}), - Logger: c.opts.Logger, - Registry: c.registry, + Receiver: c.logsReceiver, + EntryHandler: loki.NewEntryHandler(c.logsReceiver.Chan(), func() {}), + Logger: c.opts.Logger, + Registry: c.registry, + ExcludeDatabases: c.args.ExcludeDatabases, + ExcludeUsers: c.args.ExcludeUsers, }) if err != nil { logStartError(collector.LogsCollector, "create", err)