diff --git a/.chloggen/nickange_telemetrygen_metrics_support-delta-temporality.yaml b/.chloggen/nickange_telemetrygen_metrics_support-delta-temporality.yaml new file mode 100644 index 0000000000000..23aea72755bec --- /dev/null +++ b/.chloggen/nickange_telemetrygen_metrics_support-delta-temporality.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: telemetrygen + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "Add support for `aggregation-temporality` flag in telemetrygen. Supported values (delta or cumulative)" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [38073] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/cmd/telemetrygen/pkg/metrics/aggregation_temporality.go b/cmd/telemetrygen/pkg/metrics/aggregation_temporality.go new file mode 100644 index 0000000000000..673e9f1b760a9 --- /dev/null +++ b/cmd/telemetrygen/pkg/metrics/aggregation_temporality.go @@ -0,0 +1,35 @@ +package metrics + +import ( + "fmt" + + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +type AggregationTemporality metricdata.Temporality + +func (t *AggregationTemporality) Set(v string) error { + switch v { + case "delta": + *t = AggregationTemporality(metricdata.DeltaTemporality) + return nil + case "cumulative": + *t = AggregationTemporality(metricdata.CumulativeTemporality) + return nil + default: + return fmt.Errorf(`temporality must be one of "delta" or "cumulative"`) + } +} + +func (t *AggregationTemporality) String() string { + return string(metricdata.Temporality(*t)) +} + +func (t *AggregationTemporality) Type() string { + return "temporality" +} + +// AsTemporality converts the AggregationTemporality to metricdata.Temporality +func (t AggregationTemporality) AsTemporality() metricdata.Temporality { + return metricdata.Temporality(t) +} diff --git a/cmd/telemetrygen/pkg/metrics/config.go b/cmd/telemetrygen/pkg/metrics/config.go index 3ccd66251a8f2..630b7778ac3d0 100644 --- a/cmd/telemetrygen/pkg/metrics/config.go +++ b/cmd/telemetrygen/pkg/metrics/config.go @@ -7,6 +7,7 @@ import ( "fmt" "github.com/spf13/pflag" + "go.opentelemetry.io/otel/sdk/metric/metricdata" "github.com/open-telemetry/opentelemetry-collector-contrib/cmd/telemetrygen/internal/common" ) @@ -14,11 +15,12 @@ import ( // Config describes the test scenario. type Config struct { common.Config - NumMetrics int - MetricName string - MetricType MetricType - SpanID string - TraceID string + NumMetrics int + MetricName string + MetricType MetricType + AggregationTemporality AggregationTemporality + SpanID string + TraceID string } // NewConfig creates a new Config with default values. @@ -34,11 +36,13 @@ func (c *Config) Flags(fs *pflag.FlagSet) { fs.StringVar(&c.HTTPPath, "otlp-http-url-path", c.HTTPPath, "Which URL path to write to") - fs.Var(&c.MetricType, "metric-type", "Metric type enum. must be one of 'Gauge' or 'Sum'") fs.IntVar(&c.NumMetrics, "metrics", c.NumMetrics, "Number of metrics to generate in each worker (ignored if duration is provided)") fs.StringVar(&c.TraceID, "trace-id", c.TraceID, "TraceID to use as exemplar") fs.StringVar(&c.SpanID, "span-id", c.SpanID, "SpanID to use as exemplar") + + fs.Var(&c.MetricType, "metric-type", "Metric type enum. must be one of 'Gauge' or 'Sum'") + fs.Var(&c.AggregationTemporality, "aggregation-temporality", "aggregation-temporality for metrics. Must be one of 'delta' or 'cumulative'") } // SetDefaults sets the default values for the configuration @@ -49,9 +53,12 @@ func (c *Config) SetDefaults() { c.HTTPPath = "/v1/metrics" c.NumMetrics = 1 + c.MetricName = "gen" // Use Gauge as default metric type. c.MetricType = MetricTypeGauge - c.MetricName = "gen" + // Use cumulative temporality as default. + c.AggregationTemporality = AggregationTemporality(metricdata.CumulativeTemporality) + c.TraceID = "" c.SpanID = "" } diff --git a/cmd/telemetrygen/pkg/metrics/metrics.go b/cmd/telemetrygen/pkg/metrics/metrics.go index bfbc9ab529e96..95831e2deb627 100644 --- a/cmd/telemetrygen/pkg/metrics/metrics.go +++ b/cmd/telemetrygen/pkg/metrics/metrics.go @@ -66,16 +66,17 @@ func run(c *Config, expF exporterFunc, logger *zap.Logger) error { for i := 0; i < c.WorkerCount; i++ { wg.Add(1) w := worker{ - numMetrics: c.NumMetrics, - metricName: c.MetricName, - metricType: c.MetricType, - exemplars: exemplarsFromConfig(c), - limitPerSecond: limit, - totalDuration: c.TotalDuration, - running: running, - wg: &wg, - logger: logger.With(zap.Int("worker", i)), - index: i, + numMetrics: c.NumMetrics, + metricName: c.MetricName, + metricType: c.MetricType, + aggregationTemporality: c.AggregationTemporality, + exemplars: exemplarsFromConfig(c), + limitPerSecond: limit, + totalDuration: c.TotalDuration, + running: running, + wg: &wg, + logger: logger.With(zap.Int("worker", i)), + index: i, } exp, err := expF() if err != nil { diff --git a/cmd/telemetrygen/pkg/metrics/worker.go b/cmd/telemetrygen/pkg/metrics/worker.go index 2483670e55f98..cb3ff64cb20d7 100644 --- a/cmd/telemetrygen/pkg/metrics/worker.go +++ b/cmd/telemetrygen/pkg/metrics/worker.go @@ -18,16 +18,17 @@ import ( ) type worker struct { - running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test - metricName string // name of metric to generate - metricType MetricType // type of metric to generate - exemplars []metricdata.Exemplar[int64] // exemplars to attach to the metric - numMetrics int // how many metrics the worker has to generate (only when duration==0) - totalDuration time.Duration // how long to run the test for (overrides `numMetrics`) - limitPerSecond rate.Limit // how many metrics per second to generate - wg *sync.WaitGroup // notify when done - logger *zap.Logger // logger - index int // worker index + running *atomic.Bool // pointer to shared flag that indicates it's time to stop the test + metricName string // name of metric to generate + metricType MetricType // type of metric to generate + aggregationTemporality AggregationTemporality // Temporality type to use + exemplars []metricdata.Exemplar[int64] // exemplars to attach to the metric + numMetrics int // how many metrics the worker has to generate (only when duration==0) + totalDuration time.Duration // how long to run the test for (overrides `numMetrics`) + limitPerSecond rate.Limit // how many metrics per second to generate + wg *sync.WaitGroup // notify when done + logger *zap.Logger // logger + index int // worker index } var histogramBucketSamples = []struct { @@ -103,7 +104,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Expor Name: w.metricName, Data: metricdata.Sum[int64]{ IsMonotonic: true, - Temporality: metricdata.CumulativeTemporality, + Temporality: w.aggregationTemporality.AsTemporality(), DataPoints: []metricdata.DataPoint[int64]{ { StartTime: time.Now().Add(-1 * time.Second), @@ -122,7 +123,7 @@ func (w worker) simulateMetrics(res *resource.Resource, exporter sdkmetric.Expor metrics = append(metrics, metricdata.Metrics{ Name: w.metricName, Data: metricdata.Histogram[int64]{ - Temporality: metricdata.CumulativeTemporality, + Temporality: w.aggregationTemporality.AsTemporality(), DataPoints: []metricdata.HistogramDataPoint[int64]{ { StartTime: time.Now().Add(-1 * time.Second), diff --git a/cmd/telemetrygen/pkg/metrics/worker_test.go b/cmd/telemetrygen/pkg/metrics/worker_test.go index fe83abdcc81f1..dfc69dba51726 100644 --- a/cmd/telemetrygen/pkg/metrics/worker_test.go +++ b/cmd/telemetrygen/pkg/metrics/worker_test.go @@ -50,6 +50,21 @@ func (m *mockExporter) Shutdown(_ context.Context) error { return nil } +func checkMetricTemporality(t *testing.T, ms metricdata.Metrics, metricType MetricType, expectedAggregationTemporality metricdata.Temporality) { + switch metricType { + case MetricTypeSum: + sumData, ok := ms.Data.(metricdata.Sum[int64]) + require.True(t, ok, "expected Sum data type") + assert.Equal(t, expectedAggregationTemporality, sumData.Temporality) + case MetricTypeHistogram: + histogramData, ok := ms.Data.(metricdata.Histogram[int64]) + require.True(t, ok, "expected Histogram data type") + assert.Equal(t, expectedAggregationTemporality, histogramData.Temporality) + default: + t.Fatalf("unsupported metric type: %v", metricType) + } +} + func TestFixedNumberOfMetrics(t *testing.T) { // arrange cfg := &Config{ @@ -98,6 +113,72 @@ func TestRateOfMetrics(t *testing.T) { assert.LessOrEqual(t, len(m.rms), 20, "there should have been less than 20 metrics, had %d", len(m.rms)) } +func TestMetricsWithTemporality(t *testing.T) { + tests := []struct { + name string + metricType MetricType + aggregationTemporality AggregationTemporality + expectedAggregationTemporality metricdata.Temporality + }{ + { + name: "Sum: delta temporality", + metricType: MetricTypeSum, + aggregationTemporality: AggregationTemporality(metricdata.DeltaTemporality), + expectedAggregationTemporality: metricdata.DeltaTemporality, + }, + { + name: "Sum: cumulative temporality", + metricType: MetricTypeSum, + aggregationTemporality: AggregationTemporality(metricdata.CumulativeTemporality), + expectedAggregationTemporality: metricdata.CumulativeTemporality, + }, + { + name: "Histogram: delta temporality", + metricType: MetricTypeHistogram, + aggregationTemporality: AggregationTemporality(metricdata.DeltaTemporality), + expectedAggregationTemporality: metricdata.DeltaTemporality, + }, + { + name: "Histogram: cumulative temporality", + metricType: MetricTypeHistogram, + aggregationTemporality: AggregationTemporality(metricdata.CumulativeTemporality), + expectedAggregationTemporality: metricdata.CumulativeTemporality, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + // arrange + cfg := &Config{ + Config: common.Config{ + WorkerCount: 1, + }, + NumMetrics: 1, + MetricName: "test", + MetricType: tt.metricType, + AggregationTemporality: tt.aggregationTemporality, + } + m := &mockExporter{} + expFunc := func() (sdkmetric.Exporter, error) { + return m, nil + } + + // act + logger, _ := zap.NewDevelopment() + require.NoError(t, run(cfg, expFunc, logger)) + + time.Sleep(1 * time.Second) + + // assert + require.Len(t, m.rms, 1) + ms := m.rms[0].ScopeMetrics[0].Metrics[0] + assert.Equal(t, "test", ms.Name) + + checkMetricTemporality(t, ms, tt.metricType, tt.expectedAggregationTemporality) + }) + } +} + func TestUnthrottled(t *testing.T) { // arrange cfg := &Config{