diff --git a/.chloggen/add-row-condition.yaml b/.chloggen/add-row-condition.yaml new file mode 100644 index 0000000000000..dc1bdcf7bb386 --- /dev/null +++ b/.chloggen/add-row-condition.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/sqlquery + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add `row_condition` to metric configuration for filtering result rows by column value" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [45862] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Enables extracting individual metrics from pivot-style result sets where each row + represents a different metric (e.g. pgbouncer's `SHOW LISTS` command). When + `row_condition` is configured on a metric, only rows where the specified column + equals the specified value are used; all other rows are silently skipped. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/internal/sqlquery/config.go b/internal/sqlquery/config.go index af1348ddd2ff6..82f164e6fa47c 100644 --- a/internal/sqlquery/config.go +++ b/internal/sqlquery/config.go @@ -125,6 +125,16 @@ func (config LogsCfg) Validate() error { return errors.Join(errs...) } +// RowCondition filters query result rows for a metric. Only rows where the +// specified column equals the specified value are used to produce the metric. +// This is useful when a single query returns a pivot-style result set (e.g. +// pgbouncer's SHOW LISTS) where each row represents a different metric and +// must be selected individually. +type RowCondition struct { + Column string `mapstructure:"column"` + Value string `mapstructure:"value"` +} + type MetricCfg struct { MetricName string `mapstructure:"metric_name"` ValueColumn string `mapstructure:"value_column"` @@ -138,6 +148,7 @@ type MetricCfg struct { StaticAttributes map[string]string `mapstructure:"static_attributes"` StartTsColumn string `mapstructure:"start_ts_column"` TsColumn string `mapstructure:"ts_column"` + RowCondition *RowCondition `mapstructure:"row_condition"` } func (c MetricCfg) Validate() error { @@ -160,6 +171,11 @@ func (c MetricCfg) Validate() error { if c.DataType == MetricTypeGauge && c.Aggregation != "" { errs = append(errs, fmt.Errorf("aggregation=%s but data_type=%s does not support aggregation", c.Aggregation, c.DataType)) } + if c.RowCondition != nil { + if c.RowCondition.Column == "" { + errs = append(errs, errors.New("'row_condition.column' cannot be empty")) + } + } if errs != nil && c.MetricName != "" { errs = append(errs, fmt.Errorf("invalid metric config with metric_name '%s'", c.MetricName)) } diff --git a/internal/sqlquery/config.schema.yaml b/internal/sqlquery/config.schema.yaml index 812880b3e144a..a90b64f558e5c 100644 --- a/internal/sqlquery/config.schema.yaml +++ b/internal/sqlquery/config.schema.yaml @@ -60,6 +60,9 @@ $defs: type: string monotonic: type: boolean + row_condition: + x-pointer: true + $ref: row_condition start_ts_column: type: string static_attributes: @@ -95,6 +98,14 @@ $defs: type: string tracking_start_value: type: string + row_condition: + description: RowCondition filters query result rows for a metric. Only rows where the specified column equals the specified value are used to produce the metric. This is useful when a single query returns a pivot-style result set (e.g. pgbouncer's SHOW LISTS) where each row represents a different metric and must be selected individually. + type: object + properties: + column: + type: string + value: + type: string telemetry_config: type: object properties: diff --git a/internal/sqlquery/config_test.go b/internal/sqlquery/config_test.go new file mode 100644 index 0000000000000..422123b1df58a --- /dev/null +++ b/internal/sqlquery/config_test.go @@ -0,0 +1,68 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package sqlquery + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestMetricCfg_Validate_RowCondition(t *testing.T) { + t.Parallel() + tests := []struct { + name string + cfg MetricCfg + expectErr string + }{ + { + name: "valid row_condition with column set", + cfg: MetricCfg{ + MetricName: "my.metric", + ValueColumn: "val", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: &RowCondition{ + Column: "list", + Value: "pools", + }, + }, + }, + { + name: "row_condition with empty column is invalid", + cfg: MetricCfg{ + MetricName: "my.metric", + ValueColumn: "val", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: &RowCondition{ + Column: "", + Value: "pools", + }, + }, + expectErr: "'row_condition.column' cannot be empty", + }, + { + name: "nil row_condition is valid", + cfg: MetricCfg{ + MetricName: "my.metric", + ValueColumn: "val", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: nil, + }, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + err := tt.cfg.Validate() + if tt.expectErr != "" { + require.ErrorContains(t, err, tt.expectErr) + } else { + require.NoError(t, err) + } + }) + } +} diff --git a/internal/sqlquery/scraper.go b/internal/sqlquery/scraper.go index c3a33d9b63710..49acae55d7868 100644 --- a/internal/sqlquery/scraper.go +++ b/internal/sqlquery/scraper.go @@ -92,6 +92,12 @@ func (s *Scraper) ScrapeMetrics(ctx context.Context) (pmetric.Metrics, error) { for i := range s.Query.Metrics { metricCfg := &s.Query.Metrics[i] for j, row := range rows { + if metricCfg.RowCondition != nil { + val, found := row[metricCfg.RowCondition.Column] + if !found || val != metricCfg.RowCondition.Value { + continue + } + } if err = rowToMetric(row, metricCfg, ms.AppendEmpty(), s.StartTime, ts, s.ScrapeCfg); err != nil { err = fmt.Errorf("row %d: %w", j, err) errs = append(errs, err) diff --git a/internal/sqlquery/scraper_test.go b/internal/sqlquery/scraper_test.go index f692ce3b790ca..3a20d8fdf4823 100644 --- a/internal/sqlquery/scraper_test.go +++ b/internal/sqlquery/scraper_test.go @@ -529,6 +529,114 @@ func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) { assert.Error(t, err) } +func TestScraper_RowCondition_FiltersRows(t *testing.T) { + client := &FakeDBClient{ + StringMaps: [][]StringMap{{ + {"list": "databases", "items": "8"}, + {"list": "pools", "items": "4"}, + {"list": "users", "items": "2"}, + }}, + } + scrpr := Scraper{ + InstrumentationScope: pcommon.NewInstrumentationScope(), + Client: client, + Query: Query{ + Metrics: []MetricCfg{ + { + MetricName: "pgbouncer.lists.pools", + ValueColumn: "items", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: &RowCondition{ + Column: "list", + Value: "pools", + }, + }, + { + MetricName: "pgbouncer.lists.databases", + ValueColumn: "items", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: &RowCondition{ + Column: "list", + Value: "databases", + }, + }, + }, + }, + } + metrics, err := scrpr.ScrapeMetrics(t.Context()) + require.NoError(t, err) + ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + require.Equal(t, 2, ms.Len()) + + poolsMetric := ms.At(0) + assert.Equal(t, "pgbouncer.lists.pools", poolsMetric.Name()) + assert.Equal(t, 1, poolsMetric.Gauge().DataPoints().Len()) + assert.EqualValues(t, 4, poolsMetric.Gauge().DataPoints().At(0).IntValue()) + + dbMetric := ms.At(1) + assert.Equal(t, "pgbouncer.lists.databases", dbMetric.Name()) + assert.Equal(t, 1, dbMetric.Gauge().DataPoints().Len()) + assert.EqualValues(t, 8, dbMetric.Gauge().DataPoints().At(0).IntValue()) +} + +func TestScraper_RowCondition_NoMatch_ProducesNoDataPoints(t *testing.T) { + client := &FakeDBClient{ + StringMaps: [][]StringMap{{ + {"list": "databases", "items": "8"}, + {"list": "pools", "items": "4"}, + }}, + } + scrpr := Scraper{ + InstrumentationScope: pcommon.NewInstrumentationScope(), + Client: client, + Query: Query{ + Metrics: []MetricCfg{{ + MetricName: "pgbouncer.lists.peers", + ValueColumn: "items", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: &RowCondition{ + Column: "list", + Value: "peers", + }, + }}, + }, + } + metrics, err := scrpr.ScrapeMetrics(t.Context()) + require.NoError(t, err) + ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + + assert.Equal(t, 0, ms.Len()) +} + +func TestScraper_RowCondition_NilCondition_AllRowsUsed(t *testing.T) { + client := &FakeDBClient{ + StringMaps: [][]StringMap{{ + {"count": "1"}, + {"count": "2"}, + }}, + } + scrpr := Scraper{ + InstrumentationScope: pcommon.NewInstrumentationScope(), + Client: client, + Query: Query{ + Metrics: []MetricCfg{{ + MetricName: "my.count", + ValueColumn: "count", + ValueType: MetricValueTypeInt, + DataType: MetricTypeGauge, + RowCondition: nil, + }}, + }, + } + metrics, err := scrpr.ScrapeMetrics(t.Context()) + require.NoError(t, err) + ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics() + assert.Equal(t, 2, ms.Len()) +} + func TestBuildDataSourceString(t *testing.T) { t.Parallel() tests := []struct { diff --git a/receiver/sqlqueryreceiver/README.md b/receiver/sqlqueryreceiver/README.md index 49fa29ca18bc9..1151965f13a41 100644 --- a/receiver/sqlqueryreceiver/README.md +++ b/receiver/sqlqueryreceiver/README.md @@ -193,6 +193,11 @@ Each _metric_ in the configuration will produce one OTel metric per row returned sum. - `ts_column` (optional): the name of the column containing the timestamp, the value of which is applied to the metric's timestamp. This can be current timestamp depending upon the time of last recorded metric's datapoint. +- `row_condition` (optional): when set, only rows where the specified column equals the specified value are used + to produce this metric. Rows that do not match are silently skipped. This is useful for pivot-style result + sets where each row encodes a different metric (e.g. `SHOW LISTS` in pgbouncer). + - `column` (required): the column name to evaluate. + - `value` (required): the expected string value the column must equal for the row to be included. ### Example @@ -257,6 +262,54 @@ Data point attributes: Value: 1 ``` +#### Row Condition Example + +Some databases expose pivot-style views where each row represents a different metric, identified by a name column. +For example, pgbouncer's `SHOW LISTS` command returns: + +``` + list | items +---------------+------- + databases | 8 + pools | 4 + users | 2 + free_clients | 25 + used_servers | 5 +``` + +Use `row_condition` to extract a single row as a dedicated metric: + +```yaml +receivers: + sqlquery: + driver: postgres + datasource: "host=pgbouncer port=5432 user=pgbouncer dbname=pgbouncer sslmode=disable" + queries: + - sql: "SHOW LISTS" + metrics: + - metric_name: pgbouncer.lists.pools + value_column: items + value_type: int + row_condition: + column: list + value: pools + - metric_name: pgbouncer.lists.databases + value_column: items + value_type: int + row_condition: + column: list + value: databases + - metric_name: pgbouncer.lists.users + value_column: items + value_type: int + row_condition: + column: list + value: users +``` + +This produces three separate metrics (`pgbouncer.lists.pools`, `pgbouncer.lists.databases`, +`pgbouncer.lists.users`), each sourced from the matching row only. + #### NULL values Avoid queries that produce any NULL values. If a query produces a NULL value, a warning will be logged. Furthermore,