From 80bc1b56740f2dbc76579b46d35f49fb58005919 Mon Sep 17 00:00:00 2001 From: Cristian Greco Date: Wed, 21 Jan 2026 17:14:00 +0100 Subject: [PATCH] fix: Database_observability: fix race in postgres query samples test --- .../postgres/collector/query_samples_test.go | 197 +++++++++++++----- 1 file changed, 145 insertions(+), 52 deletions(-) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index ac1e8e04f52..a0b3e187ea8 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -2,6 +2,7 @@ package collector import ( "database/sql" + "errors" "fmt" "reflect" "strings" @@ -19,6 +20,8 @@ import ( "github.com/grafana/alloy/internal/util/syncbuffer" ) +var errMockQuerySamplesFailed = errors.New("test-error") + func TestQuerySamples_FetchQuerySamples(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) @@ -41,7 +44,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { name string setupMock func(mock sqlmock.Sqlmock) disableQueryRedaction bool - expectedErrorLine string expectedLabels []model.LabelSet expectedLines []string }{ @@ -60,7 +62,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, - expectedLabels: []model.LabelSet{ {"op": OP_QUERY_SAMPLE}, }, @@ -83,12 +84,11 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, - expectedLabels: []model.LabelSet{ {"op": OP_QUERY_SAMPLE}, }, expectedLines: []string{ - `level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="0s" query_time="0s" queryid="123" cpu_time="0s"`, // time.Duration(0).String(), + `level="info" datname="testdb" pid="101" leader_pid="100" user="testuser" app="testapp" client="127.0.0.1:5432" backend_type="parallel worker" state="active" xid="0" xmin="0" xact_time="0s" query_time="0s" queryid="123" cpu_time="0s"`, }, }, { @@ -106,7 +106,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, - expectedLabels: []model.LabelSet{ {"op": OP_QUERY_SAMPLE}, {"op": OP_WAIT_EVENT}, @@ -116,46 +115,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { `level="info" datname="testdb" pid="102" leader_pid="" user="testuser" backend_type="client backend" state="waiting" xid="0" xmin="0" wait_time="10s" wait_event_type="Lock" wait_event="relation" wait_event_name="Lock:relation" blocked_by_pids="[103 104]" queryid="124"`, }, }, - { - name: "insufficient privilege query - no loki entries expected", - setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( - now, "testdb", 103, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now, sql.NullInt32{}, sql.NullInt32{}, - now, "active", now, sql.NullString{}, - sql.NullString{}, nil, now, sql.NullInt64{Int64: 125, Valid: true}, - "", - )) - // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(append(columns, "query"))) - }, - disableQueryRedaction: true, - expectedErrorLine: `err="insufficient privilege to access query`, - expectedLabels: []model.LabelSet{}, // No Loki entries expected - expectedLines: []string{}, // No Loki entries expected - }, - { - name: "null database name - no loki entries expected", - setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns).AddRow( - now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, - "testuser", "testapp", "127.0.0.1", 5432, - "client backend", now, sql.NullInt32{}, sql.NullInt32{}, - now, "active", now, sql.NullString{}, - sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, - )) - // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). - WillReturnRows(sqlmock.NewRows(columns)) - }, - expectedErrorLine: `err="database name is not valid`, - expectedLabels: []model.LabelSet{}, // No Loki entries expected - expectedLines: []string{}, // No Loki entries expected - }, { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { @@ -209,13 +168,6 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { require.NoError(t, sampleCollector.Start(t.Context())) - // For error cases, wait for error message in logs - if tc.expectedErrorLine != "" { - require.Eventually(t, func() bool { - return strings.Contains(logBuffer.String(), tc.expectedErrorLine) - }, 5*time.Second, 100*time.Millisecond) - } - require.Eventually(t, func() bool { return len(lokiClient.Received()) == len(tc.expectedLines) }, 5*time.Second, 100*time.Millisecond) @@ -243,6 +195,123 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { } } +// TestQuerySamples_FetchQuerySamples_ErrorCases tests scenarios where errors occur +// and no Loki entries are produced. +func TestQuerySamples_FetchQuerySamples_ErrorCases(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + now := time.Now() + + columns := []string{ + "now", "datname", "pid", "leader_pid", + "usename", "application_name", "client_addr", "client_port", + "backend_type", "backend_start", "backend_xid", "backend_xmin", + "xact_start", "state", "state_change", "wait_event_type", + "wait_event", "blocked_by_pids", "query_start", "query_id", + } + + testCases := []struct { + name string + setupMock func(mock sqlmock.Sqlmock) + disableQueryRedaction bool + expectedErrorLine string + }{ + { + name: "insufficient privilege query", + setupMock: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( + now, "testdb", 103, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", now, sql.NullInt32{}, sql.NullInt32{}, + now, "active", now, sql.NullString{}, + sql.NullString{}, nil, now, sql.NullInt64{Int64: 125, Valid: true}, + "", + )) + // Second scrape: empty to complete cycle + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(append(columns, "query"))) + // Return error to trigger finalization + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + WillReturnError(errMockQuerySamplesFailed) + }, + disableQueryRedaction: true, + expectedErrorLine: `err="insufficient privilege to access query`, + }, + { + name: "null database name", + setupMock: func(mock sqlmock.Sqlmock) { + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns).AddRow( + now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", now, sql.NullInt32{}, sql.NullInt32{}, + now, "active", now, sql.NullString{}, + sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, + )) + // Second scrape: empty to complete cycle + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) + // Return error to trigger finalization + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)). + WillReturnError(errMockQuerySamplesFailed) + }, + expectedErrorLine: `err="database name is not valid`, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + t.Parallel() + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: time.Millisecond, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + DisableQueryRedaction: tc.disableQueryRedaction, + ExcludeCurrentUser: true, + }) + require.NoError(t, err) + require.NotNil(t, sampleCollector) + + tc.setupMock(mock) + + require.NoError(t, sampleCollector.Start(t.Context())) + + // Wait for the expected error to be logged + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), tc.expectedErrorLine) + }, 5*time.Second, 100*time.Millisecond) + + // Wait for the error to be returned, indicating all expected queries completed + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), errMockQuerySamplesFailed.Error()) + }, 5*time.Second, 100*time.Millisecond) + + // Verify no Loki entries were produced + require.Empty(t, lokiClient.Received()) + + sampleCollector.Stop() + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) + }) + } +} + func TestQuerySamples_FinalizationScenarios(t *testing.T) { defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) @@ -718,9 +787,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 20003, Valid: true}, "SELECT * FROM users", )) + // Return error to trigger finalization + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) + // Wait for all mock queries to complete + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), errMockQuerySamplesFailed.Error()) + }, 5*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return len(lokiClient.Received()) == 1 }, 5*time.Second, 100*time.Millisecond) @@ -783,9 +860,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 21002, Valid: true}, "SELECT 1", )) + // Return error to trigger finalization + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) + // Wait for all mock queries to complete + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), errMockQuerySamplesFailed.Error()) + }, 5*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return len(lokiClient.Received()) == 1 }, 5*time.Second, 100*time.Millisecond) @@ -866,9 +951,17 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 23002, Valid: true}, "SELECT * FROM b", )) + // Return error to trigger finalization + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) + // Wait for all mock queries to complete + require.Eventually(t, func() bool { + return strings.Contains(logBuffer.String(), errMockQuerySamplesFailed.Error()) + }, 5*time.Second, 100*time.Millisecond) + require.Eventually(t, func() bool { return len(lokiClient.Received()) == 2 }, 5*time.Second, 100*time.Millisecond)