diff --git a/internal/component/database_observability/mysql/collector/constants.go b/internal/component/database_observability/mysql/collector/constants.go deleted file mode 100644 index 67464940fef..00000000000 --- a/internal/component/database_observability/mysql/collector/constants.go +++ /dev/null @@ -1,3 +0,0 @@ -package collector - -const EXCLUDED_SCHEMAS = `('mysql', 'performance_schema', 'sys', 'information_schema')` diff --git a/internal/component/database_observability/mysql/collector/exclude_schemas.go b/internal/component/database_observability/mysql/collector/exclude_schemas.go new file mode 100644 index 00000000000..396701f7046 --- /dev/null +++ b/internal/component/database_observability/mysql/collector/exclude_schemas.go @@ -0,0 +1,33 @@ +package collector + +import "strings" + +var defaultExcludedSchemas = []string{"mysql", "performance_schema", "sys", "information_schema"} + +var defaultExclusionClause = buildExclusionClause(defaultExcludedSchemas) + +func buildExcludedSchemasClause(excludedSchemas []string) string { + if len(excludedSchemas) == 0 { + return defaultExclusionClause + } + + allSchemas := make([]string, 0, len(defaultExcludedSchemas)+len(excludedSchemas)) + allSchemas = append(allSchemas, defaultExcludedSchemas...) + allSchemas = append(allSchemas, excludedSchemas...) + + return buildExclusionClause(allSchemas) +} + +func buildExclusionClause(schemas []string) string { + escaped := make([]string, len(schemas)) + for i, schema := range schemas { + escaped[i] = escapeSQLString(schema) + } + return "(" + strings.Join(escaped, ", ") + ")" +} + +// escapeSQLString escapes single quotes by doubling them to prevent SQL injection. +func escapeSQLString(s string) string { + escaped := strings.ReplaceAll(s, "'", "''") + return "'" + escaped + "'" +} diff --git a/internal/component/database_observability/mysql/collector/exclude_schemas_test.go b/internal/component/database_observability/mysql/collector/exclude_schemas_test.go new file mode 100644 index 00000000000..0bcfe4d3aaf --- /dev/null +++ b/internal/component/database_observability/mysql/collector/exclude_schemas_test.go @@ -0,0 +1,58 @@ +package collector + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestBuildExcludedSchemasClause(t *testing.T) { + tests := []struct { + name string + userExcludedSchemas []string + expected string + }{ + { + name: "nil user schemas returns default schemas", + userExcludedSchemas: nil, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema')", + }, + { + name: "empty user schemas returns default schemas", + userExcludedSchemas: []string{}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema')", + }, + { + name: "single user schema is appended to default schemas", + userExcludedSchemas: []string{"my_schema"}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'my_schema')", + }, + { + name: "multiple user schemas are appended to default schemas", + userExcludedSchemas: []string{"schema1", "schema2", "schema3"}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'schema1', 'schema2', 'schema3')", + }, + { + name: "schema with single quote is escaped to prevent SQL injection", + userExcludedSchemas: []string{"test'schema"}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'test''schema')", + }, + { + name: "schema with SQL injection attempt is escaped", + userExcludedSchemas: []string{"'; DROP TABLE users; --"}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema', '''; DROP TABLE users; --')", + }, + { + name: "schema with multiple single quotes is escaped", + userExcludedSchemas: []string{"it's a test's schema"}, + expected: "('mysql', 'performance_schema', 'sys', 'information_schema', 'it''s a test''s schema')", + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + result := buildExcludedSchemasClause(tc.userExcludedSchemas) + require.Equal(t, tc.expected, result) + }) + } +} diff --git a/internal/component/database_observability/mysql/collector/explain_plans.go b/internal/component/database_observability/mysql/collector/explain_plans.go index c48e04f54a7..0d65c066738 100644 --- a/internal/component/database_observability/mysql/collector/explain_plans.go +++ b/internal/component/database_observability/mysql/collector/explain_plans.go @@ -8,7 +8,6 @@ import ( "encoding/json" "fmt" "math" - "slices" "strconv" "strings" "time" @@ -42,7 +41,7 @@ const selectDigestsForExplainPlan = ` WHERE LAST_SEEN > ? AND QUERY_SAMPLE_TEXT IS NOT NULL AND DIGEST IS NOT NULL - AND SCHEMA_NAME NOT IN ` + EXCLUDED_SCHEMAS + AND SCHEMA_NAME NOT IN %s` const selectExplainPlanPrefix = `EXPLAIN FORMAT=JSON ` @@ -520,7 +519,8 @@ func (c *ExplainPlans) Stop() { } func (c *ExplainPlans) populateQueryCache(ctx context.Context) error { - rs, err := c.dbConnection.QueryContext(ctx, selectDigestsForExplainPlan, c.lastSeen) + query := fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause(c.excludeSchemas)) + rs, err := c.dbConnection.QueryContext(ctx, query, c.lastSeen) if err != nil { level.Error(c.logger).Log("msg", "failed to fetch digests for explain plans", "err", err) return err @@ -536,23 +536,6 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error { level.Error(c.logger).Log("msg", "failed to scan digest for explain plans", "err", err) return err } - if slices.ContainsFunc(c.excludeSchemas, func(schema string) bool { - return strings.EqualFold(schema, schemaName) - }) { - - err := c.sendExplainPlansOutput( - schemaName, - digest, - generatedAt, - database_observability.ExplainProcessingResultSkipped, - "query belongs to excluded schema", - nil, - ) - if err != nil { - level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err) - } - continue - } qi := newQueryInfo(schemaName, digest, queryText) if _, ok := c.queryDenylist[qi.uniqueKey]; !ok { diff --git a/internal/component/database_observability/mysql/collector/explain_plans_test.go b/internal/component/database_observability/mysql/collector/explain_plans_test.go index 89c8e25b710..9a8a6db7e60 100644 --- a/internal/component/database_observability/mysql/collector/explain_plans_test.go +++ b/internal/component/database_observability/mysql/collector/explain_plans_test.go @@ -1529,7 +1529,7 @@ func TestExplainPlans(t *testing.T) { t.Run("uses argument value on first request", func(t *testing.T) { nextSeen := lastSeen.Add(time.Second * 45) - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1551,7 +1551,7 @@ func TestExplainPlans(t *testing.T) { }) t.Run("uses oldest last seen value on subsequent requests", func(t *testing.T) { - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_text", @@ -1594,7 +1594,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips truncated queries", func(t *testing.T) { logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1629,7 +1629,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips non-select queries", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1678,7 +1678,7 @@ func TestExplainPlans(t *testing.T) { t.Run("skips no row result", func(t *testing.T) { logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1710,7 +1710,7 @@ func TestExplainPlans(t *testing.T) { t.Run("passes queries beginning in select", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1747,7 +1747,7 @@ func TestExplainPlans(t *testing.T) { t.Run("passes queries beginning in with", func(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1809,7 +1809,7 @@ func TestQueryFailureDenylist(t *testing.T) { }) require.NoError(t, err) - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1841,7 +1841,7 @@ func TestQueryFailureDenylist(t *testing.T) { lokiClient.Clear() logBuffer.Reset() - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", @@ -1899,17 +1899,12 @@ func TestSchemaDenylist(t *testing.T) { }) require.NoError(t, err) - mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause([]string{"some_schema"}))).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ "schema_name", "digest", "query_sample_text", "last_seen", }).AddRow( - "some_schema", - "some_digest1", - "select * from some_table where id = 1", - lastSeen, - ).AddRow( "different_schema", "some_digest2", "select * from some_table where id = 2", diff --git a/internal/component/database_observability/mysql/collector/query_details.go b/internal/component/database_observability/mysql/collector/query_details.go index 53a7b8a8d21..7546d0ba0a9 100644 --- a/internal/component/database_observability/mysql/collector/query_details.go +++ b/internal/component/database_observability/mysql/collector/query_details.go @@ -39,6 +39,7 @@ type QueryDetailsArguments struct { DB *sql.DB CollectInterval time.Duration StatementsLimit int + ExcludeSchemas []string EntryHandler loki.EntryHandler Logger log.Logger @@ -48,6 +49,7 @@ type QueryDetails struct { dbConnection *sql.DB collectInterval time.Duration statementsLimit int + excludeSchemas []string entryHandler loki.EntryHandler sqlParser parser.Parser normalizer *sqllexer.Normalizer @@ -63,6 +65,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) { dbConnection: args.DB, collectInterval: args.CollectInterval, statementsLimit: args.StatementsLimit, + excludeSchemas: args.ExcludeSchemas, entryHandler: args.EntryHandler, sqlParser: parser.NewTiDBSqlParser(), normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true)), @@ -120,7 +123,7 @@ func (c *QueryDetails) Stop() { } func (c *QueryDetails) tablesFromEventsStatements(ctx context.Context) error { - query := fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, c.statementsLimit) + query := fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause(c.excludeSchemas), c.statementsLimit) rs, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to fetch summary table samples: %w", err) diff --git a/internal/component/database_observability/mysql/collector/query_details_test.go b/internal/component/database_observability/mysql/collector/query_details_test.go index 9ef19960d41..5cd25cc42ca 100644 --- a/internal/component/database_observability/mysql/collector/query_details_test.go +++ b/internal/component/database_observability/mysql/collector/query_details_test.go @@ -378,7 +378,7 @@ func TestQueryTables(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -439,7 +439,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", // not enough columns @@ -447,7 +447,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { "abc123", )) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -505,7 +505,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -568,9 +568,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -609,3 +609,33 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { require.Equal(t, `level="info" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line) }) } + +func TestQueryDetailsExcludeSchemas(t *testing.T) { + defer goleak.VerifyNone(t) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + c, err := NewQueryDetails(QueryDetailsArguments{ + DB: db, + CollectInterval: time.Millisecond, + StatementsLimit: 250, + ExcludeSchemas: []string{"excluded_schema"}, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + // Verify the query uses the custom exclusion clause + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause([]string{"excluded_schema"}), 250)). + WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "digest", "digest_text", "schema_name", "query_sample_text", + })) + + c.tablesFromEventsStatements(t.Context()) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/component/database_observability/mysql/collector/query_samples.go b/internal/component/database_observability/mysql/collector/query_samples.go index 4b6ca5f6512..0fa561c21eb 100644 --- a/internal/component/database_observability/mysql/collector/query_samples.go +++ b/internal/component/database_observability/mysql/collector/query_samples.go @@ -67,8 +67,8 @@ LEFT JOIN AND statements.EVENT_ID = waits.NESTING_EVENT_ID WHERE statements.DIGEST IS NOT NULL - AND statements.CURRENT_SCHEMA NOT IN ` + EXCLUDED_SCHEMAS + - ` %s %s` + AND statements.CURRENT_SCHEMA NOT IN %s + %s %s` const updateSetupConsumers = ` UPDATE performance_schema.setup_consumers @@ -79,6 +79,7 @@ type QuerySamplesArguments struct { DB *sql.DB EngineVersion semver.Version CollectInterval time.Duration + ExcludeSchemas []string EntryHandler loki.EntryHandler DisableQueryRedaction bool AutoEnableSetupConsumers bool @@ -91,6 +92,7 @@ type QuerySamples struct { dbConnection *sql.DB engineVersion semver.Version collectInterval time.Duration + excludeSchemas []string entryHandler loki.EntryHandler disableQueryRedaction bool autoEnableSetupConsumers bool @@ -110,6 +112,7 @@ func NewQuerySamples(args QuerySamplesArguments) (*QuerySamples, error) { dbConnection: args.DB, engineVersion: args.EngineVersion, collectInterval: args.CollectInterval, + excludeSchemas: args.ExcludeSchemas, entryHandler: args.EntryHandler, disableQueryRedaction: args.DisableQueryRedaction, autoEnableSetupConsumers: args.AutoEnableSetupConsumers, @@ -232,15 +235,17 @@ func (c *QuerySamples) fetchQuerySamples(ctx context.Context) error { textNotNullClause = digestTextNotNullClause } + excludedSchemasClause := buildExcludedSchemasClause(c.excludeSchemas) + query := "" if semver.MustParseRange("<8.0.28")(c.engineVersion) { - query = fmt.Sprintf(selectQuerySamples, textField, textNotNullClause, timerClause) + query = fmt.Sprintf(selectQuerySamples, textField, excludedSchemasClause, textNotNullClause, timerClause) } else if semver.MustParseRange("<8.0.31")(c.engineVersion) { additionalFields := cpuTimeField + textField - query = fmt.Sprintf(selectQuerySamples, additionalFields, textNotNullClause, timerClause) + query = fmt.Sprintf(selectQuerySamples, additionalFields, excludedSchemasClause, textNotNullClause, timerClause) } else { additionalFields := cpuTimeField + maxControlledMemoryField + maxTotalMemoryField + textField - query = fmt.Sprintf(selectQuerySamples, additionalFields, textNotNullClause, timerClause) + query = fmt.Sprintf(selectQuerySamples, additionalFields, excludedSchemasClause, textNotNullClause, timerClause) } rs, err := c.dbConnection.QueryContext(ctx, query, c.timerBookmark, limit) diff --git a/internal/component/database_observability/mysql/collector/query_samples_test.go b/internal/component/database_observability/mysql/collector/query_samples_test.go index 38dc05f9af8..63a87e33c4f 100644 --- a/internal/component/database_observability/mysql/collector/query_samples_test.go +++ b/internal/component/database_observability/mysql/collector/query_samples_test.go @@ -159,7 +159,7 @@ func TestQuerySamples(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -243,7 +243,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -338,7 +338,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -505,7 +505,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { mock.ExpectQuery(selectUptime).WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"uptime"}).AddRow("1")) mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -641,7 +641,7 @@ func TestQuerySamples_WaitEvents(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, sqlTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, defaultExclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 1e12, ).RowsWillBeClosed(). @@ -757,7 +757,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, sqlTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField+sqlTextField, defaultExclusionClause, sqlTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -865,7 +865,7 @@ func TestQuerySamples_DisableQueryRedaction(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1126,7 +1126,7 @@ func TestQuerySamplesMySQLVersions(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, tc.expectedFields, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 1e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows(tc.expectedColumns).AddRow(tc.scanValues...), ) @@ -1194,7 +1194,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1214,7 +1214,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -1322,7 +1322,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(1e12, 2e12).RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "statements.CURRENT_SCHEMA", @@ -1448,7 +1448,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 2e12, ).WillReturnError(fmt.Errorf("connection error")) @@ -1462,7 +1462,7 @@ func TestQuerySamples_SQLDriverErrors(t *testing.T) { 2, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 2e12, ).RowsWillBeClosed(). @@ -1590,7 +1590,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1684,7 +1684,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 5, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, // initial timerBookmark 5e12, // uptime of 5 seconds in picoseconds (modulo 0 overflows) ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1735,7 +1735,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( // asserts that beginningAndEndOfTimeline clause is used 3e12, 10e12, // uptimeLimit is calculated as uptime "modulo" overflows: (uptime - 1 overflow) in picoseconds ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1785,7 +1785,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, beginningAndEndOfTimeline)).WithArgs( 3e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1826,7 +1826,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { picosecondsToSeconds(math.MaxUint64)+13, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( // asserts revert to endOfTimeline clause 10e12, // asserts timerBookmark has been updated to the previous uptimeLimit 13e12, // asserts uptimeLimit is now updated to the current uptime "modulo" overflows ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1868,7 +1868,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { 10, ), ) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1912,7 +1912,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnError(fmt.Errorf("some error")) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( float64(0), 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -1970,7 +1970,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", digestTextNotNullClause, endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, "", defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs(3e12, 10e12).WillReturnError(fmt.Errorf("some error")) c := &QuerySamples{ dbConnection: db, @@ -1987,7 +1987,7 @@ func TestQuerySamples_handles_timer_overflows(t *testing.T) { require.NoError(t, err) defer db.Close() mock.ExpectQuery(selectNowAndUptime).WithoutArgs().WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(picosecondsToSeconds(math.MaxUint64)+15, 10)) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 2e12, 10e12, ).WillReturnRows(sqlmock.NewRows([]string{ @@ -2137,7 +2137,7 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { 1, )) - mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, digestTextNotNullClause, endOfTimeline)).WithArgs( + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, defaultExclusionClause, digestTextNotNullClause, endOfTimeline)).WithArgs( 1e12, 1e12, ).RowsWillBeClosed(). @@ -2255,3 +2255,43 @@ func TestQuerySamples_AutoEnableSetupConsumers(t *testing.T) { require.NoError(t, err) }) } + +func TestQuerySamplesExcludeSchemas(t *testing.T) { + defer goleak.VerifyNone(t) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + c, err := NewQuerySamples(QuerySamplesArguments{ + DB: db, + EngineVersion: latestCompatibleVersion, + CollectInterval: time.Millisecond, + ExcludeSchemas: []string{"excluded_schema"}, + EntryHandler: lokiClient, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + // Initialize the timerBookmark as Start() would do + c.timerBookmark = 1e12 + + mock.ExpectQuery(selectNowAndUptime).WithoutArgs(). + WillReturnRows(sqlmock.NewRows([]string{"now", "uptime"}).AddRow(5, 1)) + + // Verify the query uses the custom exclusion clause + customClause := buildExcludedSchemasClause([]string{"excluded_schema"}) + mock.ExpectQuery(fmt.Sprintf(selectQuerySamples, cpuTimeField+maxControlledMemoryField+maxTotalMemoryField, customClause, digestTextNotNullClause, endOfTimeline)). + WithArgs(1e12, 1e12).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{ + "current_schema", "thread_id", "event_id", "end_event_id", "digest", + "timer_end", "timer_wait", "rows_examined", "rows_sent", "rows_affected", + "errors", "object_schema", "object_name", "object_type", "index_name", + "lock_time", "digest_text", "cpu_time", "max_controlled_memory", "max_total_memory", + })) + + c.fetchQuerySamples(t.Context()) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/component/database_observability/mysql/collector/schema_details.go b/internal/component/database_observability/mysql/collector/schema_details.go index 61452160abd..cab92fd7309 100644 --- a/internal/component/database_observability/mysql/collector/schema_details.go +++ b/internal/component/database_observability/mysql/collector/schema_details.go @@ -26,15 +26,15 @@ const ( OP_CREATE_STATEMENT = "create_statement" ) -const ( - selectSchemaName = ` +const selectSchemaNameTemplate = ` SELECT SCHEMA_NAME FROM information_schema.schemata WHERE - SCHEMA_NAME NOT IN ` + EXCLUDED_SCHEMAS + SCHEMA_NAME NOT IN %s` +const ( selectTableName = ` SELECT TABLE_NAME, @@ -98,6 +98,7 @@ const ( type SchemaDetailsArguments struct { DB *sql.DB CollectInterval time.Duration + ExcludeSchemas []string EntryHandler loki.EntryHandler CacheEnabled bool @@ -110,6 +111,7 @@ type SchemaDetailsArguments struct { type SchemaDetails struct { dbConnection *sql.DB collectInterval time.Duration + excludeSchemas []string entryHandler loki.EntryHandler // Cache of table definitions. Entries are removed after a configurable TTL. @@ -168,6 +170,7 @@ func NewSchemaDetails(args SchemaDetailsArguments) (*SchemaDetails, error) { c := &SchemaDetails{ dbConnection: args.DB, collectInterval: args.CollectInterval, + excludeSchemas: args.ExcludeSchemas, entryHandler: args.EntryHandler, logger: log.With(args.Logger, "collector", SchemaDetailsCollector), running: &atomic.Bool{}, @@ -227,7 +230,8 @@ func (c *SchemaDetails) Stop() { } func (c *SchemaDetails) extractSchema(ctx context.Context) error { - rs, err := c.dbConnection.QueryContext(ctx, selectSchemaName) + query := fmt.Sprintf(selectSchemaNameTemplate, buildExcludedSchemasClause(c.excludeSchemas)) + rs, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to query schemata: %w", err) } diff --git a/internal/component/database_observability/mysql/collector/schema_details_test.go b/internal/component/database_observability/mysql/collector/schema_details_test.go index 6f777ae9695..cdf2dd17377 100644 --- a/internal/component/database_observability/mysql/collector/schema_details_test.go +++ b/internal/component/database_observability/mysql/collector/schema_details_test.go @@ -40,7 +40,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -184,7 +184,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -331,7 +331,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -498,7 +498,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -645,7 +645,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -815,7 +815,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -962,7 +962,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1049,7 +1049,7 @@ func TestSchemaDetails(t *testing.T) { // second loop, table info will be read from cache // and no further queries will be executed - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1129,7 +1129,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "schema_name", @@ -1264,7 +1264,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows( []string{"schema_name"}, ).AddRow( @@ -1313,7 +1313,7 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows([]string{ "schema_name", }).AddRow( @@ -1381,9 +1381,9 @@ func TestSchemaDetails(t *testing.T) { require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(selectSchemaName).WithoutArgs().WillReturnRows( + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, defaultExclusionClause)).WithoutArgs().WillReturnRows( sqlmock.NewRows([]string{ "schema_name", }).AddRow( @@ -1489,3 +1489,31 @@ func TestSchemaDetails(t *testing.T) { require.Equal(t, fmt.Sprintf(`level="info" schema="some_schema" table="some_table" create_statement="%s" table_spec="%s"`, expectedCreateStmt, expectedTableSpec), lokiEntries[2].Line) }) } + +func TestSchemaDetailsExcludeSchemas(t *testing.T) { + defer goleak.VerifyNone(t, goleak.IgnoreTopFunction("github.com/hashicorp/golang-lru/v2/expirable.NewLRU[...].func1")) + + db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual)) + require.NoError(t, err) + defer db.Close() + + lokiClient := loki.NewCollectingHandler() + defer lokiClient.Stop() + + c, err := NewSchemaDetails(SchemaDetailsArguments{ + DB: db, + CollectInterval: time.Millisecond, + ExcludeSchemas: []string{"excluded_schema"}, + EntryHandler: lokiClient, + CacheEnabled: false, + Logger: log.NewLogfmtLogger(os.Stderr), + }) + require.NoError(t, err) + + // Verify the query uses the custom exclusion clause + mock.ExpectQuery(fmt.Sprintf(selectSchemaNameTemplate, buildExcludedSchemasClause([]string{"excluded_schema"}))). + WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{"schema_name"})) + + c.extractSchema(t.Context()) + require.NoError(t, mock.ExpectationsWereMet()) +} diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index 6cd784a5bd3..dedde49c201 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -450,6 +450,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse DB: c.dbConnection, CollectInterval: c.args.QueryDetailsArguments.CollectInterval, StatementsLimit: c.args.QueryDetailsArguments.StatementsLimit, + ExcludeSchemas: c.args.ExcludeSchemas, EntryHandler: entryHandler, Logger: c.opts.Logger, }) @@ -467,6 +468,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse stCollector, err := collector.NewSchemaDetails(collector.SchemaDetailsArguments{ DB: c.dbConnection, CollectInterval: c.args.SchemaDetailsArguments.CollectInterval, + ExcludeSchemas: c.args.ExcludeSchemas, CacheEnabled: c.args.SchemaDetailsArguments.CacheEnabled, CacheSize: c.args.SchemaDetailsArguments.CacheSize, CacheTTL: c.args.SchemaDetailsArguments.CacheTTL, @@ -492,6 +494,7 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse DB: c.dbConnection, EngineVersion: parsedEngineVersion, CollectInterval: c.args.QuerySamplesArguments.CollectInterval, + ExcludeSchemas: c.args.ExcludeSchemas, EntryHandler: entryHandler, Logger: c.opts.Logger, DisableQueryRedaction: c.args.QuerySamplesArguments.DisableQueryRedaction,