Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package collector

import "strings"

var (
	// defaultExcludedSchemas lists the schemas that buildExcludedSchemasClause
	// always places in the exclusion list, ahead of any caller-supplied ones.
	defaultExcludedSchemas = []string{"mysql", "performance_schema", "sys", "information_schema"}

	// defaultExclusionClause is the exclusion list rendered for
	// defaultExcludedSchemas alone. It is computed once at package
	// initialization and returned as-is when no extra schemas are supplied.
	defaultExclusionClause = buildExclusionClause(defaultExcludedSchemas)
)

func buildExcludedSchemasClause(excludedSchemas []string) string {
if len(excludedSchemas) == 0 {
return defaultExclusionClause
}

allSchemas := make([]string, 0, len(defaultExcludedSchemas)+len(excludedSchemas))
allSchemas = append(allSchemas, defaultExcludedSchemas...)
allSchemas = append(allSchemas, excludedSchemas...)

return buildExclusionClause(allSchemas)
}

// buildExclusionClause renders schemas as "(" + quoted names joined by ", " + ")".
// Each name is passed through escapeSQLString before being written.
func buildExclusionClause(schemas []string) string {
	var clause strings.Builder

	clause.WriteString("(")
	for idx, name := range schemas {
		if idx > 0 {
			clause.WriteString(", ")
		}
		clause.WriteString(escapeSQLString(name))
	}
	clause.WriteString(")")

	return clause.String()
}

