Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions .chloggen/add-row-condition.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/sqlquery

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "Add `row_condition` to metric configuration for filtering result rows by column value"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [45862]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Enables extracting individual metrics from pivot-style result sets where each row
represents a different metric (e.g. pgbouncer's `SHOW LISTS` command). When
`row_condition` is configured on a metric, only rows where the specified column
equals the specified value are used; all other rows are silently skipped.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user, api]
16 changes: 16 additions & 0 deletions internal/sqlquery/config.go
Comment thread
bruegth marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,16 @@ func (config LogsCfg) Validate() error {
return errors.Join(errs...)
}

// RowCondition filters query result rows for a metric. Only rows where the
// specified column equals the specified value are used to produce the metric.
// This is useful when a single query returns a pivot-style result set (e.g.
// pgbouncer's SHOW LISTS) where each row represents a different metric and
// must be selected individually.
type RowCondition struct {
Column string `mapstructure:"column"`
Value string `mapstructure:"value"`
}

type MetricCfg struct {
MetricName string `mapstructure:"metric_name"`
ValueColumn string `mapstructure:"value_column"`
Expand All @@ -138,6 +148,7 @@ type MetricCfg struct {
StaticAttributes map[string]string `mapstructure:"static_attributes"`
StartTsColumn string `mapstructure:"start_ts_column"`
TsColumn string `mapstructure:"ts_column"`
RowCondition *RowCondition `mapstructure:"row_condition"`
}

func (c MetricCfg) Validate() error {
Expand All @@ -160,6 +171,11 @@ func (c MetricCfg) Validate() error {
if c.DataType == MetricTypeGauge && c.Aggregation != "" {
errs = append(errs, fmt.Errorf("aggregation=%s but data_type=%s does not support aggregation", c.Aggregation, c.DataType))
}
if c.RowCondition != nil {
if c.RowCondition.Column == "" {
errs = append(errs, errors.New("'row_condition.column' cannot be empty"))
}
}
if errs != nil && c.MetricName != "" {
errs = append(errs, fmt.Errorf("invalid metric config with metric_name '%s'", c.MetricName))
}
Expand Down
11 changes: 11 additions & 0 deletions internal/sqlquery/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ $defs:
type: string
monotonic:
type: boolean
row_condition:
x-pointer: true
$ref: row_condition
start_ts_column:
type: string
static_attributes:
Expand Down Expand Up @@ -95,6 +98,14 @@ $defs:
type: string
tracking_start_value:
type: string
row_condition:
description: RowCondition filters query result rows for a metric. Only rows where the specified column equals the specified value are used to produce the metric. This is useful when a single query returns a pivot-style result set (e.g. pgbouncer's SHOW LISTS) where each row represents a different metric and must be selected individually.
type: object
properties:
column:
type: string
value:
type: string
telemetry_config:
type: object
properties:
Expand Down
68 changes: 68 additions & 0 deletions internal/sqlquery/config_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
// Copyright The OpenTelemetry Authors
// SPDX-License-Identifier: Apache-2.0

package sqlquery

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestMetricCfg_Validate_RowCondition(t *testing.T) {
t.Parallel()
tests := []struct {
name string
cfg MetricCfg
expectErr string
}{
{
name: "valid row_condition with column set",
cfg: MetricCfg{
MetricName: "my.metric",
ValueColumn: "val",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: &RowCondition{
Column: "list",
Value: "pools",
},
},
},
{
name: "row_condition with empty column is invalid",
cfg: MetricCfg{
MetricName: "my.metric",
ValueColumn: "val",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: &RowCondition{
Column: "",
Value: "pools",
},
},
expectErr: "'row_condition.column' cannot be empty",
},
{
name: "nil row_condition is valid",
cfg: MetricCfg{
MetricName: "my.metric",
ValueColumn: "val",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: nil,
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
t.Parallel()
err := tt.cfg.Validate()
if tt.expectErr != "" {
require.ErrorContains(t, err, tt.expectErr)
} else {
require.NoError(t, err)
}
})
}
}
6 changes: 6 additions & 0 deletions internal/sqlquery/scraper.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ func (s *Scraper) ScrapeMetrics(ctx context.Context) (pmetric.Metrics, error) {
for i := range s.Query.Metrics {
metricCfg := &s.Query.Metrics[i]
for j, row := range rows {
if metricCfg.RowCondition != nil {
val, found := row[metricCfg.RowCondition.Column]
if !found || val != metricCfg.RowCondition.Value {
continue
}
}
if err = rowToMetric(row, metricCfg, ms.AppendEmpty(), s.StartTime, ts, s.ScrapeCfg); err != nil {
err = fmt.Errorf("row %d: %w", j, err)
errs = append(errs, err)
Expand Down
108 changes: 108 additions & 0 deletions internal/sqlquery/scraper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -529,6 +529,114 @@ func TestScraper_StartAndTS_ErrorOnParse(t *testing.T) {
assert.Error(t, err)
}

func TestScraper_RowCondition_FiltersRows(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
{"list": "databases", "items": "8"},
{"list": "pools", "items": "4"},
{"list": "users", "items": "2"},
}},
}
scrpr := Scraper{
InstrumentationScope: pcommon.NewInstrumentationScope(),
Client: client,
Query: Query{
Metrics: []MetricCfg{
{
MetricName: "pgbouncer.lists.pools",
ValueColumn: "items",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: &RowCondition{
Column: "list",
Value: "pools",
},
},
{
MetricName: "pgbouncer.lists.databases",
ValueColumn: "items",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: &RowCondition{
Column: "list",
Value: "databases",
},
},
},
},
}
metrics, err := scrpr.ScrapeMetrics(t.Context())
require.NoError(t, err)
ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
require.Equal(t, 2, ms.Len())

poolsMetric := ms.At(0)
assert.Equal(t, "pgbouncer.lists.pools", poolsMetric.Name())
assert.Equal(t, 1, poolsMetric.Gauge().DataPoints().Len())
assert.EqualValues(t, 4, poolsMetric.Gauge().DataPoints().At(0).IntValue())

dbMetric := ms.At(1)
assert.Equal(t, "pgbouncer.lists.databases", dbMetric.Name())
assert.Equal(t, 1, dbMetric.Gauge().DataPoints().Len())
assert.EqualValues(t, 8, dbMetric.Gauge().DataPoints().At(0).IntValue())
}

func TestScraper_RowCondition_NoMatch_ProducesNoDataPoints(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
{"list": "databases", "items": "8"},
{"list": "pools", "items": "4"},
}},
}
scrpr := Scraper{
InstrumentationScope: pcommon.NewInstrumentationScope(),
Client: client,
Query: Query{
Metrics: []MetricCfg{{
MetricName: "pgbouncer.lists.peers",
ValueColumn: "items",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: &RowCondition{
Column: "list",
Value: "peers",
},
}},
},
}
metrics, err := scrpr.ScrapeMetrics(t.Context())
require.NoError(t, err)
ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()

assert.Equal(t, 0, ms.Len())
}

func TestScraper_RowCondition_NilCondition_AllRowsUsed(t *testing.T) {
client := &FakeDBClient{
StringMaps: [][]StringMap{{
{"count": "1"},
{"count": "2"},
}},
}
scrpr := Scraper{
InstrumentationScope: pcommon.NewInstrumentationScope(),
Client: client,
Query: Query{
Metrics: []MetricCfg{{
MetricName: "my.count",
ValueColumn: "count",
ValueType: MetricValueTypeInt,
DataType: MetricTypeGauge,
RowCondition: nil,
}},
},
}
metrics, err := scrpr.ScrapeMetrics(t.Context())
require.NoError(t, err)
ms := metrics.ResourceMetrics().At(0).ScopeMetrics().At(0).Metrics()
assert.Equal(t, 2, ms.Len())
}

func TestBuildDataSourceString(t *testing.T) {
t.Parallel()
tests := []struct {
Expand Down
53 changes: 53 additions & 0 deletions receiver/sqlqueryreceiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,11 @@ Each _metric_ in the configuration will produce one OTel metric per row returned
sum.
- `ts_column` (optional): the name of the column containing the timestamp, the value of which is applied to the
metric's timestamp. This can be current timestamp depending upon the time of last recorded metric's datapoint.
- `row_condition` (optional): when set, only rows where the specified column equals the specified value are used
to produce this metric. Rows that do not match are silently skipped. This is useful for pivot-style result
sets where each row encodes a different metric (e.g. `SHOW LISTS` in pgbouncer).
- `column` (required): the column name to evaluate.
- `value` (required): the expected string value the column must equal for the row to be included.

### Example

Expand Down Expand Up @@ -257,6 +262,54 @@ Data point attributes:
Value: 1
```

#### Row Condition Example

Some databases expose pivot-style views where each row represents a different metric, identified by a name column.
For example, pgbouncer's `SHOW LISTS` command returns:

```
list | items
---------------+-------
databases | 8
pools | 4
users | 2
free_clients | 25
used_servers | 5
```

Use `row_condition` to extract a single row as a dedicated metric:

```yaml
receivers:
sqlquery:
driver: postgres
datasource: "host=pgbouncer port=5432 user=pgbouncer dbname=pgbouncer sslmode=disable"
queries:
- sql: "SHOW LISTS"
metrics:
- metric_name: pgbouncer.lists.pools
value_column: items
value_type: int
row_condition:
column: list
value: pools
- metric_name: pgbouncer.lists.databases
value_column: items
value_type: int
row_condition:
column: list
value: databases
- metric_name: pgbouncer.lists.users
value_column: items
value_type: int
row_condition:
column: list
value: users
```

This produces three separate metrics (`pgbouncer.lists.pools`, `pgbouncer.lists.databases`,
`pgbouncer.lists.users`), each sourced from the matching row only.

#### NULL values

Avoid queries that produce any NULL values. If a query produces a NULL value, a warning will be logged. Furthermore,
Expand Down
Loading