diff --git a/docs/sources/reference/components/database_observability/database_observability.postgres.md b/docs/sources/reference/components/database_observability/database_observability.postgres.md index 79cb12e28f1..b04904ec43b 100644 --- a/docs/sources/reference/components/database_observability/database_observability.postgres.md +++ b/docs/sources/reference/components/database_observability/database_observability.postgres.md @@ -94,6 +94,7 @@ The `aws` block supplies the [ARN](https://docs.aws.amazon.com/IAM/latest/UserGu |---------------------------|------------|---------------------------------------------------------------|---------|----------| | `collect_interval` | `duration` | How frequently to collect information from database. | `"15s"` | no | | `disable_query_redaction` | `bool` | Collect unredacted SQL query text (might include parameters). | `false` | no | +| `exclude_current_user` | `bool` | Do not collect query samples for the current database user. | `true` | no | ### `schema_details` diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index d526b13d2f2..1f644ae60b2 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -66,8 +66,11 @@ const selectPgStatActivity = ` s.query_id != 0 ) ) + %s ` +const excludeCurrentUserClause = `AND s.usesysid != (select oid from pg_roles where rolname = current_user)` + type QuerySamplesInfo struct { DatabaseName sql.NullString DatabaseID int @@ -100,6 +103,7 @@ type QuerySamplesArguments struct { EntryHandler loki.EntryHandler Logger log.Logger DisableQueryRedaction bool + ExcludeCurrentUser bool } type QuerySamples struct { @@ -107,6 +111,7 @@ type QuerySamples struct { collectInterval time.Duration entryHandler loki.EntryHandler disableQueryRedaction bool + excludeCurrentUser 
bool logger log.Logger running *atomic.Bool @@ -204,7 +209,7 @@ func (w WaitEventIdentity) Equal(other WaitEventIdentity) bool { } func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { - const emittedCacheSize = 1000 //pg_stat_statements default max number of statements to track + const emittedCacheSize = 1000 // pg_stat_statements default max number of statements to track const emittedCacheTTL = 10 * time.Minute return &QuerySamples{ @@ -212,6 +217,7 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { collectInterval: args.CollectInterval, entryHandler: args.EntryHandler, disableQueryRedaction: args.DisableQueryRedaction, + excludeCurrentUser: args.ExcludeCurrentUser, logger: log.With(args.Logger, "collector", QuerySamplesCollector), running: &atomic.Bool{}, samples: map[SampleKey]*SampleState{}, @@ -275,7 +281,11 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { queryTextField = queryTextClause } - query := fmt.Sprintf(selectPgStatActivity, queryTextField) + excludeCurrentUserClauseField := "" + if c.excludeCurrentUser { + excludeCurrentUserClauseField = excludeCurrentUserClause + } + query := fmt.Sprintf(selectPgStatActivity, queryTextField, excludeCurrentUserClauseField) rows, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index 5013d4e5384..ac1e8e04f52 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -12,7 +12,6 @@ import ( "github.com/go-kit/log" "github.com/lib/pq" "github.com/prometheus/common/model" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/goleak" @@ 
-30,6 +29,14 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { xactStartTime := now.Add(-2 * time.Minute) // 2 minutes ago backendStartTime := now.Add(-1 * time.Hour) // 1 hour ago + columns := []string{ + "now", "datname", "pid", "leader_pid", + "usename", "application_name", "client_addr", "client_port", + "backend_type", "backend_start", "backend_xid", "backend_xmin", + "xact_start", "state", "state_change", "wait_event_type", + "wait_event", "blocked_by_pids", "query_start", "query_id", + } + testCases := []struct { name string setupMock func(mock sqlmock.Sqlmock) @@ -41,14 +48,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "active query without wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, @@ -56,14 +57,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -76,14 +71,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "parallel query with leader PID", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true}, "testuser", "testapp", "127.0.0.1", 5432, "parallel worker", now, sql.NullInt32{}, sql.NullInt32{}, @@ -91,14 +80,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -111,14 +94,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 102, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, @@ -126,14 +103,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -148,15 +119,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "insufficient privilege query - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( now, "testdb", 103, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", now, sql.NullInt32{}, sql.NullInt32{}, @@ -165,15 +129,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "", )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query"))) }, disableQueryRedaction: true, expectedErrorLine: `err="insufficient privilege to access query`, @@ -183,14 +140,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "null database name - no loki entries expected", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", now, sql.NullInt32{}, sql.NullInt32{}, @@ -198,14 +149,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "")).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) }, expectedErrorLine: `err="database name is not valid`, expectedLabels: []model.LabelSet{}, // No Loki entries expected @@ -214,15 +159,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", - }).AddRow( + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( now, "testdb", 106, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, "client backend", backendStartTime, sql.NullInt32{}, sql.NullInt32{}, @@ -231,15 +169,8 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). 
- WillReturnRows(sqlmock.NewRows([]string{ - "now", "datname", "pid", "leader_pid", - "usename", "application_name", "client_addr", "client_port", - "backend_type", "backend_start", "backend_xid", "backend_xmin", - "xact_start", "state", "state_change", "wait_event_type", - "wait_event", "blocked_by_pids", "query_start", "query_id", - "query", - })) + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query"))) }, disableQueryRedaction: true, expectedLabels: []model.LabelSet{ @@ -265,59 +196,49 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: tc.disableQueryRedaction, + ExcludeCurrentUser: true, }) require.NoError(t, err) require.NotNil(t, sampleCollector) - // Setup mock expectations tc.setupMock(mock) - err = sampleCollector.Start(t.Context()) - require.NoError(t, err) + require.NoError(t, sampleCollector.Start(t.Context())) - // Wait for Loki entries to be generated and verify their content, labels, and timestamps. 
- require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, len(tc.expectedLines)) - - require.Contains(t, logBuffer.String(), tc.expectedErrorLine) - - for i, entry := range entries { - if !reflect.DeepEqual(entry.Labels, tc.expectedLabels[i]) { - t.Errorf("expected label %v, got %v", tc.expectedLabels[i], entry.Labels) - } - require.Contains(t, entry.Line, tc.expectedLines[i]) - // Verify that BuildLokiEntryWithTimestamp is setting the timestamp correctly - expectedTimestamp := time.Unix(0, now.UnixNano()) - require.True(t, entry.Timestamp.Equal(expectedTimestamp)) - } + // For error cases, wait for error message in logs + if tc.expectedErrorLine != "" { + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), tc.expectedErrorLine) + }, 5*time.Second, 100*time.Millisecond) + } + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == len(tc.expectedLines) }, 5*time.Second, 100*time.Millisecond) - sampleCollector.Stop() + entries := lokiClient.Received() + for i, entry := range entries { + if !reflect.DeepEqual(entry.Labels, tc.expectedLabels[i]) { + t.Errorf("expected label %v, got %v", tc.expectedLabels[i], entry.Labels) + } + require.Equal(t, entry.Line, tc.expectedLines[i]) + // Verify that BuildLokiEntryWithTimestamp is setting the timestamp correctly + expectedTimestamp := time.Unix(0, now.UnixNano()) + require.True(t, entry.Timestamp.Equal(expectedTimestamp)) + } - // Wait for the collector to stop + sampleCollector.Stop() require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - - // Give time for goroutines to clean up - time.Sleep(100 * time.Millisecond) - // Run this after Stop() to avoid race conditions - err = mock.ExpectationsWereMet() - require.NoError(t, err) - - lokiEntries := lokiClient.Received() - require.Equal(t, len(tc.expectedLines), len(lokiEntries)) - for i, entry 
:= range lokiEntries { - require.Equal(t, tc.expectedLabels[i], entry.Labels) - require.Equal(t, tc.expectedLines[i], entry.Line) - } + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) } } @@ -341,24 +262,28 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { } t.Run("finalize on disappear after active scrape", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // First scrape: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 1000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -368,46 +293,55 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Second scrape: no rows -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Equal(t, `level="info" datname="testdb" pid="1000" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="10" xmin="20" xact_time="2m0s" query_time="30s" queryid="999" cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) - expectedTimestamp := time.Unix(0, now.UnixNano()) - require.True(t, entries[0].Timestamp.Equal(expectedTimestamp)) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="1000" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="10" xmin="20" xact_time="2m0s" query_time="30s" queryid="999" cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) + expectedTimestamp := time.Unix(0, now.UnixNano()) + require.True(t, entries[0].Timestamp.Equal(expectedTimestamp)) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("wait-event merges across scrapes with normalized PID set", func(t *testing.T) { 
+ t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event with unordered/dup PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -417,7 +351,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: same wait event with normalized PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -427,45 +361,54 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 2) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Equal(t, `level="info" datname="testdb" pid="300" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="12s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"`, entries[1].Line) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 2) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) + require.Equal(t, `level="info" datname="testdb" pid="300" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="12s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"`, entries[1].Line) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("wait-event closes on no-wait row; single occurrence emitted", func(t *testing.T) { + t.Parallel() + db, mock, err := 
sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -475,7 +418,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: active with no wait -> close occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -485,46 +428,55 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 2) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="0" xmin="0" xact_time="2m0s" query_time="0s" queryid="555" cpu_time="0s" query="UPDATE users SET status = 'active'"`, entries[0].Line) - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" backend_type="client backend" state="active" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="555"`, entries[1].Line) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 2) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="active" xid="0" xmin="0" xact_time="2m0s" query_time="0s" queryid="555" cpu_time="0s" query="UPDATE users SET status = 'active'"`, entries[0].Line) + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) + require.Equal(t, `level="info" datname="testdb" pid="301" leader_pid="" user="testuser" backend_type="client backend" state="active" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="555"`, entries[1].Line) 
sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("cpu persists across waits", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: active CPU snapshot (10s) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -534,7 +486,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: waiting with wait_event; state_change 7s ago - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -544,46 +496,55 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 2) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9002" cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="7s" wait_event_type="IO" wait_event="DataFileRead" wait_event_name="IO:DataFileRead" blocked_by_pids="[501]" queryid="9002"`, entries[1].Line) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 2) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9002" 
cpu_time="10s" query="SELECT * FROM t"`, entries[0].Line) + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) + require.Equal(t, `level="info" datname="testdb" pid="402" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="7s" wait_event_type="IO" wait_event="DataFileRead" wait_event_name="IO:DataFileRead" blocked_by_pids="[501]" queryid="9002"`, entries[1].Line) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("wait-event starts new occurrence on set change", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: wait event set A - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -593,7 +554,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 2: same event, set changes -> new occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -603,27 +564,32 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 3) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9003" query="UPDATE t SET c=1"`, entries[0].Line) - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) - require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="5s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103]" queryid="9003"`, entries[1].Line) - require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[2].Labels) - 
require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="8s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="9003"`, entries[2].Line) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 3 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 3) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="client backend" state="waiting" xid="0" xmin="0" xact_time="2m0s" query_time="30s" queryid="9003" query="UPDATE t SET c=1"`, entries[0].Line) + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[1].Labels) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="5s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103]" queryid="9003"`, entries[1].Line) + require.Equal(t, model.LabelSet{"op": OP_WAIT_EVENT}, entries[2].Labels) + require.Equal(t, `level="info" datname="testdb" pid="403" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="8s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="9003"`, entries[2].Line) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 
100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) } @@ -646,24 +612,28 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { } t.Run("emit at idle state with end at state_change", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -673,7 +643,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: same key turns idle; state_change denotes end - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -685,43 +655,51 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `query_time="20s"`) - require.Contains(t, entries[0].Line, `cpu_time="10s"`) - expectedTs := time.Unix(0, stateChangeTime.UnixNano()) - require.True(t, entries[0].Timestamp.Equal(expectedTs)) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) - require.Eventually(t, func() bool { return mock.ExpectationsWereMet() == nil }, 5*time.Second, 50*time.Millisecond) + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `query_time="20s"`) + require.Contains(t, entries[0].Line, `cpu_time="10s"`) + expectedTs := time.Unix(0, stateChangeTime.UnixNano()) + require.True(t, entries[0].Timestamp.Equal(expectedTs)) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("idle-only emitted once and deduped across scrapes", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() 
logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: only idle row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -731,7 +709,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM users", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -743,43 +721,50 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - require.Contains(t, entries[0].Line, `query_time="20s"`) - expectedTs := time.Unix(0, stateChangeTime.UnixNano()) - require.True(t, entries[0].Timestamp.Equal(expectedTs)) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) - require.Eventually(t, func() bool { return mock.ExpectationsWereMet() == nil }, 5*time.Second, 50*time.Millisecond) + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + require.Contains(t, entries[0].Line, `query_time="20s"`) + expectedTs := time.Unix(0, stateChangeTime.UnixNano()) + require.True(t, entries[0].Timestamp.Equal(expectedTs)) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("idle in transaction (aborted) emitted once and deduped across scrapes", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := 
loki.NewCollectingHandler() + defer lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: idle in transaction (aborted) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -789,7 +774,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT 1", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). 
WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -801,43 +786,50 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 1) - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) - // End timestamp should match state_change - expectedTs := time.Unix(0, stateChangeTime.UnixNano()) - require.True(t, entries[0].Timestamp.Equal(expectedTs)) - }, 5*time.Second, 50*time.Millisecond) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) - require.Eventually(t, func() bool { return mock.ExpectationsWereMet() == nil }, 5*time.Second, 50*time.Millisecond) + entries := lokiClient.Received() + require.Len(t, entries, 1) + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, entries[0].Labels) + // End timestamp should match state_change + expectedTs := time.Unix(0, stateChangeTime.UnixNano()) + require.True(t, entries[0].Timestamp.Equal(expectedTs)) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) }) t.Run("two idle-only keys emit separately and dedup individually", func(t *testing.T) { + t.Parallel() + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() logBuffer := syncbuffer.Buffer{} lokiClient := loki.NewCollectingHandler() + defer 
lokiClient.Stop() sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ DB: db, - CollectInterval: 10 * time.Millisecond, + CollectInterval: time.Millisecond, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), DisableQueryRedaction: true, + ExcludeCurrentUser: true, }) require.NoError(t, err) // Scrape 1: two idle-only rows with different keys (PID/QueryID) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -856,7 +848,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM b", )) // Scrape 2: same idle rows again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). 
AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -877,32 +869,127 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, sampleCollector.Start(t.Context())) - require.EventuallyWithT(t, func(t *assert.CollectT) { - entries := lokiClient.Received() - require.Len(t, entries, 2) - // Both entries should be OP_QUERY_SAMPLE - for _, e := range entries { - require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, e.Labels) + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 2 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 2) + // Both entries should be OP_QUERY_SAMPLE + for _, e := range entries { + require.Equal(t, model.LabelSet{"op": OP_QUERY_SAMPLE}, e.Labels) + } + // Ensure both queryids are present among the two entries + var seen22002, seen23002 bool + for _, e := range entries { + if strings.Contains(e.Line, `queryid="22002"`) { + seen22002 = true } - // Ensure both queryids are present among the two entries - var seen22002, seen23002 bool - for _, e := range entries { - if strings.Contains(e.Line, `queryid="22002"`) { - seen22002 = true - } - if strings.Contains(e.Line, `queryid="23002"`) { - seen23002 = true - } + if strings.Contains(e.Line, `queryid="23002"`) { + seen23002 = true } - require.True(t, seen22002 && seen23002) - }, 5*time.Second, 50*time.Millisecond) - - require.Eventually(t, func() bool { return mock.ExpectationsWereMet() == nil }, 5*time.Second, 50*time.Millisecond) + } + require.True(t, seen22002 && seen23002) sampleCollector.Stop() - require.Eventually(t, func() bool { return sampleCollector.Stopped() }, 5*time.Second, 100*time.Millisecond) - lokiClient.Stop() - time.Sleep(100 * time.Millisecond) - require.NoError(t, mock.ExpectationsWereMet()) + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 
5*time.Second, 100*time.Millisecond) }) } + +func TestQuerySamples_ExcludeCurrentUser(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + now := time.Now() + stateChangeTime := now.Add(-10 * time.Second) + queryStartTime := now.Add(-30 * time.Second) + xactStartTime := now.Add(-2 * time.Minute) + backendStartTime := now.Add(-1 * time.Hour) + + columns := []string{ + "now", "datname", "pid", "leader_pid", + "usename", "application_name", "client_addr", "client_port", + "backend_type", "backend_start", "backend_xid", "backend_xmin", + "xact_start", "state", "state_change", "wait_event_type", + "wait_event", "blocked_by_pids", "query_start", "query_id", + } + + testCases := []struct { + name string + excludeCurrentUser bool + expectedQuery string + }{ + { + name: "ExcludeCurrentUser enabled", + excludeCurrentUser: true, + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause), + }, + { + name: "ExcludeCurrentUser disabled", + excludeCurrentUser: false, + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", ""), + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + ExcludeCurrentUser: tc.excludeCurrentUser, + }) + require.NoError(t, err) + require.NotNil(t, sampleCollector) + + // First scrape: expect query with correct SQL format + mock.ExpectQuery(tc.expectedQuery).RowsWillBeClosed(). 
+ WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, "testdb", 100, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 500, Valid: true}, sql.NullInt32{Int32: 400, Valid: true}, + xactStartTime, "active", stateChangeTime, sql.NullString{}, + sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, + )) + + // Second scrape: empty to trigger finalization + mock.ExpectQuery(tc.expectedQuery).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) + + err = sampleCollector.Start(t.Context()) + require.NoError(t, err) + + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 1) + + sampleCollector.Stop() + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) + }) + } +} diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index 21994af9c7b..5a1994a46e8 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -85,6 +85,7 @@ type AWSCloudProviderInfo struct { type QuerySampleArguments struct { CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` DisableQueryRedaction bool `alloy:"disable_query_redaction,attr,optional"` + ExcludeCurrentUser bool `alloy:"exclude_current_user,attr,optional"` } type QueryTablesArguments struct { @@ -102,6 +103,7 @@ var DefaultArguments = Arguments{ QuerySampleArguments: QuerySampleArguments{ CollectInterval: 15 * time.Second, DisableQueryRedaction: false, + ExcludeCurrentUser: true, }, QueryTablesArguments: QueryTablesArguments{ 
CollectInterval: 1 * time.Minute, @@ -417,6 +419,7 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud EntryHandler: entryHandler, Logger: c.opts.Logger, DisableQueryRedaction: c.args.QuerySampleArguments.DisableQueryRedaction, + ExcludeCurrentUser: c.args.QuerySampleArguments.ExcludeCurrentUser, }) if err != nil { logStartError(collector.QuerySamplesCollector, "create", err)