Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"regexp"
"slices"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -31,33 +32,29 @@ var logFormatRegex = regexp.MustCompile(
`[A-Z0-9]{5}:`,
)

// supportedSeverities lists the log severities that parseTextLog turns into
// metrics. Lookups for any other severity yield false, and parseTextLog
// returns without counting the line.
var supportedSeverities = map[string]bool{
"ERROR": true,
"FATAL": true,
"PANIC": true,
}

type ParsedError struct {
ErrorSeverity string
SQLStateCode string
SQLStateClass string
User string
DatabaseName string
Comment on lines -40 to -45
Copy link
Copy Markdown
Contributor Author

@cristiangreco cristiangreco Feb 18, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed struct in favour of updating the metric directly.

// supportedSeverities is the set of log severities that parseTextLog turns
// into metrics. Only key presence matters (checked with the comma-ok form);
// lines with any other severity are skipped without being counted.
var supportedSeverities = map[string]struct{}{
"ERROR": {},
"FATAL": {},
"PANIC": {},
}

type LogsArguments struct {
Receiver loki.LogsReceiver
EntryHandler loki.EntryHandler
Logger log.Logger
Registry *prometheus.Registry
Receiver loki.LogsReceiver
EntryHandler loki.EntryHandler
Logger log.Logger
Registry *prometheus.Registry
ExcludeDatabases []string
ExcludeUsers []string
}

type Logs struct {
logger log.Logger
entryHandler loki.EntryHandler
registry *prometheus.Registry

receiver loki.LogsReceiver
receiver loki.LogsReceiver
excludeDatabases []string
excludeUsers []string

errorsBySQLState *prometheus.CounterVec
parseErrors prometheus.Counter
Expand All @@ -79,14 +76,16 @@ func NewLogs(args LogsArguments) (*Logs, error) {
ctx, cancel := context.WithCancel(context.Background())

l := &Logs{
logger: log.With(args.Logger, "collector", LogsCollector),
entryHandler: args.EntryHandler,
registry: args.Registry,
receiver: args.Receiver,
ctx: ctx,
cancel: cancel,
stopped: atomic.NewBool(false),
startTime: time.Now(),
logger: log.With(args.Logger, "collector", LogsCollector),
entryHandler: args.EntryHandler,
registry: args.Registry,
receiver: args.Receiver,
excludeDatabases: args.ExcludeDatabases,
excludeUsers: args.ExcludeUsers,
ctx: ctx,
cancel: cancel,
stopped: atomic.NewBool(false),
startTime: time.Now(),
}

l.initMetrics()
Expand Down Expand Up @@ -220,10 +219,18 @@ func (l *Logs) parseTextLog(entry loki.Entry) error {

database := strings.TrimSpace(afterAt[:pidMarkerIdx])

if slices.Contains(l.excludeDatabases, database) {
return nil
}

beforeAt := line[:atIdx]
lastColonBeforeAt := strings.LastIndex(beforeAt, ":")
user := strings.TrimSpace(beforeAt[lastColonBeforeAt+1:])

if slices.Contains(l.excludeUsers, user) {
return nil
}

// Extract SQLSTATE from format: [pid]:line_number:SQLSTATE:...
pidEndIdx := strings.Index(afterAt, "]")
afterPid := afterAt[pidEndIdx+1:]
Expand All @@ -250,19 +257,17 @@ func (l *Logs) parseTextLog(entry loki.Entry) error {
return nil
}

if !supportedSeverities[severity] {
if _, ok := supportedSeverities[severity]; !ok {
return nil
}

parsed := &ParsedError{
ErrorSeverity: severity,
SQLStateCode: sqlstateCode,
SQLStateClass: sqlstateClass,
User: user,
DatabaseName: database,
}

l.updateMetrics(parsed)
l.errorsBySQLState.WithLabelValues(
severity,
sqlstateCode,
sqlstateClass,
database,
user,
).Inc()

return nil
}
Expand Down Expand Up @@ -299,16 +304,6 @@ func extractSeverity(message string) string {
return ""
}

// updateMetrics records a single parsed error by incrementing the
// errorsBySQLState counter. Label values are taken from parsed and passed in
// the order: severity, SQLSTATE code, SQLSTATE class, database name, user.
func (l *Logs) updateMetrics(parsed *ParsedError) {
	counter := l.errorsBySQLState.WithLabelValues(
		parsed.ErrorSeverity,
		parsed.SQLStateCode,
		parsed.SQLStateClass,
		parsed.DatabaseName,
		parsed.User,
	)
	counter.Inc()
}

func truncateString(s string, maxLen int) string {
if len(s) <= maxLen {
return s
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -671,3 +671,97 @@ func TestLogsCollector_SkipsOnlyHistoricalLogs(t *testing.T) {
}
require.Equal(t, float64(0), totalCount, "historical logs must not produce metrics")
}

// TestLogsCollector_ExcludeDatabases checks that a log line whose database is
// listed in LogsArguments.ExcludeDatabases does not contribute to
// database_observability_pg_errors_total, while an otherwise identical line
// for a different database is counted exactly once.
func TestLogsCollector_ExcludeDatabases(t *testing.T) {
	entryHandler := loki.NewEntryHandler(make(chan loki.Entry, 10), func() {})
	registry := prometheus.NewRegistry()

	collector, err := NewLogs(LogsArguments{
		Receiver:         loki.NewLogsReceiver(),
		EntryHandler:     entryHandler,
		Logger:           log.NewNopLogger(),
		Registry:         registry,
		ExcludeDatabases: []string{"excluded_db"},
	})
	require.NoError(t, err)

	startTime := collector.startTime
	err = collector.Start(context.Background())
	require.NoError(t, err)
	defer collector.Stop()

	// Log timestamps are placed after the collector's start time —
	// presumably so the historical-log filter does not drop them.
	ts := startTime.Add(10 * time.Second).UTC()
	ts1 := ts.Format("2006-01-02 15:04:05.000 MST")
	ts2 := ts.Add(-1 * time.Second).Format("2006-01-02 15:04:05 MST")

	// Both lines are identical ERROR entries (SQLSTATE 57014) apart from the
	// database name after '@' and the pid/session identifiers.
	excludedLog := ts1 + ":10.0.1.5(12345):app-user@excluded_db:[9112]:4:57014:" + ts2 + ":25/112:0:693c34cb.2398::psqlERROR: canceling statement"
	allowedLog := ts1 + ":10.0.1.5(12345):app-user@allowed_db:[9113]:5:57014:" + ts2 + ":25/113:0:693c34cb.2399::psqlERROR: canceling statement"

	collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: excludedLog, Timestamp: time.Now()}}
	collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: allowedLog, Timestamp: time.Now()}}

	// Give the collector time to consume both entries before gathering.
	time.Sleep(200 * time.Millisecond)

	// A failed gather must fail the test directly rather than surface as a
	// misleading count mismatch below.
	mfs, err := registry.Gather()
	require.NoError(t, err)

	var totalCount float64
	for _, mf := range mfs {
		if mf.GetName() == "database_observability_pg_errors_total" {
			for _, metric := range mf.GetMetric() {
				labels := make(map[string]string)
				for _, label := range metric.GetLabel() {
					labels[label.GetName()] = label.GetValue()
				}
				totalCount += metric.GetCounter().GetValue()
				require.Equal(t, "allowed_db", labels["datname"], "only allowed_db should produce metrics")
			}
		}
	}
	require.Equal(t, float64(1), totalCount, "only the non-excluded database log should be counted")
}

// TestLogsCollector_ExcludeUsers checks that a log line whose user is listed
// in LogsArguments.ExcludeUsers does not contribute to
// database_observability_pg_errors_total, while an otherwise identical line
// for a different user is counted exactly once.
func TestLogsCollector_ExcludeUsers(t *testing.T) {
	entryHandler := loki.NewEntryHandler(make(chan loki.Entry, 10), func() {})
	registry := prometheus.NewRegistry()

	collector, err := NewLogs(LogsArguments{
		Receiver:     loki.NewLogsReceiver(),
		EntryHandler: entryHandler,
		Logger:       log.NewNopLogger(),
		Registry:     registry,
		ExcludeUsers: []string{"excluded_user"},
	})
	require.NoError(t, err)

	startTime := collector.startTime
	err = collector.Start(context.Background())
	require.NoError(t, err)
	defer collector.Stop()

	// Log timestamps are placed after the collector's start time —
	// presumably so the historical-log filter does not drop them.
	ts := startTime.Add(10 * time.Second).UTC()
	ts1 := ts.Format("2006-01-02 15:04:05.000 MST")
	ts2 := ts.Add(-1 * time.Second).Format("2006-01-02 15:04:05 MST")

	// Both lines are identical ERROR entries (SQLSTATE 57014) apart from the
	// user name before '@' and the pid/session identifiers.
	excludedLog := ts1 + ":10.0.1.5(12345):excluded_user@testdb:[9112]:4:57014:" + ts2 + ":25/112:0:693c34cb.2398::psqlERROR: canceling statement"
	allowedLog := ts1 + ":10.0.1.5(12345):allowed_user@testdb:[9113]:5:57014:" + ts2 + ":25/113:0:693c34cb.2399::psqlERROR: canceling statement"

	collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: excludedLog, Timestamp: time.Now()}}
	collector.Receiver().Chan() <- loki.Entry{Entry: push.Entry{Line: allowedLog, Timestamp: time.Now()}}

	// Give the collector time to consume both entries before gathering.
	time.Sleep(200 * time.Millisecond)

	// A failed gather must fail the test directly rather than surface as a
	// misleading count mismatch below.
	mfs, err := registry.Gather()
	require.NoError(t, err)

	var totalCount float64
	for _, mf := range mfs {
		if mf.GetName() == "database_observability_pg_errors_total" {
			for _, metric := range mf.GetMetric() {
				labels := make(map[string]string)
				for _, label := range metric.GetLabel() {
					labels[label.GetName()] = label.GetValue()
				}
				totalCount += metric.GetCounter().GetValue()
				require.Equal(t, "allowed_user", labels["user"], "only allowed_user should produce metrics")
			}
		}
	}
	require.Equal(t, float64(1), totalCount, "only the non-excluded user log should be counted")
}
Original file line number Diff line number Diff line change
Expand Up @@ -561,10 +561,12 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud

// Logs collector is always enabled
logsCollector, err := collector.NewLogs(collector.LogsArguments{
Receiver: c.logsReceiver,
EntryHandler: loki.NewEntryHandler(c.logsReceiver.Chan(), func() {}),
Logger: c.opts.Logger,
Registry: c.registry,
Receiver: c.logsReceiver,
EntryHandler: loki.NewEntryHandler(c.logsReceiver.Chan(), func() {}),
Logger: c.opts.Logger,
Registry: c.registry,
ExcludeDatabases: c.args.ExcludeDatabases,
ExcludeUsers: c.args.ExcludeUsers,
})
if err != nil {
logStartError(collector.LogsCollector, "create", err)
Expand Down
Loading