diff --git a/docs/sources/reference/components/database_observability/database_observability.mysql.md b/docs/sources/reference/components/database_observability/database_observability.mysql.md index 1aadce499d0..c4d4a1c075d 100644 --- a/docs/sources/reference/components/database_observability/database_observability.mysql.md +++ b/docs/sources/reference/components/database_observability/database_observability.mysql.md @@ -117,6 +117,7 @@ The `azure` block supplies the identifying information for the database being mo | Name | Type | Description | Default | Required | |--------------------|------------|------------------------------------------------------|---------|----------| | `collect_interval` | `duration` | How frequently to collect information from database. | `"1m"` | no | +| `statements_limit` | `integer` | Max number of recent queries to collect details for. | `250` | no | ### `schema_details` diff --git a/internal/component/database_observability/mysql/collector/query_details.go b/internal/component/database_observability/mysql/collector/query_details.go index c037e894da3..53a7b8a8d21 100644 --- a/internal/component/database_observability/mysql/collector/query_details.go +++ b/internal/component/database_observability/mysql/collector/query_details.go @@ -31,11 +31,14 @@ const selectQueryTablesSamples = ` query_sample_text FROM performance_schema.events_statements_summary_by_digest WHERE last_seen > DATE_SUB(NOW(), INTERVAL 1 DAY) - AND schema_name NOT IN ` + EXCLUDED_SCHEMAS + AND schema_name NOT IN %s + ORDER BY last_seen DESC + LIMIT %d` type QueryDetailsArguments struct { DB *sql.DB CollectInterval time.Duration + StatementsLimit int EntryHandler loki.EntryHandler Logger log.Logger @@ -44,6 +47,7 @@ type QueryDetailsArguments struct { type QueryDetails struct { dbConnection *sql.DB collectInterval time.Duration + statementsLimit int entryHandler loki.EntryHandler sqlParser parser.Parser normalizer *sqllexer.Normalizer @@ -58,6 +62,7 @@ func NewQueryDetails(args QueryDetailsArguments) (*QueryDetails, error) { c := &QueryDetails{ dbConnection: args.DB, collectInterval: args.CollectInterval, + statementsLimit: args.StatementsLimit, entryHandler: args.EntryHandler, sqlParser: parser.NewTiDBSqlParser(), normalizer: sqllexer.NewNormalizer(sqllexer.WithCollectTables(true)), @@ -115,7 +120,8 @@ func (c *QueryDetails) Stop() { } func (c *QueryDetails) tablesFromEventsStatements(ctx context.Context) error { - rs, err := c.dbConnection.QueryContext(ctx, selectQueryTablesSamples) + query := fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, c.statementsLimit) + rs, err := c.dbConnection.QueryContext(ctx, query) if err != nil { return fmt.Errorf("failed to fetch summary table samples: %w", err) } diff --git a/internal/component/database_observability/mysql/collector/query_details_test.go b/internal/component/database_observability/mysql/collector/query_details_test.go index 7220569573a..9ef19960d41 100644 --- a/internal/component/database_observability/mysql/collector/query_details_test.go +++ b/internal/component/database_observability/mysql/collector/query_details_test.go @@ -371,13 +371,14 @@ func TestQueryTables(t *testing.T) { collector, err := NewQueryDetails(QueryDetailsArguments{ DB: db, CollectInterval: time.Second, + StatementsLimit: 250, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), }) require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -431,13 +432,14 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { collector, err := NewQueryDetails(QueryDetailsArguments{ DB: db, CollectInterval: time.Second, + StatementsLimit: 250, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), }) require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", // not enough columns @@ -445,7 +447,7 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { "abc123", )) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -496,13 +498,14 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { collector, err := NewQueryDetails(QueryDetailsArguments{ DB: db, CollectInterval: time.Second, + StatementsLimit: 250, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), }) require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", @@ -558,15 +561,16 @@ func TestQueryTablesSQLDriverErrors(t *testing.T) { collector, err := NewQueryDetails(QueryDetailsArguments{ DB: db, CollectInterval: time.Second, + StatementsLimit: 250, EntryHandler: lokiClient, Logger: log.NewLogfmtLogger(os.Stderr), }) require.NoError(t, err) require.NotNil(t, collector) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().WillReturnError(fmt.Errorf("connection error")) - mock.ExpectQuery(selectQueryTablesSamples).WithoutArgs().RowsWillBeClosed(). + mock.ExpectQuery(fmt.Sprintf(selectQueryTablesSamples, EXCLUDED_SCHEMAS, 250)).WithoutArgs().RowsWillBeClosed(). WillReturnRows( sqlmock.NewRows([]string{ "digest", diff --git a/internal/component/database_observability/mysql/component.go b/internal/component/database_observability/mysql/component.go index d64d3ad1246..940ef86c8a5 100644 --- a/internal/component/database_observability/mysql/component.go +++ b/internal/component/database_observability/mysql/component.go @@ -65,8 +65,8 @@ type Arguments struct { CloudProvider *CloudProvider `alloy:"cloud_provider,block,optional"` SetupConsumersArguments SetupConsumersArguments `alloy:"setup_consumers,block,optional"` SetupActorsArguments SetupActorsArguments `alloy:"setup_actors,block,optional"` - QueryTablesArguments QueryTablesArguments `alloy:"query_details,block,optional"` - SchemaTablesArguments SchemaDetailsArguments `alloy:"schema_details,block,optional"` + QueryDetailsArguments QueryDetailsArguments `alloy:"query_details,block,optional"` + SchemaDetailsArguments SchemaDetailsArguments `alloy:"schema_details,block,optional"` ExplainPlansArguments ExplainPlansArguments `alloy:"explain_plans,block,optional"` LocksArguments LocksArguments `alloy:"locks,block,optional"` QuerySamplesArguments QuerySamplesArguments `alloy:"query_samples,block,optional"` @@ -88,8 +88,9 @@ type AzureCloudProviderInfo struct { ServerName string `alloy:"server_name,attr,optional"` } -type QueryTablesArguments struct { +type QueryDetailsArguments struct { CollectInterval time.Duration `alloy:"collect_interval,attr,optional"` + StatementsLimit int `alloy:"statements_limit,attr,optional"` } type SchemaDetailsArguments struct { @@ -134,11 +135,12 @@ type HealthCheckArguments struct { var DefaultArguments = Arguments{ AllowUpdatePerfSchemaSettings: false, - QueryTablesArguments: QueryTablesArguments{ + QueryDetailsArguments: QueryDetailsArguments{ CollectInterval: 1 * time.Minute, + StatementsLimit: 250, }, - SchemaTablesArguments: SchemaDetailsArguments{ + SchemaDetailsArguments: SchemaDetailsArguments{ CollectInterval: 1 * time.Minute, CacheEnabled: true, CacheSize: 256, @@ -445,7 +447,8 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse if collectors[collector.QueryDetailsCollector] { qtCollector, err := collector.NewQueryDetails(collector.QueryDetailsArguments{ DB: c.dbConnection, - CollectInterval: c.args.QueryTablesArguments.CollectInterval, + CollectInterval: c.args.QueryDetailsArguments.CollectInterval, + StatementsLimit: c.args.QueryDetailsArguments.StatementsLimit, EntryHandler: entryHandler, Logger: c.opts.Logger, }) @@ -462,10 +465,10 @@ func (c *Component) startCollectors(serverID string, engineVersion string, parse if collectors[collector.SchemaDetailsCollector] { stCollector, err := collector.NewSchemaDetails(collector.SchemaDetailsArguments{ DB: c.dbConnection, - CollectInterval: c.args.SchemaTablesArguments.CollectInterval, - CacheEnabled: c.args.SchemaTablesArguments.CacheEnabled, - CacheSize: c.args.SchemaTablesArguments.CacheSize, - CacheTTL: c.args.SchemaTablesArguments.CacheTTL, + CollectInterval: c.args.SchemaDetailsArguments.CollectInterval, + CacheEnabled: c.args.SchemaDetailsArguments.CacheEnabled, + CacheSize: c.args.SchemaDetailsArguments.CacheSize, + CacheTTL: c.args.SchemaDetailsArguments.CacheTTL, EntryHandler: entryHandler, Logger: c.opts.Logger, })