diff --git a/sdk/metric/go.mod b/sdk/metric/go.mod index f41f6636be7..da93f6e68e5 100644 --- a/sdk/metric/go.mod +++ b/sdk/metric/go.mod @@ -9,12 +9,12 @@ require ( go.opentelemetry.io/otel v1.23.0-rc.1 go.opentelemetry.io/otel/metric v1.23.0-rc.1 go.opentelemetry.io/otel/sdk v1.23.0-rc.1 + go.opentelemetry.io/otel/trace v1.23.0-rc.1 ) require ( github.com/davecgh/go-spew v1.1.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - go.opentelemetry.io/otel/trace v1.23.0-rc.1 // indirect golang.org/x/sys v0.16.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/sdk/metric/internal/exemplar/doc.go b/sdk/metric/internal/exemplar/doc.go new file mode 100644 index 00000000000..3caeb542c57 --- /dev/null +++ b/sdk/metric/internal/exemplar/doc.go @@ -0,0 +1,17 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// Package exemplar provides an implementation of the OpenTelemetry exemplar +// reservoir to be used in metric collection pipelines. +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" diff --git a/sdk/metric/internal/exemplar/reservoir.go b/sdk/metric/internal/exemplar/reservoir.go new file mode 100644 index 00000000000..b3654414ee7 --- /dev/null +++ b/sdk/metric/internal/exemplar/reservoir.go @@ -0,0 +1,51 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar // import "go.opentelemetry.io/otel/sdk/metric/internal/exemplar" + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" +) + +// Reservoir holds the sampled exemplar of measurements made. +type Reservoir[N int64 | float64] interface { + // Offer accepts the parameters associated with a measurement. The + // parameters will be stored as an exemplar if the Reservoir decides to + // sample the measurement. + // + // The passed ctx needs to contain any baggage or span that were active + // when the measurement was made. This information may be used by the + // Reservoir in making a sampling decision. + // + // The time t is the time when the measurement was made. The val and attr + // parameters are the value and dropped (filtered) attributes of the + // measurement respectively. + Offer(ctx context.Context, t time.Time, val N, attr []attribute.KeyValue) + + // Collect returns all the held exemplars. + // + // The Reservoir state is preserved after this call. See Flush to + // copy-and-clear instead. + Collect(dest *[]metricdata.Exemplar[N]) + + // Flush returns all the held exemplars. + // + // The Reservoir state is reset after this call. See Collect to preserve + // the state instead. + Flush(dest *[]metricdata.Exemplar[N]) +} diff --git a/sdk/metric/internal/exemplar/reservoir_test.go b/sdk/metric/internal/exemplar/reservoir_test.go new file mode 100644 index 00000000000..9ef8e964538 --- /dev/null +++ b/sdk/metric/internal/exemplar/reservoir_test.go @@ -0,0 +1,180 @@ +// Copyright The OpenTelemetry Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package exemplar + +import ( + "context" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/trace" +) + +// Sat Jan 01 2000 00:00:00 GMT+0000. +var staticTime = time.Unix(946684800, 0) + +type factory[N int64 | float64] func(requstedCap int) (r Reservoir[N], actualCap int) + +func ReservoirTest[N int64 | float64](f factory[N]) func(*testing.T) { + return func(t *testing.T) { + t.Helper() + + ctx := context.Background() + + t.Run("CaptureSpanContext", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + tID, sID := trace.TraceID{0x01}, trace.SpanID{0x01} + sc := trace.NewSpanContext(trace.SpanContextConfig{ + TraceID: tID, + SpanID: sID, + TraceFlags: trace.FlagsSampled, + }) + ctx := trace.ContextWithSpanContext(ctx, sc) + + r.Offer(ctx, staticTime, 10, nil) + + var dest []metricdata.Exemplar[N] + r.Collect(&dest) + + want := metricdata.Exemplar[N]{ + Time: staticTime, + Value: 10, + SpanID: []byte(sID[:]), + TraceID: []byte(tID[:]), + } + require.Len(t, dest, 1, "number of collected exemplars") + assert.Equal(t, want, dest[0]) + }) + + t.Run("FilterAttributes", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + adminTrue := attribute.Bool("admin", true) + r.Offer(ctx, staticTime, 10, []attribute.KeyValue{adminTrue}) + + var dest []metricdata.Exemplar[N] + r.Collect(&dest) + + want := metricdata.Exemplar[N]{ + FilteredAttributes: []attribute.KeyValue{adminTrue}, + Time: staticTime, + Value: 10, + } + require.Len(t, dest, 1, "number of collected exemplars") + assert.Equal(t, want, dest[0]) + }) + + t.Run("CollectDoesNotFlush", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + r.Offer(ctx, staticTime, 10, nil) + + var dest []metricdata.Exemplar[N] + r.Collect(&dest) + require.Len(t, dest, 1, "number of collected exemplars") + + dest = dest[:0] + r.Collect(&dest) + assert.Len(t, dest, 1, "Collect flushed reservoir") + }) + + t.Run("FlushFlushes", func(t *testing.T) { + t.Helper() + + r, n := f(1) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + r.Offer(ctx, staticTime, 10, nil) + + var dest []metricdata.Exemplar[N] + r.Flush(&dest) + require.Len(t, dest, 1, "number of flushed exemplars") + + r.Flush(&dest) + assert.Len(t, dest, 0, "Flush did not flush reservoir") + }) + + t.Run("MultipleOffers", func(t *testing.T) { + t.Helper() + + r, n := f(3) + if n < 1 { + t.Skip("skipping, reservoir capacity less than 1:", n) + } + + for i := 0; i < n+1; i++ { + v := N(i) + r.Offer(ctx, staticTime, v, nil) + } + + var dest []metricdata.Exemplar[N] + r.Flush(&dest) + assert.Len(t, dest, n, "multiple offers did not fill reservoir") + + // Ensure the flush reset also resets any couting state. + for i := 0; i < n+1; i++ { + v := N(2 * i) + r.Offer(ctx, staticTime, v, nil) + } + + dest = dest[:0] + r.Flush(&dest) + assert.Len(t, dest, n, "internal count state not reset") + }) + + t.Run("DropAll", func(t *testing.T) { + t.Helper() + + r, n := f(0) + if n > 0 { + t.Skip("skipping, reservoir capacity greater than 0:", n) + } + + r.Offer(context.Background(), staticTime, 10, nil) + + dest := []metricdata.Exemplar[N]{{}} // Should be reset to empty. + r.Collect(&dest) + assert.Len(t, dest, 0, "no exemplars should be collected") + + r.Offer(context.Background(), staticTime, 10, nil) + dest = []metricdata.Exemplar[N]{{}} // Should be reset to empty. + r.Flush(&dest) + assert.Len(t, dest, 0, "no exemplars should be flushed") + }) + } +}