diff --git a/internal/component/database_observability/mysql/collector/exclude_schemas.go b/internal/component/database_observability/mysql/collector/exclude_schemas.go index 396701f7046..ab70a235f2c 100644 --- a/internal/component/database_observability/mysql/collector/exclude_schemas.go +++ b/internal/component/database_observability/mysql/collector/exclude_schemas.go @@ -1,33 +1,18 @@ package collector -import "strings" +import "github.com/grafana/alloy/internal/component/database_observability" -var defaultExcludedSchemas = []string{"mysql", "performance_schema", "sys", "information_schema"} +var excludedSchemas = []string{"mysql", "performance_schema", "sys", "information_schema"} -var defaultExclusionClause = buildExclusionClause(defaultExcludedSchemas) +var exclusionClause = database_observability.BuildExclusionClause(excludedSchemas) -func buildExcludedSchemasClause(excludedSchemas []string) string { - if len(excludedSchemas) == 0 { - return defaultExclusionClause +func buildExcludedSchemasClause(schemas []string) string { + if len(schemas) == 0 { + return exclusionClause } - allSchemas := make([]string, 0, len(defaultExcludedSchemas)+len(excludedSchemas)) - allSchemas = append(allSchemas, defaultExcludedSchemas...) - allSchemas = append(allSchemas, excludedSchemas...) - - return buildExclusionClause(allSchemas) -} - -func buildExclusionClause(schemas []string) string { - escaped := make([]string, len(schemas)) - for i, schema := range schemas { - escaped[i] = escapeSQLString(schema) - } - return "(" + strings.Join(escaped, ", ") + ")" -} - -// escapeSQLString escapes single quotes by doubling them to prevent SQL injection. -func escapeSQLString(s string) string { - escaped := strings.ReplaceAll(s, "'", "''") - return "'" + escaped + "'" + all := make([]string, 0, len(excludedSchemas)+len(schemas)) + all = append(all, excludedSchemas...) + all = append(all, schemas...) + return database_observability.BuildExclusionClause(all) } diff --git a/internal/component/database_observability/mysql/collector/exclude_schemas_test.go b/internal/component/database_observability/mysql/collector/exclude_schemas_test.go deleted file mode 100644 index 0bcfe4d3aaf..00000000000 --- a/internal/component/database_observability/mysql/collector/exclude_schemas_test.go +++ /dev/null @@ -1,58 +0,0 @@ -package collector - -import ( - "testing" - - "github.com/stretchr/testify/require" -) - -func TestBuildExcludedSchemasClause(t *testing.T) { - tests := []struct { - name string - userExcludedSchemas []string - expected string - }{ - { - name: "nil user schemas returns default schemas", - userExcludedSchemas: nil, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema')", - }, - { - name: "empty user schemas returns default schemas", - userExcludedSchemas: []string{}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema')", - }, - { - name: "single user schema is appended to default schemas", - userExcludedSchemas: []string{"my_schema"}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'my_schema')", - }, - { - name: "multiple user schemas are appended to default schemas", - userExcludedSchemas: []string{"schema1", "schema2", "schema3"}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'schema1', 'schema2', 'schema3')", - }, - { - name: "schema with single quote is escaped to prevent SQL injection", - userExcludedSchemas: []string{"test'schema"}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'test''schema')", - }, - { - name: "schema with SQL injection attempt is escaped", - userExcludedSchemas: []string{"'; DROP TABLE users; --"}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema', '''; DROP TABLE users; --')", - }, - { - name: "schema with multiple single quotes is escaped", - userExcludedSchemas: []string{"it's a test's schema"}, - expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'it''s a test''s schema')", - }, - } - - for _, tc := range tests { - t.Run(tc.name, func(t *testing.T) { - result := buildExcludedSchemasClause(tc.userExcludedSchemas) - require.Equal(t, tc.expected, result) - }) - } -} diff --git a/internal/component/database_observability/mysql/collector/explain_plans_test.go b/internal/component/database_observability/mysql/collector/explain_plans_test.go index 9a8a6db7e60..ffc5993cd51 100644 --- a/internal/component/database_observability/mysql/collector/explain_plans_test.go +++ b/internal/component/database_observability/mysql/collector/explain_plans_test.go @@ -1529,7 +1529,7 @@ func TestExplainPlans(t *testing.T) { t.Run("uses argument value on first request", func(t *testing.T) { nextSeen := lastSeen.Add(time.Second * 45) - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1551,7 +1551,7 @@ func TestExplainPlans(t *testing.T) { }) t.Run("uses oldest last seen value on subsequent requests", func(t *testing.T) { - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1594,7 +1594,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips truncated queries", func(t *testing.T) { logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1629,7 +1629,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips non-select queries", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1678,7 +1678,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips no row result", func(t *testing.T) { logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1710,7 +1710,7 @@ func TestExplainPlans(t *testing.T) { t.Run("passes queries beginning in select", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1747,7 +1747,7 @@ func TestExplainPlans(t *testing.T) { t.Run("passes queries beginning in with", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1809,7 +1809,7 @@ func TestQueryFailureDenylist(t *testing.T) { }) require.NoError(t, err) - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1841,7 +1841,7 @@ func TestQueryFailureDenylist(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, exclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", diff --git a/internal/component/database_observability/mysql/collector/query_details_test.go b/internal/component/database_observability/mysql/collector/query_details_test.go index 5cd25cc42ca..5219e826f37 100644 --- a/internal/component/database_observability/mysql/collector/query_details_test.go +++ b/internal/component/database_observability/mysql/collector/query_details_test.go @@ -378,7 +378,7 @@ func TestQueryTables(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -439,7 +439,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", // not enough columns @@ -447,7 +447,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { "abc123", )) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -505,7 +505,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -568,9 +568,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, exclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", diff --git a/internal/component/database_observability/mysql/collector/query_samples_test.go b/internal/component/database_observability/mysql/collector/query_samples_test.go index 63a87e33c4f..ce6fb66c39f 100644 --- a/internal/component/database_observability/mysql/collector/query_samples_test.go +++ b/internal/component/database_observability/mysql/collector/query_samples_test.go @@ -159,7 +159,7 @@ func TestQuerySamples(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -243,7 +243,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -338,7 +338,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -505,7 +505,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -641,7 +641,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, defaultExclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, exclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -757,7 +757,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, defaultExclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, exclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -865,7 +865,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1126,7 +1126,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows(tc.expectedColumns).AddRow(tc.scanValues...), ) @@ -1194,7 +1194,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1214,7 +1214,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1322,7 +1322,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1448,7 +1448,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 2e12, ).WillReturnError(fmt.Errorf("connection error")) @@ -1462,7 +1462,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 2e12, ).RowsWillBeClosed(). @@ -1590,7 +1590,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1684,7 +1684,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1735,7 +1735,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used 3e12, 10e12, // uptimeLimit is calculated as uptime "modulo" overflows: (uptime - 1 overflow) in picoseconds ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1785,7 +1785,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( 3e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1826,7 +1826,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+13, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause 10e12, // asserts timerBookmark has been updated to the previous uptimeLimit 13e12, // asserts uptimeLimit is now updated to the current uptime "modulo" overflows ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1868,7 +1868,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1912,7 +1912,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnError(fmt.Errorf("some error")) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1970,7 +1970,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) c := &QuerySamples{ dbConnection: db, @@ -1987,7 +1987,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 2e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2137,7 +2137,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, exclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). diff --git a/internal/component/database_observability/mysql/collector/schema_details_test.go b/internal/component/database_observability/mysql/collector/schema_details_test.go index cdf2dd17377..61788ca2a46 100644 --- a/internal/component/database_observability/mysql/collector/schema_details_test.go +++ b/internal/component/database_observability/mysql/collector/schema_details_test.go @@ -40,7 +40,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -184,7 +184,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -331,7 +331,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -498,7 +498,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -645,7 +645,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -815,7 +815,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -962,7 +962,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1049,7 +1049,7 @@ func TestSchemaDetails(t *testing.T) { // second loop, table info will be read from cache // and no further queries will be executed - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1129,7 +1129,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1264,7 +1264,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows( []string{"schema_name"}, ).AddRow( @@ -1313,7 +1313,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows([]string{ "schema_name", }).AddRow( @@ -1381,9 +1381,9 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, exclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows([]string{ "schema_name", }).AddRow( diff --git a/internal/component/database_observability/postgres/collector/exclude_databases.go b/internal/component/database_observability/postgres/collector/exclude_databases.go new file mode 100644 index 00000000000..f93146dd761 --- /dev/null +++ b/internal/component/database_observability/postgres/collector/exclude_databases.go @@ -0,0 +1,18 @@ +package collector + +import "github.com/grafana/alloy/internal/component/database_observability" + +var excludedDatabases = []string{"azure_maintenance"} + +var exclusionClause = database_observability.BuildExclusionClause(excludedDatabases) + +func buildExcludedDatabasesClause(databases []string) string { + if len(databases) == 0 { + return exclusionClause + } + + all := make([]string, 0, len(excludedDatabases)+len(databases)) + all = append(all, excludedDatabases...) + all = append(all, databases...) + return database_observability.BuildExclusionClause(all) +} diff --git a/internal/component/database_observability/postgres/collector/explain_plans.go b/internal/component/database_observability/postgres/collector/explain_plans.go index 239a7cd1ecf..9cf9f4d9fa2 100644 --- a/internal/component/database_observability/postgres/collector/explain_plans.go +++ b/internal/component/database_observability/postgres/collector/explain_plans.go @@ -8,7 +8,6 @@ import ( "fmt" "math" "regexp" - "slices" "strings" "time" "unicode/utf8" @@ -39,7 +38,7 @@ const selectQueriesForExplainPlanTemplate = ` FROM pg_stat_statements s JOIN pg_database d ON s.dbid = d.oid AND NOT d.datistemplate AND d.datallowconn WHERE s.queryid IS NOT NULL AND s.query IS NOT NULL -` + AND d.datname NOT IN %s` const selectExplainPlanPrefix = `EXPLAIN (FORMAT JSON) EXECUTE ` @@ -350,9 +349,10 @@ func (c *ExplainPlans) Stop() { func (c *ExplainPlans) populateQueryCache(ctx context.Context) error { var selectStatement string var resetTS time.Time + excludedDatabasesClause := buildExcludedDatabasesClause(c.excludeDatabases) version17Plus := semver.MustParseRange(">=17.0.0")(c.dbVersion) if version17Plus { - selectStatement = fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since") + selectStatement = fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", excludedDatabasesClause) } else { statReset := c.dbConnection.QueryRowContext(ctx, "SELECT stats_reset FROM pg_stat_statements_info") if err := statReset.Err(); err != nil { @@ -361,7 +361,7 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error { if err := statReset.Scan(&resetTS); err != nil { return fmt.Errorf("failed to scan stats reset time for explain plans: %w", err) } - selectStatement = fmt.Sprintf(selectQueriesForExplainPlanTemplate, "NOW() AT TIME ZONE 'UTC' AS stats_since") + selectStatement = fmt.Sprintf(selectQueriesForExplainPlanTemplate, "NOW() AT TIME ZONE 'UTC' AS stats_since", excludedDatabasesClause) } rs, err := c.dbConnection.QueryContext(ctx, selectStatement) @@ -379,24 +379,6 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error { return fmt.Errorf("failed to scan query for explain plan: %w", err) } - if slices.ContainsFunc(c.excludeDatabases, func(schema string) bool { - return strings.EqualFold(schema, datname) - }) { - - err := c.sendExplainPlansOutput( - datname, - queryId, - generatedAt, - database_observability.ExplainProcessingResultSkipped, - "query belongs to excluded schema", - nil, - ) - if err != nil { - level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err) - } - continue - } - statsReset := resetTS if version17Plus { statsReset = ls diff --git a/internal/component/database_observability/postgres/collector/explain_plans_test.go b/internal/component/database_observability/postgres/collector/explain_plans_test.go index a4dda0e524b..f29a916ef45 100644 --- a/internal/component/database_observability/postgres/collector/explain_plans_test.go +++ b/internal/component/database_observability/postgres/collector/explain_plans_test.go @@ -13,6 +13,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/atomic" + "go.uber.org/goleak" "golang.org/x/tools/txtar" "github.com/grafana/alloy/internal/component/common/loki" @@ -2533,7 +2534,7 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { rows := sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(10), time.Now()) - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "NOW() AT TIME ZONE 'UTC' AS stats_since")).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "NOW() AT TIME ZONE 'UTC' AS stats_since", exclusionClause)).WillReturnRows(rows) ctx := context.Background() err = explainPlan.populateQueryCache(ctx) @@ -2567,10 +2568,10 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { rows := sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(10), time.Now()). - AddRow("information_schema", "789012", "SELECT * FROM tables", int64(5), time.Now()). AddRow("testdb2", "345678", "SELECT * FROM orders", int64(20), time.Now()) - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since")).WillReturnRows(rows) + expectedQuery := fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", buildExcludedDatabasesClause([]string{"information_schema"})) + mock.ExpectQuery(expectedQuery).WillReturnRows(rows) ctx := context.Background() err = explainPlan.populateQueryCache(ctx) @@ -2581,7 +2582,6 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { assert.Contains(t, explainPlan.queryCache, "testdb123456") assert.Contains(t, explainPlan.queryCache, "testdb2345678") - assert.NotContains(t, explainPlan.queryCache, "information_schema789012") assert.NoError(t, mock.ExpectationsWereMet()) }) @@ -2604,7 +2604,7 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { }, } - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since")). + mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", exclusionClause)). WillReturnRows(sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(10), time.Now())) @@ -2630,7 +2630,7 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { }, } - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since")). + mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", exclusionClause)). WillReturnRows(sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(1), time.Now())) @@ -2657,7 +2657,7 @@ func TestExplainPlan_PopulateQueryCache(t *testing.T) { }, } - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since")). + mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", exclusionClause)). WillReturnRows(sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(1), time.Now().Add(-time.Minute))) @@ -2757,7 +2757,7 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) { rows := sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). AddRow("testdb", "123456", "SELECT * FROM users WHERE id = $1", int64(10), time.Now()) - mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since")).WillReturnRows(rows) + mock.ExpectQuery(fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", exclusionClause)).WillReturnRows(rows) ctx := context.Background() err = explainPlan.fetchExplainPlans(ctx) @@ -3078,3 +3078,47 @@ func TestExplainPlanFetchExplainPlans(t *testing.T) { require.NoError(t, err) }) } + +func TestExplainPlans_ExcludeDatabases_NoLogSent(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + post17ver := semver.MustParse("17.0.0") + logBuffer := syncbuffer.Buffer{} + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + // Create ExplainPlans with excluded database + explainPlan := &ExplainPlans{ + dbConnection: db, + dbVersion: post17ver, + queryCache: make(map[string]*queryInfo), + queryDenylist: make(map[string]*queryInfo), + finishedQueryCache: make(map[string]*queryInfo), + excludeDatabases: []string{"excluded_db"}, + perScrapeRatio: 1.0, + logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + entryHandler: lokiClient, + } + + // Verify the query uses the custom exclusion clause that includes both default and user-provided exclusions + expectedQuery := fmt.Sprintf(selectQueriesForExplainPlanTemplate, "s.stats_since", buildExcludedDatabasesClause([]string{"excluded_db"})) + + // Return only non-excluded database rows (simulating SQL-level filtering) + rows := sqlmock.NewRows([]string{"datname", "queryid", "query", "calls", "stats_since"}). + AddRow("included_db", "222222", "SELECT * FROM included_table", int64(10), time.Now()) + + mock.ExpectQuery(expectedQuery).WillReturnRows(rows) + + err = explainPlan.populateQueryCache(context.Background()) + require.NoError(t, err) + + // Verify only included_db query is in the cache + assert.Len(t, explainPlan.queryCache, 1) + assert.Contains(t, explainPlan.queryCache, "included_db222222") + + assert.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/component/database_observability/postgres/collector/query_details.go b/internal/component/database_observability/postgres/collector/query_details.go index 2a63531ea14..b8e1f1b876c 100644 --- a/internal/component/database_observability/postgres/collector/query_details.go +++ b/internal/component/database_observability/postgres/collector/query_details.go @@ -38,25 +38,28 @@ var selectQueriesFromActivity = ` WITHIN GROUP (ORDER BY total_exec_time) FROM pg_stat_statements ) + AND pg_database.datname NOT IN %s ORDER BY total_exec_time DESC LIMIT 100 ` type QueryDetailsArguments struct { - DB *sql.DB - CollectInterval time.Duration - EntryHandler loki.EntryHandler - TableRegistry *TableRegistry + DB *sql.DB + CollectInterval time.Duration + ExcludeDatabases []string + EntryHandler loki.EntryHandler + TableRegistry *TableRegistry Logger log.Logger } type QueryDetails struct { - dbConnection *sql.DB - collectInterval time.Duration - entryHandler loki.EntryHandler - tableRegistry *TableRegistry - normalizer *sqllexer.Normalizer + dbConnection *sql.DB + collectInterval time.Duration + excludeDatabases []string + entryHandler loki.EntryHandler + tableRegistry *TableRegistry + normalizer *sqllexer.Normalizer logger log.Logger running *atomic.Bool @@ -66,13 +69,14 @@ type QueryDetails struct { func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) { return &QueryDetails{ - dbConnection: args.DB, - collectInterval: args.CollectInterval, - entryHandler: args.EntryHandler, - tableRegistry: args.TableRegistry, - normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true), sqllexer.WithCollectComments(true)), - logger: log.With(args.Logger, "collector", QueryDetailsCollector), - running: &atomic.Bool{}, + dbConnection: args.DB, + collectInterval: args.CollectInterval, + excludeDatabases: args.ExcludeDatabases, + entryHandler: args.EntryHandler, + tableRegistry: args.TableRegistry, + normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true), sqllexer.WithCollectComments(true)), + logger: log.With(args.Logger, "collector", QueryDetailsCollector), + running: &atomic.Bool{}, }, nil } @@ -123,7 +127,8 @@ func (c *QueryDetails) Stop() { } func (c QueryDetails) fetchAndAssociate(ctx context.Context) error { - rs, err := c.dbConnection.QueryContext(ctx, selectQueriesFromActivity) + query := fmt.Sprintf(selectQueriesFromActivity, buildExcludedDatabasesClause(c.excludeDatabases)) + rs, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to fetch statements from pg_stat_statements view: %w", err) } diff --git a/internal/component/database_observability/postgres/collector/query_details_test.go b/internal/component/database_observability/postgres/collector/query_details_test.go index 382d10c902f..a995f1936a8 100644 --- a/internal/component/database_observability/postgres/collector/query_details_test.go +++ b/internal/component/database_observability/postgres/collector/query_details_test.go @@ -432,7 +432,7 @@ func TestQueryDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "queryid", @@ -493,7 +493,7 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "queryid", // not enough columns @@ -501,7 +501,7 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { "abc123", )) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "queryid", @@ -556,7 +556,7 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "queryid", @@ -615,9 +615,9 @@ func TestQueryDetails_SQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(selectQueriesFromActivity).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "queryid", @@ -815,3 +815,58 @@ func TestQueryDetails_RemoveComments(t *testing.T) { }) } } + +func TestQueryDetails_ExcludeDatabases(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + collector, err := NewQueryDetails(QueryDetailsArguments{ + DB: db, + CollectInterval: time.Second, + ExcludeDatabases: []string{"excluded_database"}, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + require.NotNil(t, collector) + + mock.ExpectQuery(fmt.Sprintf(selectQueriesFromActivity, buildExcludedDatabasesClause([]string{"excluded_database"}))).WithoutArgs().RowsWillBeClosed(). + WillReturnRows( + sqlmock.NewRows([]string{ + "queryid", + "query", + "datname", + }).AddRow( + "def456", + "SELECT * FROM orders", + "another_database", + ), + ) + + err = collector.Start(t.Context()) + require.NoError(t, err) + + // Only the another_database should have logs emitted + require.Eventually(t, func() bool { + return len(lokiClient.Received()) >= 2 // query_association + query_parsed_table_name + }, 5*time.Second, 100*time.Millisecond) + + collector.Stop() + + require.Eventually(t, func() bool { + return collector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.NoError(t, mock.ExpectationsWereMet()) + + // Verify only another_database logs were emitted + for _, entry := range lokiClient.Received() { + require.Contains(t, entry.Line, "another_database", "included database should appear in logs") + } +} diff --git a/internal/component/database_observability/postgres/collector/query_samples.go b/internal/component/database_observability/postgres/collector/query_samples.go index 551fafb374e..dfb946b6077 100644 --- a/internal/component/database_observability/postgres/collector/query_samples.go +++ b/internal/component/database_observability/postgres/collector/query_samples.go @@ -66,6 +66,7 @@ const selectPgStatActivity = ` s.query_id != 0 ) ) + AND d.datname NOT IN %s %s ` @@ -100,6 +101,7 @@ type QuerySamplesInfo struct { type QuerySamplesArguments struct { DB *sql.DB CollectInterval time.Duration + ExcludeDatabases []string EntryHandler loki.EntryHandler Logger log.Logger DisableQueryRedaction bool @@ -109,6 +111,7 @@ type QuerySamplesArguments struct { type QuerySamples struct { dbConnection *sql.DB collectInterval time.Duration + excludeDatabases []string entryHandler loki.EntryHandler disableQueryRedaction bool excludeCurrentUser bool @@ -215,6 +218,7 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { return &QuerySamples{ dbConnection: args.DB, collectInterval: args.CollectInterval, + excludeDatabases: args.ExcludeDatabases, entryHandler: args.EntryHandler, disableQueryRedaction: args.DisableQueryRedaction, excludeCurrentUser: args.ExcludeCurrentUser, @@ -285,7 +289,9 @@ func (c *QuerySamples) fetchQuerySample(ctx context.Context) error { if c.excludeCurrentUser { excludeCurrentUserClauseField = excludeCurrentUserClause } - query := fmt.Sprintf(selectPgStatActivity, queryTextField, excludeCurrentUserClauseField) + + excludedDatabasesClause := buildExcludedDatabasesClause(c.excludeDatabases) + query := fmt.Sprintf(selectPgStatActivity, queryTextField, excludedDatabasesClause, excludeCurrentUserClauseField) rows, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to query pg_stat_activity: %w", err) diff --git a/internal/component/database_observability/postgres/collector/query_samples_test.go b/internal/component/database_observability/postgres/collector/query_samples_test.go index a0b3e187ea8..6f8f5126952 100644 --- a/internal/component/database_observability/postgres/collector/query_samples_test.go +++ b/internal/component/database_observability/postgres/collector/query_samples_test.go @@ -50,7 +50,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "active query without wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -59,7 +59,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -72,7 +72,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "parallel query with leader PID", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 101, sql.NullInt64{Int64: 100, Valid: true}, "testuser", "testapp", "127.0.0.1", 5432, @@ -81,7 +81,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 123, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -94,7 +94,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with wait event", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 102, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -103,7 +103,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { sql.NullString{String: "relation", Valid: true}, pq.Int64Array{103, 104}, now, sql.NullInt64{Int64: 124, Valid: true}, )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) }, expectedLabels: []model.LabelSet{ @@ -118,7 +118,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { { name: "query with redaction disabled", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( now, "testdb", 106, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -128,7 +128,7 @@ func TestQuerySamples_FetchQuerySamples(t *testing.T) { "SELECT * FROM users WHERE id = 123 AND email = 'test@example.com'", )) // Second scrape: empty to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(append(columns, "query"))) }, disableQueryRedaction: true, @@ -219,7 +219,7 @@ func TestQuerySamples_FetchQuerySamples_ErrorCases(t *testing.T) { { name: "insufficient privilege query", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(append(columns, "query")).AddRow( now, "testdb", 103, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -229,10 +229,10 @@ func TestQuerySamples_FetchQuerySamples_ErrorCases(t *testing.T) { "", )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(append(columns, "query"))) // Return error to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)). WillReturnError(errMockQuerySamplesFailed) }, disableQueryRedaction: true, @@ -241,7 +241,7 @@ func TestQuerySamples_FetchQuerySamples_ErrorCases(t *testing.T) { { name: "null database name", setupMock: func(mock sqlmock.Sqlmock) { - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, sql.NullString{Valid: false}, 104, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -250,10 +250,10 @@ func TestQuerySamples_FetchQuerySamples_ErrorCases(t *testing.T) { sql.NullString{}, nil, now, sql.NullInt64{Int64: 126, Valid: true}, )) // Second scrape: empty to complete cycle - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) // Return error to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause)). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause)). WillReturnError(errMockQuerySamplesFailed) }, expectedErrorLine: `err="database name is not valid`, @@ -352,7 +352,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, err) // First scrape: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 1000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -362,7 +362,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Second scrape: no rows -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -410,7 +410,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: wait event with unordered/dup PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -420,7 +420,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: same wait event with normalized PIDs - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 300, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -430,7 +430,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -477,7 +477,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: wait event - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -487,7 +487,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 2: active with no wait -> close occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 301, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -497,7 +497,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE users SET status = 'active'", )) // Scrape 3: disappear - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -545,7 +545,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: active CPU snapshot (10s) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -555,7 +555,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: waiting with wait_event; state_change 7s ago - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 402, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -565,7 +565,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -613,7 +613,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: wait event set A - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -623,7 +623,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 2: same event, set changes -> new occurrence - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 403, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -633,7 +633,7 @@ func TestQuerySamples_FinalizationScenarios(t *testing.T) { "UPDATE t SET c=1", )) // Scrape 3: disappear -> finalize - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns)) require.NoError(t, sampleCollector.Start(t.Context())) @@ -702,7 +702,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: active row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -712,7 +712,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM t", )) // Scrape 2: same key turns idle; state_change denotes end - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2000, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -768,7 +768,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: only idle row - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -778,7 +778,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM users", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2001, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -788,7 +788,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM users", )) // Return error to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)). WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) @@ -841,7 +841,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: idle in transaction (aborted) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -851,7 +851,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT 1", )) // Scrape 2: same idle row again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns).AddRow( now, "testdb", 2100, sql.NullInt64{}, "testuser", "testapp", "127.0.0.1", 5432, @@ -861,7 +861,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT 1", )) // Return error to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)). WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) @@ -914,7 +914,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { require.NoError(t, err) // Scrape 1: two idle-only rows with different keys (PID/QueryID) - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -933,7 +933,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM b", )) // Scrape 2: same idle rows again -> should not re-emit - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)).RowsWillBeClosed(). WillReturnRows(sqlmock.NewRows(columns). AddRow( now, "testdb", 2200, sql.NullInt64{}, @@ -952,7 +952,7 @@ func TestQuerySamples_IdleScenarios(t *testing.T) { "SELECT * FROM b", )) // Return error to trigger finalization - mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, excludeCurrentUserClause)). + mock.ExpectQuery(fmt.Sprintf(selectPgStatActivity, queryTextClause, exclusionClause, excludeCurrentUserClause)). WillReturnError(errMockQuerySamplesFailed) require.NoError(t, sampleCollector.Start(t.Context())) @@ -1020,12 +1020,12 @@ func TestQuerySamples_ExcludeCurrentUser(t *testing.T) { { name: "ExcludeCurrentUser enabled", excludeCurrentUser: true, - expectedQuery: fmt.Sprintf(selectPgStatActivity, "", excludeCurrentUserClause), + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", exclusionClause, excludeCurrentUserClause), }, { name: "ExcludeCurrentUser disabled", excludeCurrentUser: false, - expectedQuery: fmt.Sprintf(selectPgStatActivity, "", ""), + expectedQuery: fmt.Sprintf(selectPgStatActivity, "", exclusionClause, ""), }, } @@ -1086,3 +1086,83 @@ func TestQuerySamples_ExcludeCurrentUser(t *testing.T) { }) } } + +func TestQuerySamples_ExcludeDatabases(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + now := time.Now() + stateChangeTime := now.Add(-10 * time.Second) + queryStartTime := now.Add(-30 * time.Second) + xactStartTime := now.Add(-2 * time.Minute) + backendStartTime := now.Add(-1 * time.Hour) + + columns := []string{ + "now", "datname", "pid", "leader_pid", + "usename", "application_name", "client_addr", "client_port", + "backend_type", "backend_start", "backend_xid", "backend_xmin", + "xact_start", "state", "state_change", "wait_event_type", + "wait_event", "blocked_by_pids", "query_start", "query_id", + } + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + logBuffer := syncbuffer.Buffer{} + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + // Exclude "excluded_db" database + sampleCollector, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + CollectInterval: time.Millisecond, + ExcludeDatabases: []string{"excluded_db"}, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(log.NewSyncWriter(&logBuffer)), + }) + require.NoError(t, err) + require.NotNil(t, sampleCollector) + + // Verify the query uses the custom exclusion clause that includes both default and user-provided exclusions + expectedQuery := fmt.Sprintf(selectPgStatActivity, "", buildExcludedDatabasesClause([]string{"excluded_db"}), "") + + // First scrape: return only non-excluded database rows (simulating SQL-level filtering) + mock.ExpectQuery(expectedQuery).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns). + AddRow( + now, "included_db", 101, sql.NullInt64{}, + "testuser", "testapp", "127.0.0.1", 5432, + "client backend", backendStartTime, sql.NullInt32{Int32: 501, Valid: true}, sql.NullInt32{Int32: 401, Valid: true}, + xactStartTime, "active", stateChangeTime, sql.NullString{}, + sql.NullString{}, nil, queryStartTime, sql.NullInt64{Int64: 456, Valid: true}, + )) + + // Second scrape: empty to trigger finalization + mock.ExpectQuery(expectedQuery).RowsWillBeClosed(). + WillReturnRows(sqlmock.NewRows(columns)) + + err = sampleCollector.Start(t.Context()) + require.NoError(t, err) + + // Only the included_db sample should be emitted + require.Eventually(t, func() bool { + return len(lokiClient.Received()) == 1 + }, 5*time.Second, 100*time.Millisecond) + + entries := lokiClient.Received() + require.Len(t, entries, 1) + + // Verify only included_db logs were emitted + for _, entry := range entries { + require.Contains(t, entry.Line, "included_db", "included database should appear in logs") + } + + sampleCollector.Stop() + require.Eventually(t, func() bool { + return sampleCollector.Stopped() + }, 5*time.Second, 100*time.Millisecond) + + require.Eventually(t, func() bool { + return mock.ExpectationsWereMet() == nil + }, 5*time.Second, 100*time.Millisecond) +} diff --git a/internal/component/database_observability/postgres/collector/schema_details.go b/internal/component/database_observability/postgres/collector/schema_details.go index b0ed6ca4ba6..8e07336dc09 100644 --- a/internal/component/database_observability/postgres/collector/schema_details.go +++ b/internal/component/database_observability/postgres/collector/schema_details.go @@ -29,15 +29,13 @@ const ( ) const ( - excludedDatabases = `('azure_maintenance')` - // selectAllDatabases makes use of the initial DB connection to discover other databases on the same Postgres instance selectAllDatabases = ` SELECT datname FROM pg_database WHERE datistemplate = false AND has_database_privilege(datname, 'CONNECT') - AND datname NOT IN ` + excludedDatabases + AND datname NOT IN %s` // selectSchemaNames gets all user-defined schemas, excluding system schemas selectSchemaNames = ` @@ -213,9 +211,11 @@ type foreignKey struct { ReferencedColumnName string `json:"referenced_column_name"` } -type database string -type schema string -type table string +type ( + database string + schema string + table string +) // TableRegistry is a source-of-truth cache that keeps track of databases, schemas, tables type TableRegistry struct { @@ -285,10 +285,11 @@ func parseSchemaQualifiedIfAny(parsedTableName string) (schema, table) { } type SchemaDetailsArguments struct { - DB *sql.DB - DSN string - CollectInterval time.Duration - EntryHandler loki.EntryHandler + DB *sql.DB + DSN string + CollectInterval time.Duration + ExcludeDatabases []string + EntryHandler loki.EntryHandler CacheEnabled bool CacheSize int @@ -304,6 +305,7 @@ type SchemaDetails struct { dbDSN string dbConnectionFactory databaseConnectionFactory collectInterval time.Duration + excludeDatabases []string entryHandler loki.EntryHandler // Cache of table definitions. Entries are removed after a configurable TTL. @@ -330,6 +332,7 @@ func NewSchemaDetails(args SchemaDetailsArguments) (*SchemaDetails, error) { dbDSN: args.DSN, dbConnectionFactory: factory, collectInterval: args.CollectInterval, + excludeDatabases: args.ExcludeDatabases, entryHandler: args.EntryHandler, tableRegistry: NewTableRegistry(), logger: log.With(args.Logger, "collector", SchemaDetailsCollector), @@ -394,7 +397,8 @@ func (c *SchemaDetails) Stop() { } func (c *SchemaDetails) getAllDatabases(ctx context.Context) ([]string, error) { - rows, err := c.initialConnection.QueryContext(ctx, selectAllDatabases) + query := fmt.Sprintf(selectAllDatabases, buildExcludedDatabasesClause(c.excludeDatabases)) + rows, err := c.initialConnection.QueryContext(ctx, query) if err != nil { level.Error(c.logger).Log("msg", "failed to discover databases", "err", err) return nil, fmt.Errorf("failed to discover databases: %w", err) diff --git a/internal/component/database_observability/postgres/collector/schema_details_test.go b/internal/component/database_observability/postgres/collector/schema_details_test.go index e40836fe040..e26e6eceb40 100644 --- a/internal/component/database_observability/postgres/collector/schema_details_test.go +++ b/internal/component/database_observability/postgres/collector/schema_details_test.go @@ -50,7 +50,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -157,7 +157,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -372,7 +372,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - initialConnectionMock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + initialConnectionMock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{"datname"}). AddRow("db1"). @@ -462,7 +462,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -575,7 +575,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -631,7 +631,7 @@ func Test_Postgres_SchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -742,7 +742,7 @@ func Test_Postgres_SchemaDetails_collector_detects_auto_increment_column(t *test require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -848,7 +848,7 @@ func Test_Postgres_SchemaDetails_collector_detects_auto_increment_column(t *test require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -955,7 +955,7 @@ func Test_Postgres_SchemaDetails_collector_detects_auto_increment_column(t *test require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -1063,7 +1063,7 @@ func Test_Postgres_SchemaDetails_caching(t *testing.T) { // first run mock declarations // selectDatabaseName, selectSchemaNames, selectTableNames always called - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -1127,7 +1127,7 @@ func Test_Postgres_SchemaDetails_caching(t *testing.T) { // second run mock declarations // selectDatabaseName, selectSchemaNames, selectTableNames always called - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -1192,7 +1192,7 @@ func Test_Postgres_SchemaDetails_caching(t *testing.T) { // declare mocks for two runs for i := 0; i < 2; i++ { - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -1277,7 +1277,7 @@ func Test_Postgres_SchemaDetails_ErrorCases(t *testing.T) { db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) require.NoError(t, err) defer db.Close() - mock.ExpectQuery(selectAllDatabases). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)). WillReturnRows( sqlmock.NewRows([]string{"datname"}). AddRow("testdb"). @@ -1609,7 +1609,7 @@ func Test_SchemaDetails_populates_TableRegistry(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectAllDatabases).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectAllDatabases, exclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "datname", @@ -1707,11 +1707,37 @@ func Test_SchemaDetails_populates_TableRegistry(t *testing.T) { }) } -func Test_Postgres_SchemaDetails_query_excludes_databases(t *testing.T) { - assert.Equal(t, ` - SELECT datname - FROM pg_database - WHERE datistemplate = false - AND has_database_privilege(datname, 'CONNECT') - AND datname NOT IN ('azure_maintenance')`, selectAllDatabases) +func Test_Postgres_SchemaDetails_ExcludeDatabases(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + collector, err := NewSchemaDetails(SchemaDetailsArguments{ + DB: db, + DSN: "postgres://user:pass@localhost:5432/testdb", + CollectInterval: time.Millisecond, + ExcludeDatabases: []string{"excluded_database"}, + EntryHandler: lokiClient, + CacheEnabled: false, + Logger: log.NewLogfmtLogger(os.Stderr), + dbConnectionFactory: func(dsn string) (*sql.DB, error) { + return db, nil + }, + }) + require.NoError(t, err) + require.NotNil(t, collector) + + // Verify the query uses the custom exclusion clause that includes both default and user-provided exclusions + expectedQuery := fmt.Sprintf(selectAllDatabases, buildExcludedDatabasesClause([]string{"excluded_database"})) + mock.ExpectQuery(expectedQuery). + WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"datname"})) + + _, err = collector.getAllDatabases(context.Background()) + require.NoError(t, err) + require.NoError(t, mock.ExpectationsWereMet()) } diff --git a/internal/component/database_observability/postgres/component.go b/internal/component/database_observability/postgres/component.go index ff1ce72dc96..324a373945e 100644 --- a/internal/component/database_observability/postgres/component.go +++ b/internal/component/database_observability/postgres/component.go @@ -391,14 +391,15 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud if collectors[collector.SchemaDetailsCollector] { stCollector, err := collector.NewSchemaDetails(collector.SchemaDetailsArguments{ - DB: c.dbConnection, - DSN: string(c.args.DataSourceName), - CollectInterval: c.args.SchemaDetailsArguments.CollectInterval, - CacheEnabled: c.args.SchemaDetailsArguments.CacheEnabled, - CacheSize: c.args.SchemaDetailsArguments.CacheSize, - CacheTTL: c.args.SchemaDetailsArguments.CacheTTL, - EntryHandler: entryHandler, - Logger: c.opts.Logger, + DB: c.dbConnection, + DSN: string(c.args.DataSourceName), + CollectInterval: c.args.SchemaDetailsArguments.CollectInterval, + ExcludeDatabases: c.args.ExcludeDatabases, + CacheEnabled: c.args.SchemaDetailsArguments.CacheEnabled, + CacheSize: c.args.SchemaDetailsArguments.CacheSize, + CacheTTL: c.args.SchemaDetailsArguments.CacheTTL, + EntryHandler: entryHandler, + Logger: c.opts.Logger, }) if err != nil { logStartError(collector.SchemaDetailsCollector, "create", err) @@ -412,11 +413,12 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud if collectors[collector.QueryDetailsCollector] { qCollector, err := collector.NewQueryDetails(collector.QueryDetailsArguments{ - DB: c.dbConnection, - CollectInterval: c.args.QueryTablesArguments.CollectInterval, - EntryHandler: entryHandler, - TableRegistry: tableRegistry, - Logger: c.opts.Logger, + DB: c.dbConnection, + CollectInterval: c.args.QueryTablesArguments.CollectInterval, + ExcludeDatabases: c.args.ExcludeDatabases, + EntryHandler: entryHandler, + TableRegistry: tableRegistry, + Logger: c.opts.Logger, }) if err != nil { logStartError(collector.QueryDetailsCollector, "create", err) @@ -431,6 +433,7 @@ func (c *Component) startCollectors(systemID string, engineVersion string, cloud aCollector, err := collector.NewQuerySamples(collector.QuerySamplesArguments{ DB: c.dbConnection, CollectInterval: c.args.QuerySampleArguments.CollectInterval, + ExcludeDatabases: c.args.ExcludeDatabases, EntryHandler: entryHandler, Logger: c.opts.Logger, DisableQueryRedaction: c.args.QuerySampleArguments.DisableQueryRedaction, diff --git a/internal/component/database_observability/sql_exclusion.go b/internal/component/database_observability/sql_exclusion.go new file mode 100644 index 00000000000..5608bad5936 --- /dev/null +++ b/internal/component/database_observability/sql_exclusion.go @@ -0,0 +1,18 @@ +package database_observability + +import "strings" + +// BuildExclusionClause builds a SQL IN clause from a list of items. +func BuildExclusionClause(items []string) string { + escaped := make([]string, len(items)) + for i, item := range items { + escaped[i] = EscapeSQLString(item) + } + return "(" + strings.Join(escaped, ", ") + ")" +} + +// EscapeSQLString escapes single quotes by doubling them to prevent SQL injection. +func EscapeSQLString(s string) string { + escaped := strings.ReplaceAll(s, "'", "''") + return "'" + escaped + "'" +} diff --git a/internal/component/database_observability/sql_exclusion_test.go b/internal/component/database_observability/sql_exclusion_test.go new file mode 100644 index 00000000000..f75e40f1f57 --- /dev/null +++ b/internal/component/database_observability/sql_exclusion_test.go @@ -0,0 +1,84 @@ +package database_observability + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBuildExclusionClause(t *testing.T) { + tests := []struct { + name string + items []string + expected string + }{ + { + name: "single item", + items: []string{"information_schema"}, + expected: "('information_schema')", + }, + { + name: "multiple items", + items: []string{"information_schema", "performance_schema", "sys"}, + expected: "('information_schema', 'performance_schema', 'sys')", + }, + { + name: "items with special characters", + items: []string{"test'value", "normal"}, + expected: "('test''value', 'normal')", + }, + { + name: "empty slice", + items: []string{}, + expected: "()", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := BuildExclusionClause(tc.items) + require.Equal(t, tc.expected, result) + }) + } +} + +func TestEscapeSQLString(t *testing.T) { + tests := []struct { + name string + input string + expected string + }{ + { + name: "simple string", + input: "test", + expected: "'test'", + }, + { + name: "string with single quote", + input: "test'value", + expected: "'test''value'", + }, + { + name: "string with multiple single quotes", + input: "it's a test's value", + expected: "'it''s a test''s value'", + }, + { + name: "SQL injection attempt", + input: "'; DROP TABLE users; --", + expected: "'''; DROP TABLE users; --'", + }, + { + name: "empty string", + input: "", + expected: "''", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := EscapeSQLString(tc.input) + require.Equal(t, tc.expected, result) + }) + } +}