From f759f170d8f7b3f4a9efa3fd6c1165be5c583f51 Mon Sep 17 00:00:00 2001 From: Tyler Yahn Date: Thu, 10 Aug 2023 13:31:49 -0700 Subject: [PATCH] Add Aggregation to the metric pkg --- sdk/metric/aggregation.go | 201 +++++++++++++++++++++++++++ sdk/metric/aggregation_test.go | 95 +++++++++++++ sdk/metric/benchmark_test.go | 3 +- sdk/metric/config_test.go | 3 +- sdk/metric/exporter.go | 3 +- sdk/metric/instrument.go | 3 +- sdk/metric/manual_reader.go | 9 +- sdk/metric/meter_test.go | 11 +- sdk/metric/periodic_reader.go | 3 +- sdk/metric/periodic_reader_test.go | 3 +- sdk/metric/pipeline.go | 35 +++-- sdk/metric/pipeline_registry_test.go | 101 +++++++------- sdk/metric/reader.go | 13 +- sdk/metric/reader_test.go | 2 +- sdk/metric/view.go | 7 +- sdk/metric/view_test.go | 22 ++- 16 files changed, 397 insertions(+), 117 deletions(-) create mode 100644 sdk/metric/aggregation.go create mode 100644 sdk/metric/aggregation_test.go diff --git a/sdk/metric/aggregation.go b/sdk/metric/aggregation.go new file mode 100644 index 000000000000..5583a448d141 --- /dev/null +++ b/sdk/metric/aggregation.go @@ -0,0 +1,201 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric // import "go.opentelemetry.io/otel/sdk/metric" + +import ( + "errors" + "fmt" +) + +// errAgg is wrapped by misconfigured aggregations. +var errAgg = errors.New("aggregation") + +// Aggregation is the aggregation used to summarize recorded measurements. +type Aggregation interface { + // copy returns a deep copy of the Aggregation. + copy() Aggregation + + // err returns an error for any misconfigured Aggregation. + err() error +} + +// AggregationDrop is an Aggregation that drops all recorded data. +type AggregationDrop struct{} // Drop has no parameters. + +var _ Aggregation = AggregationDrop{} + +// copy returns a deep copy of d. +func (d AggregationDrop) copy() Aggregation { return d } + +// err returns an error for any misconfiguration. A Drop aggregation has no +// parameters and cannot be misconfigured, therefore this always returns nil. +func (AggregationDrop) err() error { return nil } + +// AggregationDefault is an Aggregation that uses the default instrument kind selection +// mapping to select another Aggregation. A metric reader can be configured to +// make an aggregation selection based on instrument kind that differs from +// the default. This Aggregation ensures the default is used. +// +// See the "go.opentelemetry.io/otel/sdk/metric".DefaultAggregationSelector +// for information about the default instrument kind selection mapping. +type AggregationDefault struct{} // Default has no parameters. + +var _ Aggregation = AggregationDefault{} + +// copy returns a deep copy of d. +func (d AggregationDefault) copy() Aggregation { return d } + +// err returns an error for any misconfiguration. A Default aggregation has no +// parameters and cannot be misconfigured, therefore this always returns nil. +func (AggregationDefault) err() error { return nil } + +// Sum is an Aggregation that summarizes a set of measurements as their +// arithmetic sum. +type Sum struct{} // Sum has no parameters. + +var _ Aggregation = Sum{} + +// copy returns a deep copy of s. +func (s Sum) copy() Aggregation { return s } + +// err returns an error for any misconfiguration. A Sum aggregation has no +// parameters and cannot be misconfigured, therefore this always returns nil. +func (Sum) err() error { return nil } + +// AggregationLastValue is an Aggregation that summarizes a set of measurements as the +// last one made. +type AggregationLastValue struct{} // LastValue has no parameters. + +var _ Aggregation = AggregationLastValue{} + +// copy returns a deep copy of l. +func (l AggregationLastValue) copy() Aggregation { return l } + +// err returns an error for any misconfiguration. A LastValue aggregation has +// no parameters and cannot be misconfigured, therefore this always returns +// nil. +func (AggregationLastValue) err() error { return nil } + +// AggregationExplicitBucketHistogram is an Aggregation that summarizes a set of +// measurements as an histogram with explicitly defined buckets. +type AggregationExplicitBucketHistogram struct { + // Boundaries are the increasing bucket boundary values. Boundary values + // define bucket upper bounds. Buckets are exclusive of their lower + // boundary and inclusive of their upper bound (except at positive + // infinity). A measurement is defined to fall into the greatest-numbered + // bucket with a boundary that is greater than or equal to the + // measurement. As an example, boundaries defined as: + // + // []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000} + // + // Will define these buckets: + // + // (-∞, 0], (0, 5.0], (5.0, 10.0], (10.0, 25.0], (25.0, 50.0], + // (50.0, 75.0], (75.0, 100.0], (100.0, 250.0], (250.0, 500.0], + // (500.0, 1000.0], (1000.0, +∞) + Boundaries []float64 + // NoMinMax indicates whether to not record the min and max of the + // distribution. By default, these extrema are recorded. + // + // Recording these extrema for cumulative data is expected to have little + // value, they will represent the entire life of the instrument instead of + // just the current collection cycle. It is recommended to set this to true + // for that type of data to avoid computing the low-value extrema. + NoMinMax bool +} + +var _ Aggregation = AggregationExplicitBucketHistogram{} + +// errHist is returned by misconfigured ExplicitBucketHistograms. +var errHist = fmt.Errorf("%w: explicit bucket histogram", errAgg) + +// err returns an error for any misconfiguration. +func (h AggregationExplicitBucketHistogram) err() error { + if len(h.Boundaries) <= 1 { + return nil + } + + // Check boundaries are monotonic. + i := h.Boundaries[0] + for _, j := range h.Boundaries[1:] { + if i >= j { + return fmt.Errorf("%w: non-monotonic boundaries: %v", errHist, h.Boundaries) + } + i = j + } + + return nil +} + +// copy returns a deep copy of h. +func (h AggregationExplicitBucketHistogram) copy() Aggregation { + b := make([]float64, len(h.Boundaries)) + copy(b, h.Boundaries) + return AggregationExplicitBucketHistogram{ + Boundaries: b, + NoMinMax: h.NoMinMax, + } +} + +// AggregationBase2ExponentialHistogram is an Aggregation that summarizes a set of +// measurements as an histogram with bucket widths that grow exponentially. +type AggregationBase2ExponentialHistogram struct { + // MaxSize is the maximum number of buckets to use for the histogram. + MaxSize int32 + // MaxScale is the maximum resolution scale to use for the histogram. + // + // MaxScale has a maximum value of 20. Using a value of 20 means the + // maximum number of buckets that can fit within the range of a + // signed 32-bit integer index could be used. + // + // MaxScale has a minimum value of -10. Using a value of -10 means only + // two buckets will be use. + MaxScale int32 + + // NoMinMax indicates whether to not record the min and max of the + // distribution. By default, these extrema are recorded. + // + // Recording these extrema for cumulative data is expected to have little + // value, they will represent the entire life of the instrument instead of + // just the current collection cycle. It is recommended to set this to true + // for that type of data to avoid computing the low-value extrema. + NoMinMax bool +} + +var _ Aggregation = AggregationBase2ExponentialHistogram{} + +// copy returns a deep copy of the Aggregation. +func (e AggregationBase2ExponentialHistogram) copy() Aggregation { + return e +} + +const ( + expoMaxScale = 20 + expoMinScale = -10 +) + +// errExpoHist is returned by misconfigured Base2ExponentialBucketHistograms. +var errExpoHist = fmt.Errorf("%w: exponential histogram", errAgg) + +// err returns an error for any misconfigured Aggregation. +func (e AggregationBase2ExponentialHistogram) err() error { + if e.MaxScale > expoMaxScale { + return fmt.Errorf("%w: max size %d is greater than maximum scale %d", errExpoHist, e.MaxSize, expoMaxScale) + } + if e.MaxSize <= 0 { + return fmt.Errorf("%w: max size %d is less than or equal to zero", errExpoHist, e.MaxSize) + } + return nil +} diff --git a/sdk/metric/aggregation_test.go b/sdk/metric/aggregation_test.go new file mode 100644 index 000000000000..136fdc9536c1 --- /dev/null +++ b/sdk/metric/aggregation_test.go @@ -0,0 +1,95 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package metric + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestAggregationErr(t *testing.T) { + t.Run("DropOperation", func(t *testing.T) { + assert.NoError(t, AggregationDrop{}.err()) + }) + + t.Run("SumOperation", func(t *testing.T) { + assert.NoError(t, Sum{}.err()) + }) + + t.Run("LastValueOperation", func(t *testing.T) { + assert.NoError(t, AggregationLastValue{}.err()) + }) + + t.Run("ExplicitBucketHistogramOperation", func(t *testing.T) { + assert.NoError(t, AggregationExplicitBucketHistogram{}.err()) + + assert.NoError(t, AggregationExplicitBucketHistogram{ + Boundaries: []float64{0}, + NoMinMax: true, + }.err()) + + assert.NoError(t, AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 1000}, + }.err()) + }) + + t.Run("NonmonotonicHistogramBoundaries", func(t *testing.T) { + assert.ErrorIs(t, AggregationExplicitBucketHistogram{ + Boundaries: []float64{2, 1}, + }.err(), errAgg) + + assert.ErrorIs(t, AggregationExplicitBucketHistogram{ + Boundaries: []float64{0, 1, 2, 1, 3, 4}, + }.err(), errAgg) + }) + + t.Run("ExponentialHistogramOperation", func(t *testing.T) { + assert.NoError(t, AggregationBase2ExponentialHistogram{ + MaxSize: 160, + MaxScale: 20, + }.err()) + + assert.NoError(t, AggregationBase2ExponentialHistogram{ + MaxSize: 1, + NoMinMax: true, + }.err()) + + assert.NoError(t, AggregationBase2ExponentialHistogram{ + MaxSize: 1024, + MaxScale: -3, + }.err()) + }) + + t.Run("InvalidExponentialHistogramOperation", func(t *testing.T) { + // MazSize must be greater than 0 + assert.ErrorIs(t, AggregationBase2ExponentialHistogram{}.err(), errAgg) + + // MaxScale Must be <=20 + assert.ErrorIs(t, AggregationBase2ExponentialHistogram{ + MaxSize: 1, + MaxScale: 30, + }.err(), errAgg) + }) +} + +func TestExplicitBucketHistogramDeepCopy(t *testing.T) { + const orig = 0.0 + b := []float64{orig} + h := AggregationExplicitBucketHistogram{Boundaries: b} + cpH := h.copy().(AggregationExplicitBucketHistogram) + b[0] = orig + 1 + assert.Equal(t, orig, cpH.Boundaries[0], "changing the underlying slice data should not affect the copy") +} diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index a243738cfb7a..dd75de3cd63b 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -23,7 +23,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -36,7 +35,7 @@ var viewBenchmarks = []struct { "DropView", []View{NewView( Instrument{Name: "*"}, - Stream{Aggregation: aggregation.Drop{}}, + Stream{Aggregation: AggregationDrop{}}, )}, }, { diff --git a/sdk/metric/config_test.go b/sdk/metric/config_test.go index 6e48a4599cae..ae7159f2d2e5 100644 --- a/sdk/metric/config_test.go +++ b/sdk/metric/config_test.go @@ -22,7 +22,6 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/resource" ) @@ -39,7 +38,7 @@ type reader struct { var _ Reader = (*reader)(nil) -func (r *reader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (r *reader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type. return r.aggregationFunc(kind) } diff --git a/sdk/metric/exporter.go b/sdk/metric/exporter.go index 7efb8bf2fbe9..695cf466c0e4 100644 --- a/sdk/metric/exporter.go +++ b/sdk/metric/exporter.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -39,7 +38,7 @@ type Exporter interface { // // This method needs to be concurrent safe with itself and all the other // Exporter methods. - Aggregation(InstrumentKind) aggregation.Aggregation + Aggregation(InstrumentKind) Aggregation // Export serializes and transmits metric data to a receiver. // diff --git a/sdk/metric/instrument.go b/sdk/metric/instrument.go index eff2f179a516..8007d40d216f 100644 --- a/sdk/metric/instrument.go +++ b/sdk/metric/instrument.go @@ -25,7 +25,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" ) @@ -145,7 +144,7 @@ type Stream struct { // Unit is the unit of measurement recorded. Unit string // Aggregation the stream uses for an instrument. - Aggregation aggregation.Aggregation + Aggregation Aggregation // AllowAttributeKeys are an allow-list of attribute keys that will be // preserved for the stream. Any attribute recorded for the stream with a // key not in this slice will be dropped. diff --git a/sdk/metric/manual_reader.go b/sdk/metric/manual_reader.go index 898af86edc04..b472b22953d6 100644 --- a/sdk/metric/manual_reader.go +++ b/sdk/metric/manual_reader.go @@ -22,7 +22,6 @@ import ( "sync/atomic" "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -87,7 +86,7 @@ func (mr *ManualReader) temporality(kind InstrumentKind) metricdata.Temporality } // aggregation returns what Aggregation to use for kind. -func (mr *ManualReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (mr *ManualReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type. return mr.aggregationSelector(kind) } @@ -225,13 +224,13 @@ func (t temporalitySelectorOption) applyManual(mrc manualReaderConfig) manualRea // or the aggregation explicitly passed for a view matching an instrument. func WithAggregationSelector(selector AggregationSelector) ManualReaderOption { // Deep copy and validate before using. - wrapped := func(ik InstrumentKind) aggregation.Aggregation { + wrapped := func(ik InstrumentKind) Aggregation { a := selector(ik) if a == nil { return nil } - cpA := a.Copy() - if err := cpA.Err(); err != nil { + cpA := a.copy() + if err := cpA.err(); err != nil { cpA = DefaultAggregationSelector(ik) global.Error( err, "using default aggregation instead", diff --git a/sdk/metric/meter_test.go b/sdk/metric/meter_test.go index 65f5acfa6c8e..50ee4ea77ef6 100644 --- a/sdk/metric/meter_test.go +++ b/sdk/metric/meter_test.go @@ -31,7 +31,6 @@ import ( "go.opentelemetry.io/otel/internal/global" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.opentelemetry.io/otel/sdk/resource" @@ -1132,8 +1131,8 @@ func TestUnregisterUnregisters(t *testing.T) { } func TestRegisterCallbackDropAggregations(t *testing.T) { - aggFn := func(InstrumentKind) aggregation.Aggregation { - return aggregation.Drop{} + aggFn := func(InstrumentKind) Aggregation { + return AggregationDrop{} } r := NewManualReader(WithAggregationSelector(aggFn)) mp := NewMeterProvider(WithReader(r)) @@ -1813,11 +1812,11 @@ func BenchmarkInstrumentCreation(b *testing.B) { } } -func testNilAggregationSelector(InstrumentKind) aggregation.Aggregation { +func testNilAggregationSelector(InstrumentKind) Aggregation { return nil } -func testDefaultAggregationSelector(InstrumentKind) aggregation.Aggregation { - return aggregation.Default{} +func testDefaultAggregationSelector(InstrumentKind) Aggregation { + return AggregationDefault{} } func testUndefinedTemporalitySelector(InstrumentKind) metricdata.Temporality { return metricdata.Temporality(0) diff --git a/sdk/metric/periodic_reader.go b/sdk/metric/periodic_reader.go index f62a2ae41e32..f5c81f43cb32 100644 --- a/sdk/metric/periodic_reader.go +++ b/sdk/metric/periodic_reader.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -219,7 +218,7 @@ func (r *PeriodicReader) temporality(kind InstrumentKind) metricdata.Temporality } // aggregation returns what Aggregation to use for kind. -func (r *PeriodicReader) aggregation(kind InstrumentKind) aggregation.Aggregation { // nolint:revive // import-shadow for method scoped by type. +func (r *PeriodicReader) aggregation(kind InstrumentKind) Aggregation { // nolint:revive // import-shadow for method scoped by type. return r.exporter.Aggregation(kind) } diff --git a/sdk/metric/periodic_reader_test.go b/sdk/metric/periodic_reader_test.go index 0a34b7e9951c..736e6fdf178d 100644 --- a/sdk/metric/periodic_reader_test.go +++ b/sdk/metric/periodic_reader_test.go @@ -24,7 +24,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -167,7 +166,7 @@ func (e *fnExporter) Temporality(k InstrumentKind) metricdata.Temporality { return DefaultTemporalitySelector(k) } -func (e *fnExporter) Aggregation(k InstrumentKind) aggregation.Aggregation { +func (e *fnExporter) Aggregation(k InstrumentKind) Aggregation { if e.aggregationFunc != nil { return e.aggregationFunc(k) } diff --git a/sdk/metric/pipeline.go b/sdk/metric/pipeline.go index fd28a4afc15d..31bc80d4a4a5 100644 --- a/sdk/metric/pipeline.go +++ b/sdk/metric/pipeline.go @@ -27,7 +27,6 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/embedded" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" @@ -308,11 +307,11 @@ type aggVal[N int64 | float64] struct { // is returned. func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind InstrumentKind, stream Stream) (meas aggregate.Measure[N], aggID uint64, err error) { switch stream.Aggregation.(type) { - case nil, aggregation.Default: + case nil, AggregationDefault: // Undefined, nil, means to use the default from the reader. stream.Aggregation = i.pipeline.reader.aggregation(kind) switch stream.Aggregation.(type) { - case nil, aggregation.Default: + case nil, AggregationDefault: // If the reader returns default or nil use the default selector. stream.Aggregation = DefaultAggregationSelector(kind) } @@ -415,15 +414,15 @@ func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID { // aggregateFunc returns new aggregate functions matching agg, kind, and // monotonic. If the agg is unknown or temporality is invalid, an error is // returned. -func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) { +func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg Aggregation, kind InstrumentKind) (meas aggregate.Measure[N], comp aggregate.ComputeAggregation, err error) { switch a := agg.(type) { - case aggregation.Default: + case AggregationDefault: return i.aggregateFunc(b, DefaultAggregationSelector(kind), kind) - case aggregation.Drop: + case AggregationDrop: // Return nil in and out to signify the drop aggregator. - case aggregation.LastValue: + case AggregationLastValue: meas, comp = b.LastValue() - case aggregation.Sum: + case Sum: switch kind { case InstrumentKindObservableCounter: meas, comp = b.PrecomputedSum(true) @@ -436,7 +435,7 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // instrumentKindUndefined or other invalid instrument kinds. meas, comp = b.Sum(false) } - case aggregation.ExplicitBucketHistogram: + case AggregationExplicitBucketHistogram: var noSum bool switch kind { case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge: @@ -445,8 +444,8 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations noSum = true } - meas, comp = b.ExplicitBucketHistogram(a, noSum) - case aggregation.Base2ExponentialHistogram: + meas, comp = b.ExplicitBucketHistogram(a.Boundaries, a.NoMinMax, noSum) + case AggregationBase2ExponentialHistogram: var noSum bool switch kind { case InstrumentKindUpDownCounter, InstrumentKindObservableUpDownCounter, InstrumentKindObservableGauge: @@ -455,7 +454,7 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.21.0/specification/metrics/sdk.md#histogram-aggregations noSum = true } - meas, comp = b.ExponentialBucketHistogram(a, noSum) + meas, comp = b.ExponentialBucketHistogram(a.MaxSize, a.MaxScale, a.NoMinMax, noSum) default: err = errUnknownAggregation @@ -475,11 +474,11 @@ func (i *inserter[N]) aggregateFunc(b aggregate.Builder[N], agg aggregation.Aggr // | Observable Counter | ✓ | | ✓ | ✓ | ✓ | // | Observable UpDownCounter | ✓ | | ✓ | ✓ | ✓ | // | Observable Gauge | ✓ | ✓ | | ✓ | ✓ |. -func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) error { +func isAggregatorCompatible(kind InstrumentKind, agg Aggregation) error { switch agg.(type) { - case aggregation.Default: + case AggregationDefault: return nil - case aggregation.ExplicitBucketHistogram, aggregation.Base2ExponentialHistogram: + case AggregationExplicitBucketHistogram, AggregationBase2ExponentialHistogram: switch kind { case InstrumentKindCounter, InstrumentKindUpDownCounter, @@ -491,7 +490,7 @@ func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) er default: return errIncompatibleAggregation } - case aggregation.Sum: + case Sum: switch kind { case InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter, InstrumentKindCounter, InstrumentKindHistogram, InstrumentKindUpDownCounter: return nil @@ -500,14 +499,14 @@ func isAggregatorCompatible(kind InstrumentKind, agg aggregation.Aggregation) er // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation } - case aggregation.LastValue: + case AggregationLastValue: if kind == InstrumentKindObservableGauge { return nil } // TODO: review need for aggregation check after // https://github.com/open-telemetry/opentelemetry-specification/issues/2710 return errIncompatibleAggregation - case aggregation.Drop: + case AggregationDrop: return nil default: // This is used passed checking for default, it should be an error at this point. diff --git a/sdk/metric/pipeline_registry_test.go b/sdk/metric/pipeline_registry_test.go index 52fedd124714..7c153ee68e87 100644 --- a/sdk/metric/pipeline_registry_test.go +++ b/sdk/metric/pipeline_registry_test.go @@ -26,7 +26,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/internal/aggregate" "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" @@ -35,14 +34,12 @@ import ( var defaultView = NewView(Instrument{Name: "*"}, Stream{}) -type invalidAggregation struct { - aggregation.Aggregation -} +type invalidAggregation struct{} -func (invalidAggregation) Copy() aggregation.Aggregation { +func (invalidAggregation) copy() Aggregation { return invalidAggregation{} } -func (invalidAggregation) Err() error { +func (invalidAggregation) err() error { return nil } @@ -146,7 +143,7 @@ func assertLastValue[N int64 | float64](t *testing.T, meas []aggregate.Measure[N func testCreateAggregators[N int64 | float64](t *testing.T) { changeAggView := NewView( Instrument{Name: "foo"}, - Stream{Aggregation: aggregation.ExplicitBucketHistogram{ + Stream{Aggregation: AggregationExplicitBucketHistogram{ Boundaries: []float64{0, 100}, NoMinMax: true, }}, @@ -176,7 +173,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { }{ { name: "Default/Drop", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Drop{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDrop{} })), inst: instruments[InstrumentKindCounter], validate: func(t *testing.T, meas []aggregate.Measure[N], comps []aggregate.ComputeAggregation, err error) { t.Helper() @@ -304,37 +301,37 @@ func testCreateAggregators[N int64 | float64](t *testing.T) { }, { name: "Reader/Default/Cumulative/Sum/Monotonic", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindCounter], validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 4}), }, { name: "Reader/Default/Cumulative/Sum/NonMonotonic", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindUpDownCounter], validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 4}), }, { name: "Reader/Default/Cumulative/ExplicitBucketHistogram", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindHistogram], validate: assertHist[N](metricdata.CumulativeTemporality), }, { name: "Reader/Default/Cumulative/PrecomputedSum/Monotonic", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindObservableCounter], validate: assertSum[N](1, metricdata.CumulativeTemporality, true, [2]N{1, 3}), }, { name: "Reader/Default/Cumulative/PrecomputedSum/NonMonotonic", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindObservableUpDownCounter], validate: assertSum[N](1, metricdata.CumulativeTemporality, false, [2]N{1, 3}), }, { name: "Reader/Default/Gauge", - reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Default{} })), + reader: NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationDefault{} })), inst: instruments[InstrumentKindObservableGauge], validate: assertLastValue[N], }, @@ -409,7 +406,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) { func TestPipelineRegistryCreateAggregators(t *testing.T) { renameView := NewView(Instrument{Name: "foo"}, Stream{Name: "bar"}) testRdr := NewManualReader() - testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.ExplicitBucketHistogram{} })) + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return AggregationExplicitBucketHistogram{} })) testCases := []struct { name string @@ -498,7 +495,7 @@ func TestPipelineRegistryResource(t *testing.T) { } func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) { - testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) aggregation.Aggregation { return aggregation.Sum{} })) + testRdrHistogram := NewManualReader(WithAggregationSelector(func(ik InstrumentKind) Aggregation { return Sum{} })) readers := []Reader{testRdrHistogram} views := []View{defaultView} @@ -599,187 +596,187 @@ func TestIsAggregatorCompatible(t *testing.T) { testCases := []struct { name string kind InstrumentKind - agg aggregation.Aggregation + agg Aggregation want error }{ { name: "SyncCounter and Drop", kind: InstrumentKindCounter, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { name: "SyncCounter and LastValue", kind: InstrumentKindCounter, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "SyncCounter and Sum", kind: InstrumentKindCounter, - agg: aggregation.Sum{}, + agg: Sum{}, }, { name: "SyncCounter and ExplicitBucketHistogram", kind: InstrumentKindCounter, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "SyncCounter and ExponentialHistogram", kind: InstrumentKindCounter, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "SyncUpDownCounter and Drop", kind: InstrumentKindUpDownCounter, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { name: "SyncUpDownCounter and LastValue", kind: InstrumentKindUpDownCounter, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "SyncUpDownCounter and Sum", kind: InstrumentKindUpDownCounter, - agg: aggregation.Sum{}, + agg: Sum{}, }, { name: "SyncUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindUpDownCounter, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "SyncUpDownCounter and ExponentialHistogram", kind: InstrumentKindUpDownCounter, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "SyncHistogram and Drop", kind: InstrumentKindHistogram, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { name: "SyncHistogram and LastValue", kind: InstrumentKindHistogram, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "SyncHistogram and Sum", kind: InstrumentKindHistogram, - agg: aggregation.Sum{}, + agg: Sum{}, }, { name: "SyncHistogram and ExplicitBucketHistogram", kind: InstrumentKindHistogram, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "SyncHistogram and ExponentialHistogram", kind: InstrumentKindHistogram, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "ObservableCounter and Drop", kind: InstrumentKindObservableCounter, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { name: "ObservableCounter and LastValue", kind: InstrumentKindObservableCounter, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "ObservableCounter and Sum", kind: InstrumentKindObservableCounter, - agg: aggregation.Sum{}, + agg: Sum{}, }, { name: "ObservableCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableCounter, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "ObservableCounter and ExponentialHistogram", kind: InstrumentKindObservableCounter, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "ObservableUpDownCounter and Drop", kind: InstrumentKindObservableUpDownCounter, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { name: "ObservableUpDownCounter and LastValue", kind: InstrumentKindObservableUpDownCounter, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "ObservableUpDownCounter and Sum", kind: InstrumentKindObservableUpDownCounter, - agg: aggregation.Sum{}, + agg: Sum{}, }, { name: "ObservableUpDownCounter and ExplicitBucketHistogram", kind: InstrumentKindObservableUpDownCounter, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "ObservableUpDownCounter and ExponentialHistogram", kind: InstrumentKindObservableUpDownCounter, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "ObservableGauge and Drop", kind: InstrumentKindObservableGauge, - agg: aggregation.Drop{}, + agg: AggregationDrop{}, }, { - name: "ObservableGauge and aggregation.LastValue{}", + name: "ObservableGauge and LastValue{}", kind: InstrumentKindObservableGauge, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, }, { name: "ObservableGauge and Sum", kind: InstrumentKindObservableGauge, - agg: aggregation.Sum{}, + agg: Sum{}, want: errIncompatibleAggregation, }, { name: "ObservableGauge and ExplicitBucketHistogram", kind: InstrumentKindObservableGauge, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, }, { name: "ObservableGauge and ExponentialHistogram", kind: InstrumentKindObservableGauge, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, }, { name: "unknown kind with Sum should error", kind: undefinedInstrument, - agg: aggregation.Sum{}, + agg: Sum{}, want: errIncompatibleAggregation, }, { name: "unknown kind with LastValue should error", kind: undefinedInstrument, - agg: aggregation.LastValue{}, + agg: AggregationLastValue{}, want: errIncompatibleAggregation, }, { name: "unknown kind with Histogram should error", kind: undefinedInstrument, - agg: aggregation.ExplicitBucketHistogram{}, + agg: AggregationExplicitBucketHistogram{}, want: errIncompatibleAggregation, }, { name: "unknown kind with Histogram should error", kind: undefinedInstrument, - agg: aggregation.Base2ExponentialHistogram{}, + agg: AggregationBase2ExponentialHistogram{}, want: errIncompatibleAggregation, }, } diff --git a/sdk/metric/reader.go b/sdk/metric/reader.go index da68ef33686b..03024c5dab54 100644 --- a/sdk/metric/reader.go +++ b/sdk/metric/reader.go @@ -18,7 +18,6 @@ import ( "context" "fmt" - "go.opentelemetry.io/otel/sdk/metric/aggregation" "go.opentelemetry.io/otel/sdk/metric/metricdata" ) @@ -74,7 +73,7 @@ type Reader interface { // // This method needs to be concurrent safe with itself and all the other // Reader methods. - aggregation(InstrumentKind) aggregation.Aggregation // nolint:revive // import-shadow for method scoped by type. + aggregation(InstrumentKind) Aggregation // nolint:revive // import-shadow for method scoped by type. // Collect gathers and returns all metric data related to the Reader from // the SDK and stores it in out. An error is returned if this is called @@ -154,7 +153,7 @@ func DefaultTemporalitySelector(InstrumentKind) metricdata.Temporality { // // If the Aggregation returned is nil or DefaultAggregation, the selection from // DefaultAggregationSelector will be used. -type AggregationSelector func(InstrumentKind) aggregation.Aggregation +type AggregationSelector func(InstrumentKind) Aggregation // DefaultAggregationSelector returns the default aggregation and parameters // that will be used to summarize measurement made from an instrument of @@ -162,14 +161,14 @@ type AggregationSelector func(InstrumentKind) aggregation.Aggregation // mapping: Counter ⇨ Sum, Observable Counter ⇨ Sum, UpDownCounter ⇨ Sum, // Observable UpDownCounter ⇨ Sum, Observable Gauge ⇨ LastValue, // Histogram ⇨ ExplicitBucketHistogram. -func DefaultAggregationSelector(ik InstrumentKind) aggregation.Aggregation { +func DefaultAggregationSelector(ik InstrumentKind) Aggregation { switch ik { case InstrumentKindCounter, InstrumentKindUpDownCounter, InstrumentKindObservableCounter, InstrumentKindObservableUpDownCounter: - return aggregation.Sum{} + return Sum{} case InstrumentKindObservableGauge: - return aggregation.LastValue{} + return AggregationLastValue{} case InstrumentKindHistogram: - return aggregation.ExplicitBucketHistogram{ + return AggregationExplicitBucketHistogram{ Boundaries: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000}, NoMinMax: false, } diff --git a/sdk/metric/reader_test.go b/sdk/metric/reader_test.go index 8904d68ee409..c23e3131729a 100644 --- a/sdk/metric/reader_test.go +++ b/sdk/metric/reader_test.go @@ -324,7 +324,7 @@ func TestDefaultAggregationSelector(t *testing.T) { } for _, ik := range iKinds { - assert.NoError(t, DefaultAggregationSelector(ik).Err(), ik) + assert.NoError(t, DefaultAggregationSelector(ik).err(), ik) } } diff --git a/sdk/metric/view.go b/sdk/metric/view.go index f1df24466bca..2d0fe18d7e94 100644 --- a/sdk/metric/view.go +++ b/sdk/metric/view.go @@ -20,7 +20,6 @@ import ( "strings" "go.opentelemetry.io/otel/internal/global" - "go.opentelemetry.io/otel/sdk/metric/aggregation" ) var ( @@ -92,10 +91,10 @@ func NewView(criteria Instrument, mask Stream) View { matchFunc = criteria.matches } - var agg aggregation.Aggregation + var agg Aggregation if mask.Aggregation != nil { - agg = mask.Aggregation.Copy() - if err := agg.Err(); err != nil { + agg = mask.Aggregation.copy() + if err := agg.err(); err != nil { global.Error( err, "not using aggregation with view", "criteria", criteria, diff --git a/sdk/metric/view_test.go b/sdk/metric/view_test.go index b8f6c9214683..07f0c906cb87 100644 --- a/sdk/metric/view_test.go +++ b/sdk/metric/view_test.go @@ -28,7 +28,6 @@ import ( "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk/instrumentation" - "go.opentelemetry.io/otel/sdk/metric/aggregation" ) var ( @@ -395,13 +394,13 @@ func TestNewViewReplace(t *testing.T) { }, { name: "Aggregation", - mask: Stream{Aggregation: aggregation.LastValue{}}, + mask: Stream{Aggregation: AggregationLastValue{}}, want: func(i Instrument) Stream { return Stream{ Name: i.Name, Description: i.Description, Unit: i.Unit, - Aggregation: aggregation.LastValue{}, + Aggregation: AggregationLastValue{}, } }, }, @@ -423,14 +422,14 @@ func TestNewViewReplace(t *testing.T) { Name: alt, Description: alt, Unit: "1", - Aggregation: aggregation.LastValue{}, + Aggregation: AggregationLastValue{}, }, want: func(i Instrument) Stream { return Stream{ Name: alt, Description: alt, Unit: "1", - Aggregation: aggregation.LastValue{}, + Aggregation: AggregationLastValue{}, } }, }, @@ -446,20 +445,19 @@ func TestNewViewReplace(t *testing.T) { } type badAgg struct { - aggregation.Aggregation - err error + e error } -func (a badAgg) Copy() aggregation.Aggregation { return a } +func (a badAgg) copy() Aggregation { return a } -func (a badAgg) Err() error { return a.err } +func (a badAgg) err() error { return a.e } func TestNewViewAggregationErrorLogged(t *testing.T) { tLog := testr.NewWithOptions(t, testr.Options{Verbosity: 6}) l := &logCounter{LogSink: tLog.GetSink()} otel.SetLogger(logr.New(l)) - agg := badAgg{err: assert.AnError} + agg := badAgg{e: assert.AnError} mask := Stream{Aggregation: agg} got, match := NewView(completeIP, mask)(completeIP) require.True(t, match, "view did not match exact criteria") @@ -532,7 +530,7 @@ func ExampleNewView_drop() { // library. view := NewView( Instrument{Scope: instrumentation.Scope{Name: "db"}}, - Stream{Aggregation: aggregation.Drop{}}, + Stream{Aggregation: AggregationDrop{}}, ) // The created view can then be registered with the OpenTelemetry metric @@ -548,7 +546,7 @@ func ExampleNewView_drop() { fmt.Printf("aggregation: %#v", stream.Aggregation) // Output: // name: queries - // aggregation: aggregation.Drop{} + // aggregation: metric.AggregationDrop{} } func ExampleNewView_wildcard() {