From 7bd5d64e278c699d78d030dc362f8f0a797b02e2 Mon Sep 17 00:00:00 2001 From: Pavel Lazureykis Date: Thu, 9 Apr 2026 12:15:50 -0400 Subject: [PATCH] [receiver/chrony] Enable re-aggregation feature Set reaggregation_enabled: true and requirement_level: recommended for the leap.status attribute, allowing users to reduce metric cardinality by dropping the attribute and aggregating datapoints. Fixes #46350 Assisted-by: Claude Opus 4.6 --- .../46350-chrony-enable-reaggregation.yaml | 27 ++ receiver/chronyreceiver/factory_test.go | 15 +- .../chronyreceiver/generated_package_test.go | 3 +- .../internal/metadata/config.schema.yaml | 80 +++++ .../internal/metadata/generated_config.go | 316 ++++++++++++++++-- .../metadata/generated_config_test.go | 71 ++-- .../internal/metadata/generated_metrics.go | 269 ++++++++++++--- .../metadata/generated_metrics_test.go | 303 +++++++++++++---- .../internal/metadata/testdata/config.yaml | 31 ++ receiver/chronyreceiver/metadata.yaml | 2 + 10 files changed, 951 insertions(+), 166 deletions(-) create mode 100644 .chloggen/46350-chrony-enable-reaggregation.yaml diff --git a/.chloggen/46350-chrony-enable-reaggregation.yaml b/.chloggen/46350-chrony-enable-reaggregation.yaml new file mode 100644 index 0000000000000..13a29c76092cd --- /dev/null +++ b/.chloggen/46350-chrony-enable-reaggregation.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/chrony + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Enables dynamic metric reaggregation in the Chrony receiver. This does not break existing configuration files. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46350] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/receiver/chronyreceiver/factory_test.go b/receiver/chronyreceiver/factory_test.go index a31ad2434e8b8..11cfb2162719a 100644 --- a/receiver/chronyreceiver/factory_test.go +++ b/receiver/chronyreceiver/factory_test.go @@ -35,14 +35,13 @@ func TestCreatingMetricsReceiver(t *testing.T) { factory := NewFactory() mbc := metadata.DefaultMetricsBuilderConfig() - mbc.Metrics = metadata.MetricsConfig{ - NtpTimeCorrection: metadata.MetricConfig{ - Enabled: true, - }, - NtpSkew: metadata.MetricConfig{ - Enabled: true, - }, - } + mbc.Metrics.NtpFrequencyOffset.Enabled = false + mbc.Metrics.NtpSkew.Enabled = true + mbc.Metrics.NtpStratum.Enabled = false + mbc.Metrics.NtpTimeCorrection.Enabled = true + mbc.Metrics.NtpTimeLastOffset.Enabled = false + mbc.Metrics.NtpTimeRmsOffset.Enabled = false + mbc.Metrics.NtpTimeRootDelay.Enabled = false mem, err := factory.CreateMetrics( t.Context(), receivertest.NewNopSettings(metadata.Type), diff --git a/receiver/chronyreceiver/generated_package_test.go b/receiver/chronyreceiver/generated_package_test.go index 6bfc2a7a6d0bc..943b89b0a37d8 100644 --- a/receiver/chronyreceiver/generated_package_test.go +++ b/receiver/chronyreceiver/generated_package_test.go @@ -3,9 +3,8 @@ package chronyreceiver import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/receiver/chronyreceiver/internal/metadata/config.schema.yaml b/receiver/chronyreceiver/internal/metadata/config.schema.yaml index a4370e9bec396..fef99d9ad307b 100644 --- a/receiver/chronyreceiver/internal/metadata/config.schema.yaml +++ b/receiver/chronyreceiver/internal/metadata/config.schema.yaml @@ -11,6 +11,22 @@ $defs: enabled: type: boolean default: false + aggregation_strategy: + type: string + enum: + - "sum" + - "avg" + - "min" + - "max" + default: "avg" + attributes: + type: array + items: + type: string + enum: + - "leap.status" + default: + - "leap.status" ntp.skew: description: "NtpSkewMetricConfig provides config for the ntp.skew metric." type: object @@ -32,6 +48,22 @@ $defs: enabled: type: boolean default: true + aggregation_strategy: + type: string + enum: + - "sum" + - "avg" + - "min" + - "max" + default: "avg" + attributes: + type: array + items: + type: string + enum: + - "leap.status" + default: + - "leap.status" ntp.time.last_offset: description: "NtpTimeLastOffsetMetricConfig provides config for the ntp.time.last_offset metric." type: object @@ -39,6 +71,22 @@ $defs: enabled: type: boolean default: true + aggregation_strategy: + type: string + enum: + - "sum" + - "avg" + - "min" + - "max" + default: "avg" + attributes: + type: array + items: + type: string + enum: + - "leap.status" + default: + - "leap.status" ntp.time.rms_offset: description: "NtpTimeRmsOffsetMetricConfig provides config for the ntp.time.rms_offset metric." type: object @@ -46,6 +94,22 @@ $defs: enabled: type: boolean default: false + aggregation_strategy: + type: string + enum: + - "sum" + - "avg" + - "min" + - "max" + default: "avg" + attributes: + type: array + items: + type: string + enum: + - "leap.status" + default: + - "leap.status" ntp.time.root_delay: description: "NtpTimeRootDelayMetricConfig provides config for the ntp.time.root_delay metric." type: object @@ -53,6 +117,22 @@ $defs: enabled: type: boolean default: false + aggregation_strategy: + type: string + enum: + - "sum" + - "avg" + - "min" + - "max" + default: "avg" + attributes: + type: array + items: + type: string + enum: + - "leap.status" + default: + - "leap.status" metrics_builder_config: description: MetricsBuilderConfig is a configuration for chrony metrics builder. type: object diff --git a/receiver/chronyreceiver/internal/metadata/generated_config.go b/receiver/chronyreceiver/internal/metadata/generated_config.go index e22b0983bf3e4..40b8669374d3b 100644 --- a/receiver/chronyreceiver/internal/metadata/generated_config.go +++ b/receiver/chronyreceiver/internal/metadata/generated_config.go @@ -3,16 +3,116 @@ package metadata import ( + "fmt" + "go.opentelemetry.io/collector/confmap" ) -// MetricConfig provides common config for a particular metric. -type MetricConfig struct { +// NtpFrequencyOffsetMetricAttributeKey specifies the key of an attribute for the ntp.frequency.offset metric. +type NtpFrequencyOffsetMetricAttributeKey string + +const ( + NtpFrequencyOffsetMetricAttributeKeyLeapStatus NtpFrequencyOffsetMetricAttributeKey = "leap.status" +) + +// NtpFrequencyOffsetMetricConfig provides config for the ntp.frequency.offset metric. +type NtpFrequencyOffsetMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool + + AggregationStrategy string `mapstructure:"aggregation_strategy"` + EnabledAttributes []NtpFrequencyOffsetMetricAttributeKey `mapstructure:"attributes"` +} + +func (ms *NtpFrequencyOffsetMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +func (ms *NtpFrequencyOffsetMetricConfig) Validate() error { + for _, val := range ms.EnabledAttributes { + switch val { + case NtpFrequencyOffsetMetricAttributeKeyLeapStatus: + default: + return fmt.Errorf("metric ntp.frequency.offset doesn't have an attribute %v, valid attributes: [leap.status]", val) + } + } + + switch ms.AggregationStrategy { + case AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax: + default: + return fmt.Errorf("invalid aggregation strategy %q, valid strategies: [%s, %s, %s, %s]", ms.AggregationStrategy, AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax) + } + + return nil +} + +// NtpSkewMetricConfig provides config for the ntp.skew metric. +type NtpSkewMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool +} + +func (ms *NtpSkewMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// NtpStratumMetricConfig provides config for the ntp.stratum metric. +type NtpStratumMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool +} + +func (ms *NtpStratumMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +// NtpTimeCorrectionMetricAttributeKey specifies the key of an attribute for the ntp.time.correction metric. +type NtpTimeCorrectionMetricAttributeKey string + +const ( + NtpTimeCorrectionMetricAttributeKeyLeapStatus NtpTimeCorrectionMetricAttributeKey = "leap.status" +) + +// NtpTimeCorrectionMetricConfig provides config for the ntp.time.correction metric. +type NtpTimeCorrectionMetricConfig struct { Enabled bool `mapstructure:"enabled"` enabledSetByUser bool + + AggregationStrategy string `mapstructure:"aggregation_strategy"` + EnabledAttributes []NtpTimeCorrectionMetricAttributeKey `mapstructure:"attributes"` } -func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { +func (ms *NtpTimeCorrectionMetricConfig) Unmarshal(parser *confmap.Conf) error { if parser == nil { return nil } @@ -26,39 +126,211 @@ func (ms *MetricConfig) Unmarshal(parser *confmap.Conf) error { return nil } +func (ms *NtpTimeCorrectionMetricConfig) Validate() error { + for _, val := range ms.EnabledAttributes { + switch val { + case NtpTimeCorrectionMetricAttributeKeyLeapStatus: + default: + return fmt.Errorf("metric ntp.time.correction doesn't have an attribute %v, valid attributes: [leap.status]", val) + } + } + + switch ms.AggregationStrategy { + case AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax: + default: + return fmt.Errorf("invalid aggregation strategy %q, valid strategies: [%s, %s, %s, %s]", ms.AggregationStrategy, AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax) + } + + return nil +} + +// NtpTimeLastOffsetMetricAttributeKey specifies the key of an attribute for the ntp.time.last_offset metric. +type NtpTimeLastOffsetMetricAttributeKey string + +const ( + NtpTimeLastOffsetMetricAttributeKeyLeapStatus NtpTimeLastOffsetMetricAttributeKey = "leap.status" +) + +// NtpTimeLastOffsetMetricConfig provides config for the ntp.time.last_offset metric. +type NtpTimeLastOffsetMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool + + AggregationStrategy string `mapstructure:"aggregation_strategy"` + EnabledAttributes []NtpTimeLastOffsetMetricAttributeKey `mapstructure:"attributes"` +} + +func (ms *NtpTimeLastOffsetMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +func (ms *NtpTimeLastOffsetMetricConfig) Validate() error { + for _, val := range ms.EnabledAttributes { + switch val { + case NtpTimeLastOffsetMetricAttributeKeyLeapStatus: + default: + return fmt.Errorf("metric ntp.time.last_offset doesn't have an attribute %v, valid attributes: [leap.status]", val) + } + } + + switch ms.AggregationStrategy { + case AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax: + default: + return fmt.Errorf("invalid aggregation strategy %q, valid strategies: [%s, %s, %s, %s]", ms.AggregationStrategy, AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax) + } + + return nil +} + +// NtpTimeRmsOffsetMetricAttributeKey specifies the key of an attribute for the ntp.time.rms_offset metric. +type NtpTimeRmsOffsetMetricAttributeKey string + +const ( + NtpTimeRmsOffsetMetricAttributeKeyLeapStatus NtpTimeRmsOffsetMetricAttributeKey = "leap.status" +) + +// NtpTimeRmsOffsetMetricConfig provides config for the ntp.time.rms_offset metric. +type NtpTimeRmsOffsetMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool + + AggregationStrategy string `mapstructure:"aggregation_strategy"` + EnabledAttributes []NtpTimeRmsOffsetMetricAttributeKey `mapstructure:"attributes"` +} + +func (ms *NtpTimeRmsOffsetMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +func (ms *NtpTimeRmsOffsetMetricConfig) Validate() error { + for _, val := range ms.EnabledAttributes { + switch val { + case NtpTimeRmsOffsetMetricAttributeKeyLeapStatus: + default: + return fmt.Errorf("metric ntp.time.rms_offset doesn't have an attribute %v, valid attributes: [leap.status]", val) + } + } + + switch ms.AggregationStrategy { + case AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax: + default: + return fmt.Errorf("invalid aggregation strategy %q, valid strategies: [%s, %s, %s, %s]", ms.AggregationStrategy, AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax) + } + + return nil +} + +// NtpTimeRootDelayMetricAttributeKey specifies the key of an attribute for the ntp.time.root_delay metric. +type NtpTimeRootDelayMetricAttributeKey string + +const ( + NtpTimeRootDelayMetricAttributeKeyLeapStatus NtpTimeRootDelayMetricAttributeKey = "leap.status" +) + +// NtpTimeRootDelayMetricConfig provides config for the ntp.time.root_delay metric. +type NtpTimeRootDelayMetricConfig struct { + Enabled bool `mapstructure:"enabled"` + enabledSetByUser bool + + AggregationStrategy string `mapstructure:"aggregation_strategy"` + EnabledAttributes []NtpTimeRootDelayMetricAttributeKey `mapstructure:"attributes"` +} + +func (ms *NtpTimeRootDelayMetricConfig) Unmarshal(parser *confmap.Conf) error { + if parser == nil { + return nil + } + + err := parser.Unmarshal(ms) + if err != nil { + return err + } + + ms.enabledSetByUser = parser.IsSet("enabled") + return nil +} + +func (ms *NtpTimeRootDelayMetricConfig) Validate() error { + for _, val := range ms.EnabledAttributes { + switch val { + case NtpTimeRootDelayMetricAttributeKeyLeapStatus: + default: + return fmt.Errorf("metric ntp.time.root_delay doesn't have an attribute %v, valid attributes: [leap.status]", val) + } + } + + switch ms.AggregationStrategy { + case AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax: + default: + return fmt.Errorf("invalid aggregation strategy %q, valid strategies: [%s, %s, %s, %s]", ms.AggregationStrategy, AggregationStrategySum, AggregationStrategyAvg, AggregationStrategyMin, AggregationStrategyMax) + } + + return nil +} + // MetricsConfig provides config for chrony metrics. type MetricsConfig struct { - NtpFrequencyOffset MetricConfig `mapstructure:"ntp.frequency.offset"` - NtpSkew MetricConfig `mapstructure:"ntp.skew"` - NtpStratum MetricConfig `mapstructure:"ntp.stratum"` - NtpTimeCorrection MetricConfig `mapstructure:"ntp.time.correction"` - NtpTimeLastOffset MetricConfig `mapstructure:"ntp.time.last_offset"` - NtpTimeRmsOffset MetricConfig `mapstructure:"ntp.time.rms_offset"` - NtpTimeRootDelay MetricConfig `mapstructure:"ntp.time.root_delay"` + NtpFrequencyOffset NtpFrequencyOffsetMetricConfig `mapstructure:"ntp.frequency.offset"` + NtpSkew NtpSkewMetricConfig `mapstructure:"ntp.skew"` + NtpStratum NtpStratumMetricConfig `mapstructure:"ntp.stratum"` + NtpTimeCorrection NtpTimeCorrectionMetricConfig `mapstructure:"ntp.time.correction"` + NtpTimeLastOffset NtpTimeLastOffsetMetricConfig `mapstructure:"ntp.time.last_offset"` + NtpTimeRmsOffset NtpTimeRmsOffsetMetricConfig `mapstructure:"ntp.time.rms_offset"` + NtpTimeRootDelay NtpTimeRootDelayMetricConfig `mapstructure:"ntp.time.root_delay"` } func DefaultMetricsConfig() MetricsConfig { return MetricsConfig{ - NtpFrequencyOffset: MetricConfig{ - Enabled: false, + NtpFrequencyOffset: NtpFrequencyOffsetMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpFrequencyOffsetMetricAttributeKey{NtpFrequencyOffsetMetricAttributeKeyLeapStatus}, }, - NtpSkew: MetricConfig{ + NtpSkew: NtpSkewMetricConfig{ Enabled: true, }, - NtpStratum: MetricConfig{ + NtpStratum: NtpStratumMetricConfig{ Enabled: false, }, - NtpTimeCorrection: MetricConfig{ - Enabled: true, + NtpTimeCorrection: NtpTimeCorrectionMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeCorrectionMetricAttributeKey{NtpTimeCorrectionMetricAttributeKeyLeapStatus}, }, - NtpTimeLastOffset: MetricConfig{ - Enabled: true, + NtpTimeLastOffset: NtpTimeLastOffsetMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeLastOffsetMetricAttributeKey{NtpTimeLastOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRmsOffset: MetricConfig{ - Enabled: false, + NtpTimeRmsOffset: NtpTimeRmsOffsetMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRmsOffsetMetricAttributeKey{NtpTimeRmsOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRootDelay: MetricConfig{ - Enabled: false, + NtpTimeRootDelay: NtpTimeRootDelayMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRootDelayMetricAttributeKey{NtpTimeRootDelayMetricAttributeKeyLeapStatus}, }, } } diff --git a/receiver/chronyreceiver/internal/metadata/generated_config_test.go b/receiver/chronyreceiver/internal/metadata/generated_config_test.go index 2ec54db492be2..df510e096e78e 100644 --- a/receiver/chronyreceiver/internal/metadata/generated_config_test.go +++ b/receiver/chronyreceiver/internal/metadata/generated_config_test.go @@ -9,6 +9,7 @@ import ( "github.com/google/go-cmp/cmp" "github.com/google/go-cmp/cmp/cmpopts" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/confmap" "go.opentelemetry.io/collector/confmap/confmaptest" ) @@ -26,26 +27,36 @@ func TestMetricsBuilderConfig(t *testing.T) { name: "all_set", want: MetricsBuilderConfig{ Metrics: MetricsConfig{ - NtpFrequencyOffset: MetricConfig{ - Enabled: true, + NtpFrequencyOffset: NtpFrequencyOffsetMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpFrequencyOffsetMetricAttributeKey{NtpFrequencyOffsetMetricAttributeKeyLeapStatus}, }, - NtpSkew: MetricConfig{ + NtpSkew: NtpSkewMetricConfig{ Enabled: true, }, - NtpStratum: MetricConfig{ + NtpStratum: NtpStratumMetricConfig{ Enabled: true, }, - NtpTimeCorrection: MetricConfig{ - Enabled: true, + NtpTimeCorrection: NtpTimeCorrectionMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeCorrectionMetricAttributeKey{NtpTimeCorrectionMetricAttributeKeyLeapStatus}, }, - NtpTimeLastOffset: MetricConfig{ - Enabled: true, + NtpTimeLastOffset: NtpTimeLastOffsetMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeLastOffsetMetricAttributeKey{NtpTimeLastOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRmsOffset: MetricConfig{ - Enabled: true, + NtpTimeRmsOffset: NtpTimeRmsOffsetMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRmsOffsetMetricAttributeKey{NtpTimeRmsOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRootDelay: MetricConfig{ - Enabled: true, + NtpTimeRootDelay: NtpTimeRootDelayMetricConfig{ + Enabled: true, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRootDelayMetricAttributeKey{NtpTimeRootDelayMetricAttributeKeyLeapStatus}, }, }, }, @@ -54,26 +65,36 @@ func TestMetricsBuilderConfig(t *testing.T) { name: "none_set", want: MetricsBuilderConfig{ Metrics: MetricsConfig{ - NtpFrequencyOffset: MetricConfig{ - Enabled: false, + NtpFrequencyOffset: NtpFrequencyOffsetMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpFrequencyOffsetMetricAttributeKey{NtpFrequencyOffsetMetricAttributeKeyLeapStatus}, }, - NtpSkew: MetricConfig{ + NtpSkew: NtpSkewMetricConfig{ Enabled: false, }, - NtpStratum: MetricConfig{ + NtpStratum: NtpStratumMetricConfig{ Enabled: false, }, - NtpTimeCorrection: MetricConfig{ - Enabled: false, + NtpTimeCorrection: NtpTimeCorrectionMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeCorrectionMetricAttributeKey{NtpTimeCorrectionMetricAttributeKeyLeapStatus}, }, - NtpTimeLastOffset: MetricConfig{ - Enabled: false, + NtpTimeLastOffset: NtpTimeLastOffsetMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeLastOffsetMetricAttributeKey{NtpTimeLastOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRmsOffset: MetricConfig{ - Enabled: false, + NtpTimeRmsOffset: NtpTimeRmsOffsetMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRmsOffsetMetricAttributeKey{NtpTimeRmsOffsetMetricAttributeKeyLeapStatus}, }, - NtpTimeRootDelay: MetricConfig{ - Enabled: false, + NtpTimeRootDelay: NtpTimeRootDelayMetricConfig{ + Enabled: false, + AggregationStrategy: AggregationStrategyAvg, + EnabledAttributes: []NtpTimeRootDelayMetricAttributeKey{NtpTimeRootDelayMetricAttributeKeyLeapStatus}, }, }, }, @@ -82,7 +103,7 @@ func TestMetricsBuilderConfig(t *testing.T) { for _, tt := range tests { t.Run(tt.name, func(t *testing.T) { cfg := loadMetricsBuilderConfig(t, tt.name) - diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(MetricConfig{})) + diff := cmp.Diff(tt.want, cfg, cmpopts.IgnoreUnexported(NtpFrequencyOffsetMetricConfig{}, NtpSkewMetricConfig{}, NtpStratumMetricConfig{}, NtpTimeCorrectionMetricConfig{}, NtpTimeLastOffsetMetricConfig{}, NtpTimeRmsOffsetMetricConfig{}, NtpTimeRootDelayMetricConfig{})) require.Emptyf(t, diff, "Config mismatch (-expected +actual):\n%s", diff) }) } diff --git a/receiver/chronyreceiver/internal/metadata/generated_metrics.go b/receiver/chronyreceiver/internal/metadata/generated_metrics.go index dbbd23e81b9c4..dec6fb13adddc 100644 --- a/receiver/chronyreceiver/internal/metadata/generated_metrics.go +++ b/receiver/chronyreceiver/internal/metadata/generated_metrics.go @@ -3,6 +3,7 @@ package metadata import ( + "slices" "time" "go.opentelemetry.io/collector/component" @@ -11,6 +12,13 @@ import ( "go.opentelemetry.io/collector/receiver" ) +const ( + AggregationStrategySum = "sum" + AggregationStrategyAvg = "avg" + AggregationStrategyMin = "min" + AggregationStrategyMax = "max" +) + // AttributeLeapStatus specifies the value leap.status attribute. type AttributeLeapStatus int @@ -84,9 +92,10 @@ type metricInfo struct { } type metricNtpFrequencyOffset struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpFrequencyOffsetMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. + aggDataPoints []float64 // slice containing number of aggregated datapoints at each index } // init fills ntp.frequency.offset metric with initial data. @@ -96,17 +105,48 @@ func (m *metricNtpFrequencyOffset) init() { m.data.SetUnit("ppm") m.data.SetEmptyGauge() m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) + m.aggDataPoints = m.aggDataPoints[:0] } func (m *metricNtpFrequencyOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, leapStatusAttributeValue string) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + + dp := pmetric.NewNumberDataPoint() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) + if slices.Contains(m.config.EnabledAttributes, NtpFrequencyOffsetMetricAttributeKeyLeapStatus) { + dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + } + + var s string + dps := m.data.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dpi := dps.At(i) + if dp.Attributes().Equal(dpi.Attributes()) && dp.StartTimestamp() == dpi.StartTimestamp() && dp.Timestamp() == dpi.Timestamp() { + switch s = m.config.AggregationStrategy; s { + case AggregationStrategySum, AggregationStrategyAvg: + dpi.SetDoubleValue(dpi.DoubleValue() + val) + m.aggDataPoints[i] += 1 + return + case AggregationStrategyMin: + if dpi.DoubleValue() > val { + dpi.SetDoubleValue(val) + } + return + case AggregationStrategyMax: + if dpi.DoubleValue() < val { + dpi.SetDoubleValue(val) + } + return + } + } + } + dp.SetDoubleValue(val) - dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + m.aggDataPoints = append(m.aggDataPoints, 1) + dp.MoveTo(dps.AppendEmpty()) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -119,13 +159,18 @@ func (m *metricNtpFrequencyOffset) updateCapacity() { // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricNtpFrequencyOffset) emit(metrics pmetric.MetricSlice) { if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.AggregationStrategy == AggregationStrategyAvg { + for i, aggCount := range m.aggDataPoints { + m.data.Gauge().DataPoints().At(i).SetDoubleValue(m.data.Gauge().DataPoints().At(i).DoubleValue() / aggCount) + } + } m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() } } -func newMetricNtpFrequencyOffset(cfg MetricConfig) metricNtpFrequencyOffset { +func newMetricNtpFrequencyOffset(cfg NtpFrequencyOffsetMetricConfig) metricNtpFrequencyOffset { m := metricNtpFrequencyOffset{config: cfg} if cfg.Enabled { @@ -136,9 +181,9 @@ func newMetricNtpFrequencyOffset(cfg MetricConfig) metricNtpFrequencyOffset { } type metricNtpSkew struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpSkewMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. } // init fills ntp.skew metric with initial data. @@ -175,7 +220,7 @@ func (m *metricNtpSkew) emit(metrics pmetric.MetricSlice) { } } -func newMetricNtpSkew(cfg MetricConfig) metricNtpSkew { +func newMetricNtpSkew(cfg NtpSkewMetricConfig) metricNtpSkew { m := metricNtpSkew{config: cfg} if cfg.Enabled { @@ -186,9 +231,9 @@ func newMetricNtpSkew(cfg MetricConfig) metricNtpSkew { } type metricNtpStratum struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpStratumMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. } // init fills ntp.stratum metric with initial data. @@ -225,7 +270,7 @@ func (m *metricNtpStratum) emit(metrics pmetric.MetricSlice) { } } -func newMetricNtpStratum(cfg MetricConfig) metricNtpStratum { +func newMetricNtpStratum(cfg NtpStratumMetricConfig) metricNtpStratum { m := metricNtpStratum{config: cfg} if cfg.Enabled { @@ -236,9 +281,10 @@ func newMetricNtpStratum(cfg MetricConfig) metricNtpStratum { } type metricNtpTimeCorrection struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpTimeCorrectionMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. + aggDataPoints []float64 // slice containing number of aggregated datapoints at each index } // init fills ntp.time.correction metric with initial data. @@ -248,17 +294,48 @@ func (m *metricNtpTimeCorrection) init() { m.data.SetUnit("seconds") m.data.SetEmptyGauge() m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) + m.aggDataPoints = m.aggDataPoints[:0] } func (m *metricNtpTimeCorrection) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, leapStatusAttributeValue string) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + + dp := pmetric.NewNumberDataPoint() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) + if slices.Contains(m.config.EnabledAttributes, NtpTimeCorrectionMetricAttributeKeyLeapStatus) { + dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + } + + var s string + dps := m.data.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dpi := dps.At(i) + if dp.Attributes().Equal(dpi.Attributes()) && dp.StartTimestamp() == dpi.StartTimestamp() && dp.Timestamp() == dpi.Timestamp() { + switch s = m.config.AggregationStrategy; s { + case AggregationStrategySum, AggregationStrategyAvg: + dpi.SetDoubleValue(dpi.DoubleValue() + val) + m.aggDataPoints[i] += 1 + return + case AggregationStrategyMin: + if dpi.DoubleValue() > val { + dpi.SetDoubleValue(val) + } + return + case AggregationStrategyMax: + if dpi.DoubleValue() < val { + dpi.SetDoubleValue(val) + } + return + } + } + } + dp.SetDoubleValue(val) - dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + m.aggDataPoints = append(m.aggDataPoints, 1) + dp.MoveTo(dps.AppendEmpty()) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -271,13 +348,18 @@ func (m *metricNtpTimeCorrection) updateCapacity() { // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricNtpTimeCorrection) emit(metrics pmetric.MetricSlice) { if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.AggregationStrategy == AggregationStrategyAvg { + for i, aggCount := range m.aggDataPoints { + m.data.Gauge().DataPoints().At(i).SetDoubleValue(m.data.Gauge().DataPoints().At(i).DoubleValue() / aggCount) + } + } m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() } } -func newMetricNtpTimeCorrection(cfg MetricConfig) metricNtpTimeCorrection { +func newMetricNtpTimeCorrection(cfg NtpTimeCorrectionMetricConfig) metricNtpTimeCorrection { m := metricNtpTimeCorrection{config: cfg} if cfg.Enabled { @@ -288,9 +370,10 @@ func newMetricNtpTimeCorrection(cfg MetricConfig) metricNtpTimeCorrection { } type metricNtpTimeLastOffset struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpTimeLastOffsetMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. + aggDataPoints []float64 // slice containing number of aggregated datapoints at each index } // init fills ntp.time.last_offset metric with initial data. @@ -300,17 +383,48 @@ func (m *metricNtpTimeLastOffset) init() { m.data.SetUnit("seconds") m.data.SetEmptyGauge() m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) + m.aggDataPoints = m.aggDataPoints[:0] } func (m *metricNtpTimeLastOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, leapStatusAttributeValue string) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + + dp := pmetric.NewNumberDataPoint() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) + if slices.Contains(m.config.EnabledAttributes, NtpTimeLastOffsetMetricAttributeKeyLeapStatus) { + dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + } + + var s string + dps := m.data.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dpi := dps.At(i) + if dp.Attributes().Equal(dpi.Attributes()) && dp.StartTimestamp() == dpi.StartTimestamp() && dp.Timestamp() == dpi.Timestamp() { + switch s = m.config.AggregationStrategy; s { + case AggregationStrategySum, AggregationStrategyAvg: + dpi.SetDoubleValue(dpi.DoubleValue() + val) + m.aggDataPoints[i] += 1 + return + case AggregationStrategyMin: + if dpi.DoubleValue() > val { + dpi.SetDoubleValue(val) + } + return + case AggregationStrategyMax: + if dpi.DoubleValue() < val { + dpi.SetDoubleValue(val) + } + return + } + } + } + dp.SetDoubleValue(val) - dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + m.aggDataPoints = append(m.aggDataPoints, 1) + dp.MoveTo(dps.AppendEmpty()) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -323,13 +437,18 @@ func (m *metricNtpTimeLastOffset) updateCapacity() { // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricNtpTimeLastOffset) emit(metrics pmetric.MetricSlice) { if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.AggregationStrategy == AggregationStrategyAvg { + for i, aggCount := range m.aggDataPoints { + m.data.Gauge().DataPoints().At(i).SetDoubleValue(m.data.Gauge().DataPoints().At(i).DoubleValue() / aggCount) + } + } m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() } } -func newMetricNtpTimeLastOffset(cfg MetricConfig) metricNtpTimeLastOffset { +func newMetricNtpTimeLastOffset(cfg NtpTimeLastOffsetMetricConfig) metricNtpTimeLastOffset { m := metricNtpTimeLastOffset{config: cfg} if cfg.Enabled { @@ -340,9 +459,10 @@ func newMetricNtpTimeLastOffset(cfg MetricConfig) metricNtpTimeLastOffset { } type metricNtpTimeRmsOffset struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpTimeRmsOffsetMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. + aggDataPoints []float64 // slice containing number of aggregated datapoints at each index } // init fills ntp.time.rms_offset metric with initial data. @@ -352,17 +472,48 @@ func (m *metricNtpTimeRmsOffset) init() { m.data.SetUnit("seconds") m.data.SetEmptyGauge() m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) + m.aggDataPoints = m.aggDataPoints[:0] } func (m *metricNtpTimeRmsOffset) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, leapStatusAttributeValue string) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + + dp := pmetric.NewNumberDataPoint() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) + if slices.Contains(m.config.EnabledAttributes, NtpTimeRmsOffsetMetricAttributeKeyLeapStatus) { + dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + } + + var s string + dps := m.data.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dpi := dps.At(i) + if dp.Attributes().Equal(dpi.Attributes()) && dp.StartTimestamp() == dpi.StartTimestamp() && dp.Timestamp() == dpi.Timestamp() { + switch s = m.config.AggregationStrategy; s { + case AggregationStrategySum, AggregationStrategyAvg: + dpi.SetDoubleValue(dpi.DoubleValue() + val) + m.aggDataPoints[i] += 1 + return + case AggregationStrategyMin: + if dpi.DoubleValue() > val { + dpi.SetDoubleValue(val) + } + return + case AggregationStrategyMax: + if dpi.DoubleValue() < val { + dpi.SetDoubleValue(val) + } + return + } + } + } + dp.SetDoubleValue(val) - dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + m.aggDataPoints = append(m.aggDataPoints, 1) + dp.MoveTo(dps.AppendEmpty()) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -375,13 +526,18 @@ func (m *metricNtpTimeRmsOffset) updateCapacity() { // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricNtpTimeRmsOffset) emit(metrics pmetric.MetricSlice) { if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.AggregationStrategy == AggregationStrategyAvg { + for i, aggCount := range m.aggDataPoints { + m.data.Gauge().DataPoints().At(i).SetDoubleValue(m.data.Gauge().DataPoints().At(i).DoubleValue() / aggCount) + } + } m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() } } -func newMetricNtpTimeRmsOffset(cfg MetricConfig) metricNtpTimeRmsOffset { +func newMetricNtpTimeRmsOffset(cfg NtpTimeRmsOffsetMetricConfig) metricNtpTimeRmsOffset { m := metricNtpTimeRmsOffset{config: cfg} if cfg.Enabled { @@ -392,9 +548,10 @@ func newMetricNtpTimeRmsOffset(cfg MetricConfig) metricNtpTimeRmsOffset { } type metricNtpTimeRootDelay struct { - data pmetric.Metric // data buffer for generated metric. - config MetricConfig // metric config provided by user. - capacity int // max observed number of data points added to the metric. + data pmetric.Metric // data buffer for generated metric. + config NtpTimeRootDelayMetricConfig // metric config provided by user. + capacity int // max observed number of data points added to the metric. + aggDataPoints []float64 // slice containing number of aggregated datapoints at each index } // init fills ntp.time.root_delay metric with initial data. @@ -404,17 +561,48 @@ func (m *metricNtpTimeRootDelay) init() { m.data.SetUnit("seconds") m.data.SetEmptyGauge() m.data.Gauge().DataPoints().EnsureCapacity(m.capacity) + m.aggDataPoints = m.aggDataPoints[:0] } func (m *metricNtpTimeRootDelay) recordDataPoint(start pcommon.Timestamp, ts pcommon.Timestamp, val float64, leapStatusAttributeValue string) { if !m.config.Enabled { return } - dp := m.data.Gauge().DataPoints().AppendEmpty() + + dp := pmetric.NewNumberDataPoint() dp.SetStartTimestamp(start) dp.SetTimestamp(ts) + if slices.Contains(m.config.EnabledAttributes, NtpTimeRootDelayMetricAttributeKeyLeapStatus) { + dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + } + + var s string + dps := m.data.Gauge().DataPoints() + for i := 0; i < dps.Len(); i++ { + dpi := dps.At(i) + if dp.Attributes().Equal(dpi.Attributes()) && dp.StartTimestamp() == dpi.StartTimestamp() && dp.Timestamp() == dpi.Timestamp() { + switch s = m.config.AggregationStrategy; s { + case AggregationStrategySum, AggregationStrategyAvg: + dpi.SetDoubleValue(dpi.DoubleValue() + val) + m.aggDataPoints[i] += 1 + return + case AggregationStrategyMin: + if dpi.DoubleValue() > val { + dpi.SetDoubleValue(val) + } + return + case AggregationStrategyMax: + if dpi.DoubleValue() < val { + dpi.SetDoubleValue(val) + } + return + } + } + } + dp.SetDoubleValue(val) - dp.Attributes().PutStr("leap.status", leapStatusAttributeValue) + m.aggDataPoints = append(m.aggDataPoints, 1) + dp.MoveTo(dps.AppendEmpty()) } // updateCapacity saves max length of data point slices that will be used for the slice capacity. @@ -427,13 +615,18 @@ func (m *metricNtpTimeRootDelay) updateCapacity() { // emit appends recorded metric data to a metrics slice and prepares it for recording another set of data points. func (m *metricNtpTimeRootDelay) emit(metrics pmetric.MetricSlice) { if m.config.Enabled && m.data.Gauge().DataPoints().Len() > 0 { + if m.config.AggregationStrategy == AggregationStrategyAvg { + for i, aggCount := range m.aggDataPoints { + m.data.Gauge().DataPoints().At(i).SetDoubleValue(m.data.Gauge().DataPoints().At(i).DoubleValue() / aggCount) + } + } m.updateCapacity() m.data.MoveTo(metrics.AppendEmpty()) m.init() } } -func newMetricNtpTimeRootDelay(cfg MetricConfig) metricNtpTimeRootDelay { +func newMetricNtpTimeRootDelay(cfg NtpTimeRootDelayMetricConfig) metricNtpTimeRootDelay { m := metricNtpTimeRootDelay{config: cfg} if cfg.Enabled { diff --git a/receiver/chronyreceiver/internal/metadata/generated_metrics_test.go b/receiver/chronyreceiver/internal/metadata/generated_metrics_test.go index de42917f3fd1f..4bbb36147d637 100644 --- a/receiver/chronyreceiver/internal/metadata/generated_metrics_test.go +++ b/receiver/chronyreceiver/internal/metadata/generated_metrics_test.go @@ -19,6 +19,7 @@ const ( testDataSetDefault testDataSet = iota testDataSetAll testDataSetNone + testDataSetReag ) func TestMetricsBuilder(t *testing.T) { @@ -36,6 +37,11 @@ func TestMetricsBuilder(t *testing.T) { metricsSet: testDataSetAll, resAttrsSet: testDataSetAll, }, + { + name: "reaggregate_set", + metricsSet: testDataSetReag, + resAttrsSet: testDataSetReag, + }, { name: "none_set", metricsSet: testDataSetNone, @@ -51,15 +57,26 @@ func TestMetricsBuilder(t *testing.T) { settings := receivertest.NewNopSettings(receivertest.NopType) settings.Logger = zap.New(observedZapCore) mb := NewMetricsBuilder(loadMetricsBuilderConfig(t, tt.name), settings, WithStartTime(start)) + aggMap := make(map[string]string) // contains the aggregation strategies for each metric name + aggMap["NtpFrequencyOffset"] = mb.metricNtpFrequencyOffset.config.AggregationStrategy + aggMap["NtpTimeCorrection"] = mb.metricNtpTimeCorrection.config.AggregationStrategy + aggMap["NtpTimeLastOffset"] = mb.metricNtpTimeLastOffset.config.AggregationStrategy + aggMap["NtpTimeRmsOffset"] = mb.metricNtpTimeRmsOffset.config.AggregationStrategy + aggMap["NtpTimeRootDelay"] = mb.metricNtpTimeRootDelay.config.AggregationStrategy expectedWarnings := 0 - assert.Equal(t, expectedWarnings, observedLogs.Len()) + if tt.metricsSet != testDataSetReag { + assert.Equal(t, expectedWarnings, observedLogs.Len()) + } defaultMetricsCount := 0 allMetricsCount := 0 allMetricsCount++ mb.RecordNtpFrequencyOffsetDataPoint(ts, 1, AttributeLeapStatusNormal) + if tt.name == "reaggregate_set" { + mb.RecordNtpFrequencyOffsetDataPoint(ts, 3, AttributeLeapStatusInsertSecond) + } defaultMetricsCount++ allMetricsCount++ @@ -71,19 +88,38 @@ func TestMetricsBuilder(t *testing.T) { defaultMetricsCount++ allMetricsCount++ mb.RecordNtpTimeCorrectionDataPoint(ts, 1, AttributeLeapStatusNormal) + if tt.name == "reaggregate_set" { + mb.RecordNtpTimeCorrectionDataPoint(ts, 3, AttributeLeapStatusInsertSecond) + } defaultMetricsCount++ allMetricsCount++ mb.RecordNtpTimeLastOffsetDataPoint(ts, 1, AttributeLeapStatusNormal) + if tt.name == "reaggregate_set" { + mb.RecordNtpTimeLastOffsetDataPoint(ts, 3, AttributeLeapStatusInsertSecond) + } allMetricsCount++ mb.RecordNtpTimeRmsOffsetDataPoint(ts, 1, AttributeLeapStatusNormal) + if tt.name == "reaggregate_set" { + mb.RecordNtpTimeRmsOffsetDataPoint(ts, 3, AttributeLeapStatusInsertSecond) + } allMetricsCount++ mb.RecordNtpTimeRootDelayDataPoint(ts, 1, AttributeLeapStatusNormal) + if tt.name == "reaggregate_set" { + mb.RecordNtpTimeRootDelayDataPoint(ts, 3, AttributeLeapStatusInsertSecond) + } res := pcommon.NewResource() metrics := mb.Emit(WithResource(res)) + if tt.name == "reaggregate_set" { + assert.Empty(t, mb.metricNtpFrequencyOffset.aggDataPoints) + assert.Empty(t, mb.metricNtpTimeCorrection.aggDataPoints) + assert.Empty(t, mb.metricNtpTimeLastOffset.aggDataPoints) + assert.Empty(t, mb.metricNtpTimeRmsOffset.aggDataPoints) + assert.Empty(t, mb.metricNtpTimeRootDelay.aggDataPoints) + } if tt.expectEmpty { assert.Equal(t, 0, metrics.ResourceMetrics().Len()) @@ -111,20 +147,45 @@ func TestMetricsBuilder(t *testing.T) { for _, mi := range allMetricsList { switch mi.Name() { case "ntp.frequency.offset": - assert.False(t, validatedMetrics["ntp.frequency.offset"], "Found a duplicate in the metrics slice: ntp.frequency.offset") - validatedMetrics["ntp.frequency.offset"] = true - assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) - assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) - assert.Equal(t, "The frequency is the rate by which the system s clock would be wrong if chronyd was not correcting it.", mi.Description()) - assert.Equal(t, "ppm", mi.Unit()) - dp := mi.Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) - leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") - assert.True(t, ok) - assert.Equal(t, "normal", leapStatusAttrVal.Str()) + if tt.name != "reaggregate_set" { + assert.False(t, validatedMetrics["ntp.frequency.offset"], "Found a duplicate in the metrics slice: ntp.frequency.offset") + validatedMetrics["ntp.frequency.offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The frequency is the rate by which the system s clock would be wrong if chronyd was not correcting it.", mi.Description()) + assert.Equal(t, "ppm", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") + assert.True(t, ok) + assert.Equal(t, "normal", leapStatusAttrVal.Str()) + } else { + assert.False(t, validatedMetrics["ntp.frequency.offset"], "Found a duplicate in the metrics slice: ntp.frequency.offset") + validatedMetrics["ntp.frequency.offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The frequency is the rate by which the system s clock would be wrong if chronyd was not correcting it.", mi.Description()) + assert.Equal(t, "ppm", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + switch aggMap["ntp.frequency.offset"] { + case "sum": + assert.InDelta(t, float64(4), dp.DoubleValue(), 0.01) + case "avg": + assert.InDelta(t, float64(2), dp.DoubleValue(), 0.01) + case "min": + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "max": + assert.InDelta(t, float64(3), dp.DoubleValue(), 0.01) + } + _, ok := dp.Attributes().Get("leap.status") + assert.False(t, ok) + } case "ntp.skew": assert.False(t, validatedMetrics["ntp.skew"], "Found a duplicate in the metrics slice: ntp.skew") validatedMetrics["ntp.skew"] = true @@ -150,65 +211,165 @@ func TestMetricsBuilder(t *testing.T) { assert.Equal(t, pmetric.NumberDataPointValueTypeInt, dp.ValueType()) assert.Equal(t, int64(1), dp.IntValue()) case "ntp.time.correction": - assert.False(t, validatedMetrics["ntp.time.correction"], "Found a duplicate in the metrics slice: ntp.time.correction") - validatedMetrics["ntp.time.correction"] = true - assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) - assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) - assert.Equal(t, "The number of seconds difference between the system's clock and the reference clock", mi.Description()) - assert.Equal(t, "seconds", mi.Unit()) - dp := mi.Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) - leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") - assert.True(t, ok) - assert.Equal(t, "normal", leapStatusAttrVal.Str()) + if tt.name != "reaggregate_set" { + assert.False(t, validatedMetrics["ntp.time.correction"], "Found a duplicate in the metrics slice: ntp.time.correction") + validatedMetrics["ntp.time.correction"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The number of seconds difference between the system's clock and the reference clock", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") + assert.True(t, ok) + assert.Equal(t, "normal", leapStatusAttrVal.Str()) + } else { + assert.False(t, validatedMetrics["ntp.time.correction"], "Found a duplicate in the metrics slice: ntp.time.correction") + validatedMetrics["ntp.time.correction"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The number of seconds difference between the system's clock and the reference clock", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + switch aggMap["ntp.time.correction"] { + case "sum": + assert.InDelta(t, float64(4), dp.DoubleValue(), 0.01) + case "avg": + assert.InDelta(t, float64(2), dp.DoubleValue(), 0.01) + case "min": + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "max": + assert.InDelta(t, float64(3), dp.DoubleValue(), 0.01) + } + _, ok := dp.Attributes().Get("leap.status") + assert.False(t, ok) + } case "ntp.time.last_offset": - assert.False(t, validatedMetrics["ntp.time.last_offset"], "Found a duplicate in the metrics slice: ntp.time.last_offset") - validatedMetrics["ntp.time.last_offset"] = true - assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) - assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) - assert.Equal(t, "The estimated local offset on the last clock update", mi.Description()) - assert.Equal(t, "seconds", mi.Unit()) - dp := mi.Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) - leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") - assert.True(t, ok) - assert.Equal(t, "normal", leapStatusAttrVal.Str()) + if tt.name != "reaggregate_set" { + assert.False(t, validatedMetrics["ntp.time.last_offset"], "Found a duplicate in the metrics slice: ntp.time.last_offset") + validatedMetrics["ntp.time.last_offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The estimated local offset on the last clock update", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") + assert.True(t, ok) + assert.Equal(t, "normal", leapStatusAttrVal.Str()) + } else { + assert.False(t, validatedMetrics["ntp.time.last_offset"], "Found a duplicate in the metrics slice: ntp.time.last_offset") + validatedMetrics["ntp.time.last_offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "The estimated local offset on the last clock update", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + switch aggMap["ntp.time.last_offset"] { + case "sum": + assert.InDelta(t, float64(4), dp.DoubleValue(), 0.01) + case "avg": + assert.InDelta(t, float64(2), dp.DoubleValue(), 0.01) + case "min": + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "max": + assert.InDelta(t, float64(3), dp.DoubleValue(), 0.01) + } + _, ok := dp.Attributes().Get("leap.status") + assert.False(t, ok) + } case "ntp.time.rms_offset": - assert.False(t, validatedMetrics["ntp.time.rms_offset"], "Found a duplicate in the metrics slice: ntp.time.rms_offset") - validatedMetrics["ntp.time.rms_offset"] = true - assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) - assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) - assert.Equal(t, "the long term average of the offset value", mi.Description()) - assert.Equal(t, "seconds", mi.Unit()) - dp := mi.Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) - leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") - assert.True(t, ok) - assert.Equal(t, "normal", leapStatusAttrVal.Str()) + if tt.name != "reaggregate_set" { + assert.False(t, validatedMetrics["ntp.time.rms_offset"], "Found a duplicate in the metrics slice: ntp.time.rms_offset") + validatedMetrics["ntp.time.rms_offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "the long term average of the offset value", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") + assert.True(t, ok) + assert.Equal(t, "normal", leapStatusAttrVal.Str()) + } else { + assert.False(t, validatedMetrics["ntp.time.rms_offset"], "Found a duplicate in the metrics slice: ntp.time.rms_offset") + validatedMetrics["ntp.time.rms_offset"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "the long term average of the offset value", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + switch aggMap["ntp.time.rms_offset"] { + case "sum": + assert.InDelta(t, float64(4), dp.DoubleValue(), 0.01) + case "avg": + assert.InDelta(t, float64(2), dp.DoubleValue(), 0.01) + case "min": + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "max": + assert.InDelta(t, float64(3), dp.DoubleValue(), 0.01) + } + _, ok := dp.Attributes().Get("leap.status") + assert.False(t, ok) + } case "ntp.time.root_delay": - assert.False(t, validatedMetrics["ntp.time.root_delay"], "Found a duplicate in the metrics slice: ntp.time.root_delay") - validatedMetrics["ntp.time.root_delay"] = true - assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) - assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) - assert.Equal(t, "This is the total of the network path delays to the stratum-1 system from which the system is ultimately synchronised.", mi.Description()) - assert.Equal(t, "seconds", mi.Unit()) - dp := mi.Gauge().DataPoints().At(0) - assert.Equal(t, start, dp.StartTimestamp()) - assert.Equal(t, ts, dp.Timestamp()) - assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) - assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) - leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") - assert.True(t, ok) - assert.Equal(t, "normal", leapStatusAttrVal.Str()) + if tt.name != "reaggregate_set" { + assert.False(t, validatedMetrics["ntp.time.root_delay"], "Found a duplicate in the metrics slice: ntp.time.root_delay") + validatedMetrics["ntp.time.root_delay"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "This is the total of the network path delays to the stratum-1 system from which the system is ultimately synchronised.", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + leapStatusAttrVal, ok := dp.Attributes().Get("leap.status") + assert.True(t, ok) + assert.Equal(t, "normal", leapStatusAttrVal.Str()) + } else { + assert.False(t, validatedMetrics["ntp.time.root_delay"], "Found a duplicate in the metrics slice: ntp.time.root_delay") + validatedMetrics["ntp.time.root_delay"] = true + assert.Equal(t, pmetric.MetricTypeGauge, mi.Type()) + assert.Equal(t, 1, mi.Gauge().DataPoints().Len()) + assert.Equal(t, "This is the total of the network path delays to the stratum-1 system from which the system is ultimately synchronised.", mi.Description()) + assert.Equal(t, "seconds", mi.Unit()) + dp := mi.Gauge().DataPoints().At(0) + assert.Equal(t, start, dp.StartTimestamp()) + assert.Equal(t, ts, dp.Timestamp()) + assert.Equal(t, pmetric.NumberDataPointValueTypeDouble, dp.ValueType()) + switch aggMap["ntp.time.root_delay"] { + case "sum": + assert.InDelta(t, float64(4), dp.DoubleValue(), 0.01) + case "avg": + assert.InDelta(t, float64(2), dp.DoubleValue(), 0.01) + case "min": + assert.InDelta(t, float64(1), dp.DoubleValue(), 0.01) + case "max": + assert.InDelta(t, float64(3), dp.DoubleValue(), 0.01) + } + _, ok := dp.Attributes().Get("leap.status") + assert.False(t, ok) + } } } }) diff --git a/receiver/chronyreceiver/internal/metadata/testdata/config.yaml b/receiver/chronyreceiver/internal/metadata/testdata/config.yaml index 847e154859b15..b7a8081e61c7e 100644 --- a/receiver/chronyreceiver/internal/metadata/testdata/config.yaml +++ b/receiver/chronyreceiver/internal/metadata/testdata/config.yaml @@ -3,31 +3,62 @@ all_set: metrics: ntp.frequency.offset: enabled: true + attributes: ["leap.status"] ntp.skew: enabled: true ntp.stratum: enabled: true ntp.time.correction: enabled: true + attributes: ["leap.status"] ntp.time.last_offset: enabled: true + attributes: ["leap.status"] ntp.time.rms_offset: enabled: true + attributes: ["leap.status"] ntp.time.root_delay: enabled: true + attributes: ["leap.status"] +reaggregate_set: + metrics: + ntp.frequency.offset: + enabled: true + attributes: [] + ntp.skew: + enabled: true + ntp.stratum: + enabled: true + ntp.time.correction: + enabled: true + attributes: [] + ntp.time.last_offset: + enabled: true + attributes: [] + ntp.time.rms_offset: + enabled: true + attributes: [] + ntp.time.root_delay: + enabled: true + attributes: [] none_set: metrics: ntp.frequency.offset: enabled: false + attributes: ["leap.status"] ntp.skew: enabled: false ntp.stratum: enabled: false ntp.time.correction: enabled: false + attributes: ["leap.status"] ntp.time.last_offset: enabled: false + attributes: ["leap.status"] ntp.time.rms_offset: enabled: false + attributes: ["leap.status"] ntp.time.root_delay: enabled: false + attributes: ["leap.status"] diff --git a/receiver/chronyreceiver/metadata.yaml b/receiver/chronyreceiver/metadata.yaml index efb35f23e27a9..6fab75eddb443 100644 --- a/receiver/chronyreceiver/metadata.yaml +++ b/receiver/chronyreceiver/metadata.yaml @@ -1,5 +1,6 @@ display_name: Chrony Receiver type: chrony +reaggregation_enabled: true status: class: receiver @@ -12,6 +13,7 @@ status: attributes: leap.status: description: how the chrony is handling leap seconds + requirement_level: recommended type: string enum: - normal