// escapeSQLString returns s wrapped in single quotes so it can be embedded in
// a SQL statement as a string literal.
//
// Two characters are escaped by doubling them:
//   - backslash: MySQL treats it as an escape character inside string literals
//     by default, so an input like `\'` would otherwise turn the doubled quote
//     into an escaped quote followed by a terminating quote;
//   - single quote: the string delimiter itself.
//
// NOTE(review): on a server running with the NO_BACKSLASH_ESCAPES SQL mode a
// doubled backslash is read as two literal backslashes, so a schema name that
// contains a backslash would no longer match. The literal still cannot be
// terminated early in that mode.
func escapeSQLString(s string) string {
	escaped := strings.ReplaceAll(s, `\`, `\\`)
	escaped = strings.ReplaceAll(escaped, "'", "''")
	return "'" + escaped + "'"
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package collector

import (
"testing"

"github.com/stretchr/testify/require"
)

// TestBuildExcludedSchemasClause checks the clause produced by
// buildExcludedSchemasClause for nil, empty, plain, and quote-containing
// user-supplied schema lists.
func TestBuildExcludedSchemasClause(t *testing.T) {
	type clauseCase struct {
		name  string
		input []string
		want  string
	}

	cases := []clauseCase{
		{
			name:  "nil user schemas returns default schemas",
			input: nil,
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema')",
		},
		{
			name:  "empty user schemas returns default schemas",
			input: []string{},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema')",
		},
		{
			name:  "single user schema is appended to default schemas",
			input: []string{"my_schema"},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema', 'my_schema')",
		},
		{
			name:  "multiple user schemas are appended to default schemas",
			input: []string{"schema1", "schema2", "schema3"},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema', 'schema1', 'schema2', 'schema3')",
		},
		{
			name:  "schema with single quote is escaped to prevent SQL injection",
			input: []string{"test'schema"},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema', 'test''schema')",
		},
		{
			name:  "schema with SQL injection attempt is escaped",
			input: []string{"'; DROP TABLE users; --"},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema', '''; DROP TABLE users; --')",
		},
		{
			name:  "schema with multiple single quotes is escaped",
			input: []string{"it's a test's schema"},
			want:  "('mysql', 'performance_schema', 'sys', 'information_schema', 'it''s a test''s schema')",
		},
	}

	// Each case runs as its own subtest, in declaration order.
	for _, c := range cases {
		t.Run(c.name, func(t *testing.T) {
			got := buildExcludedSchemasClause(c.input)
			require.Equal(t, c.want, got)
		})
	}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"strconv"
"strings"
"time"
Expand Down Expand Up @@ -42,7 +41,7 @@ const selectDigestsForExplainPlan = `
WHERE LAST_SEEN > ?
AND QUERY_SAMPLE_TEXT IS NOT NULL
AND DIGEST IS NOT NULL
AND SCHEMA_NAME NOT IN ` + EXCLUDED_SCHEMAS
AND SCHEMA_NAME NOT IN %s`

const selectExplainPlanPrefix = `EXPLAIN FORMAT=JSON `

Expand Down Expand Up @@ -520,7 +519,8 @@ func (c *ExplainPlans) Stop() {
}

func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
rs, err := c.dbConnection.QueryContext(ctx, selectDigestsForExplainPlan, c.lastSeen)
query := fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause(c.excludeSchemas))
rs, err := c.dbConnection.QueryContext(ctx, query, c.lastSeen)
if err != nil {
level.Error(c.logger).Log("msg", "failed to fetch digests for explain plans", "err", err)
return err
Expand All @@ -536,23 +536,6 @@ func (c *ExplainPlans) populateQueryCache(ctx context.Context) error {
level.Error(c.logger).Log("msg", "failed to scan digest for explain plans", "err", err)
return err
}
if slices.ContainsFunc(c.excludeSchemas, func(schema string) bool {
return strings.EqualFold(schema, schemaName)
}) {

err := c.sendExplainPlansOutput(
schemaName,
digest,
generatedAt,
database_observability.ExplainProcessingResultSkipped,
"query belongs to excluded schema",
nil,
)
if err != nil {
level.Error(c.logger).Log("msg", "failed to send excluded schema skip explain plan output", "err", err)
}
continue
}

qi := newQueryInfo(schemaName, digest, queryText)
if _, ok := c.queryDenylist[qi.uniqueKey]; !ok {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1529,7 +1529,7 @@ func TestExplainPlans(t *testing.T) {

t.Run("uses argument value on first request", func(t *testing.T) {
nextSeen := lastSeen.Add(time.Second * 45)
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_text",
Expand All @@ -1551,7 +1551,7 @@ func TestExplainPlans(t *testing.T) {
})

t.Run("uses oldest last seen value on subsequent requests", func(t *testing.T) {
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_text",
Expand Down Expand Up @@ -1594,7 +1594,7 @@ func TestExplainPlans(t *testing.T) {

t.Run("skips truncated queries", func(t *testing.T) {
logBuffer.Reset()
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1629,7 +1629,7 @@ func TestExplainPlans(t *testing.T) {
t.Run("skips non-select queries", func(t *testing.T) {
lokiClient.Clear()
logBuffer.Reset()
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1678,7 +1678,7 @@ func TestExplainPlans(t *testing.T) {

t.Run("skips no row result", func(t *testing.T) {
logBuffer.Reset()
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1710,7 +1710,7 @@ func TestExplainPlans(t *testing.T) {
t.Run("passes queries beginning in select", func(t *testing.T) {
lokiClient.Clear()
logBuffer.Reset()
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1747,7 +1747,7 @@ func TestExplainPlans(t *testing.T) {
t.Run("passes queries beginning in with", func(t *testing.T) {
lokiClient.Clear()
logBuffer.Reset()
mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1809,7 +1809,7 @@ func TestQueryFailureDenylist(t *testing.T) {
})
require.NoError(t, err)

mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1841,7 +1841,7 @@ func TestQueryFailureDenylist(t *testing.T) {
lokiClient.Clear()
logBuffer.Reset()

mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, defaultExclusionClause)).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
Expand Down Expand Up @@ -1899,17 +1899,12 @@ func TestSchemaDenylist(t *testing.T) {
})
require.NoError(t, err)

mock.ExpectQuery(selectDigestsForExplainPlan).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
mock.ExpectQuery(fmt.Sprintf(selectDigestsForExplainPlan, buildExcludedSchemasClause([]string{"some_schema"}))).WithArgs(lastSeen).RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
"schema_name",
"digest",
"query_sample_text",
"last_seen",
}).AddRow(
"some_schema",
"some_digest1",
"select * from some_table where id = 1",
lastSeen,
).AddRow(
"different_schema",
"some_digest2",
"select * from some_table where id = 2",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ type QueryDetailsArguments struct {
DB *sql.DB
CollectInterval time.Duration
StatementsLimit int
ExcludeSchemas []string
EntryHandler loki.EntryHandler

Logger log.Logger
Expand All @@ -48,6 +49,7 @@ type QueryDetails struct {
dbConnection *sql.DB
collectInterval time.Duration
statementsLimit int
excludeSchemas []string
entryHandler loki.EntryHandler
sqlParser parser.Parser
normalizer *sqllexer.Normalizer
Expand All @@ -63,6 +65,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) {
dbConnection: args.DB,
collectInterval: args.CollectInterval,
statementsLimit: args.StatementsLimit,
excludeSchemas: args.ExcludeSchemas,
entryHandler: args.EntryHandler,
sqlParser: parser.NewTiDBSqlParser(),
normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true)),
Expand Down Expand Up @@ -120,7 +123,7 @@ func (c *QueryDetails) Stop() {
}

func (c *QueryDetails) tablesFromEventsStatements(ctx context.Context) error {
query := fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, c.statementsLimit)
query := fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause(c.excludeSchemas), c.statementsLimit)
rs, err := c.dbConnection.QueryContext(ctx, query)
if err != nil {
return fmt.Errorf("failed to fetch summary table samples: %w", err)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -378,7 +378,7 @@ func TestQueryTables(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest",
Expand Down Expand Up @@ -439,15 +439,15 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest", // not enough columns
}).AddRow(
"abc123",
))

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest",
Expand Down Expand Up @@ -505,7 +505,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest",
Expand Down Expand Up @@ -568,9 +568,9 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, collector)

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error"))
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error"))

mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed().
mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, defaultExclusionClause, 250)).WithoutArgs().RowsWillBeClosed().
WillReturnRows(
sqlmock.NewRows([]string{
"digest",
Expand Down Expand Up @@ -609,3 +609,33 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) {
require.Equal(t, `level="info" schema="some_schema" digest="abc123" table="some_table"`, lokiEntries[1].Line)
})
}

// TestQueryDetailsExcludeSchemas verifies that schemas passed through
// QueryDetailsArguments.ExcludeSchemas end up in the exclusion clause of the
// query issued by tablesFromEventsStatements.
func TestQueryDetailsExcludeSchemas(t *testing.T) {
	defer goleak.VerifyNone(t)

	// QueryMatcherEqual is selected so the expectation below is compared
	// against the full statement text, including the exclusion clause.
	db, mock, err := sqlmock.New(sqlmock.QueryMatcherOption(sqlmock.QueryMatcherEqual))
	require.NoError(t, err)
	defer db.Close()

	lokiClient := loki.NewCollectingHandler()
	defer lokiClient.Stop()

	c, err := NewQueryDetails(QueryDetailsArguments{
		DB:              db,
		CollectInterval: time.Millisecond,
		StatementsLimit: 250,
		ExcludeSchemas:  []string{"excluded_schema"},
		EntryHandler:    lokiClient,
		Logger:          log.NewLogfmtLogger(os.Stderr),
	})
	require.NoError(t, err)

	// Verify the query uses the custom exclusion clause. No rows are returned;
	// only the statement text matters here.
	mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, buildExcludedSchemasClause([]string{"excluded_schema"}), 250)).
		WithoutArgs().RowsWillBeClosed().WillReturnRows(sqlmock.NewRows([]string{
		"digest", "digest_text", "schema_name", "query_sample_text",
	}))

	// Assert on the returned error rather than discarding it, so a failure in
	// the call is reported directly instead of only via unmet expectations.
	require.NoError(t, c.tablesFromEventsStatements(t.Context()))
	require.NoError(t, mock.ExpectationsWereMet())
}
Loading
Loading