diff --git a/CHANGELOG.md b/CHANGELOG.md index 1914e7ba0eb..f6c575d80d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm - `NewGRPCDriver` function returns a `ProtocolDriver` that maintains a single gRPC connection to the collector. (#1369) - Documentation about the project's versioning policy. (#1388) - `NewSplitDriver` for OTLP exporter that allows sending traces and metrics to different endpoints. (#1418) +- Metric SDK adds `Enricher` API for applying baggage attributes as metric labels in request context. (#1421) ### Changed diff --git a/baggage/baggage.go b/baggage/baggage.go index 66b8416f1f3..e20ceebc600 100644 --- a/baggage/baggage.go +++ b/baggage/baggage.go @@ -36,6 +36,12 @@ func Set(ctx context.Context) label.Set { return label.NewSet(values...) } +// ForEach allows visiting the baggage values in a context without +// copying a slice. +func ForEach(ctx context.Context, f func(kv label.KeyValue) bool) { + baggage.MapFromContext(ctx).Foreach(f) +} + // Value returns the value related to key in the baggage of ctx. If no // value is set, the returned label.Value will be an uninitialized zero-value // with type INVALID. diff --git a/baggage/baggage_test.go b/baggage/baggage_test.go index f87c3b2b64d..dbb5bdf5c00 100644 --- a/baggage/baggage_test.go +++ b/baggage/baggage_test.go @@ -18,6 +18,8 @@ import ( "context" "testing" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/internal/baggage" "go.opentelemetry.io/otel/label" ) @@ -84,3 +86,22 @@ func TestBaggage(t *testing.T) { t.Fatal("WithoutBaggage failed to clear baggage") } } + +func TestForEach(t *testing.T) { + ctx := ContextWithValues( + context.Background(), + label.String("A", "B"), + label.String("C", "D"), + ) + + out := map[string]string{} + ForEach(ctx, func(kv label.KeyValue) bool { + out[string(kv.Key)] = kv.Value.Emit() + return true + }) + + require.EqualValues(t, map[string]string{ + "A": "B", + "C": "D", + }, out) +} diff --git a/sdk/export/metric/metric.go b/sdk/export/metric/metric.go index 525fe8f3a63..528e71f84c4 100644 --- a/sdk/export/metric/metric.go +++ b/sdk/export/metric/metric.go @@ -125,6 +125,31 @@ type Checkpointer interface { FinishCollection() error } +// Enricher supports extracting baggage attributes and applying them +// as metric event labels and also permits erasing metric labels at +// runtime. When configured with an Accumulator, the Enricher is +// applied to all synchronous instruments. +// +// This receives the context and the event labels and +// returns the effective KeyValue slice. If this returns a +// nil KeyValue slice or a non-nil error, the caller SHOULD +// use the original KeyValue slice. +// +// This SHOULD NOT modify the input label slice. +// +// Note: This interface does not include the *metric.Descriptor +// because it creates significant complexity and/or cost to enrich +// RecordBatch() events. +// +// Note: This interface is called with input labels before they are +// sorted and de-duplicated as described in +// label.NewSetWithSortableFiltered(). The enricher has control over +// whether label values should override the input making use of the +// last-value semantic detailed for metric event labels in general. +// Appending baggage values means they override the call-site labels, +// prepending baggage means call-site labels override baggage labels. +type Enricher func(context.Context, []label.KeyValue) ([]label.KeyValue, error) + // Aggregator implements a specific aggregation behavior, e.g., a // behavior to track a sequence of updates to an instrument. Sum-only // instruments commonly use a simple Sum aggregator, but for the diff --git a/sdk/metric/benchmark_test.go b/sdk/metric/benchmark_test.go index bb32bb34cdb..b117bfdb8bc 100644 --- a/sdk/metric/benchmark_test.go +++ b/sdk/metric/benchmark_test.go @@ -42,7 +42,7 @@ func newFixture(b *testing.B) *benchFixture { AggregatorSelector: processortest.AggregatorSelector(), } - bf.accumulator = sdk.NewAccumulator(bf, nil) + bf.accumulator = sdk.NewAccumulator(bf, nil, nil) bf.meter = metric.WrapMeterImpl(bf.accumulator, "benchmarks") return bf } diff --git a/sdk/metric/controller/pull/pull.go b/sdk/metric/controller/pull/pull.go index 68ea34b351c..cb4bcb2f258 100644 --- a/sdk/metric/controller/pull/pull.go +++ b/sdk/metric/controller/pull/pull.go @@ -61,6 +61,7 @@ func New(checkpointer export.Checkpointer, options ...Option) *Controller { accum := sdk.NewAccumulator( checkpointer, config.Resource, + nil, ) return &Controller{ accumulator: accum, diff --git a/sdk/metric/controller/push/push.go b/sdk/metric/controller/push/push.go index 4350de32de5..824e42e3304 100644 --- a/sdk/metric/controller/push/push.go +++ b/sdk/metric/controller/push/push.go @@ -62,6 +62,7 @@ func New(checkpointer export.Checkpointer, exporter export.Exporter, opts ...Opt impl := sdk.NewAccumulator( checkpointer, c.Resource, + nil, ) return &Controller{ provider: registry.NewMeterProvider(impl), diff --git a/sdk/metric/correct_test.go b/sdk/metric/correct_test.go index 151f3bdf6fc..a8e9bdf0610 100644 --- a/sdk/metric/correct_test.go +++ b/sdk/metric/correct_test.go @@ -24,6 +24,7 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/baggage" "go.opentelemetry.io/otel/label" "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/otel/metric/number" @@ -98,11 +99,21 @@ func newSDK(t *testing.T) (metric.Meter, *metricsdk.Accumulator, *correctnessPro accum := metricsdk.NewAccumulator( processor, testResource, + nil, ) meter := metric.WrapMeterImpl(accum, "test") return meter, accum, processor } +func expectAndResetOutput(t *testing.T, processor *correctnessProcessor, expect map[string]float64) { + out := processortest.NewOutput(label.DefaultEncoder()) + for _, rec := range processor.accumulations { + require.NoError(t, out.AddAccumulation(rec)) + } + require.EqualValues(t, expect, out.Map()) + processor.accumulations = nil +} + func (ci *correctnessProcessor) Process(accumulation export.Accumulation) error { ci.accumulations = append(ci.accumulations, accumulation) return nil @@ -351,12 +362,9 @@ func TestObserverCollection(t *testing.T) { collected := sdk.Collect(ctx) require.Equal(t, collected, len(processor.accumulations)) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } mult := float64(mult) - require.EqualValues(t, map[string]float64{ + + expectAndResetOutput(t, processor, map[string]float64{ "float.valueobserver.lastvalue/A=B/R=V": -mult, "float.valueobserver.lastvalue/C=D/R=V": -mult, "int.valueobserver.lastvalue//R=V": mult, @@ -371,7 +379,7 @@ func TestObserverCollection(t *testing.T) { "float.updownsumobserver.sum/C=D/R=V": mult, "int.updownsumobserver.sum//R=V": -mult, "int.updownsumobserver.sum/A=B/R=V": mult, - }, out.Map()) + }) } } @@ -456,11 +464,7 @@ func TestObserverBatch(t *testing.T) { require.Equal(t, collected, len(processor.accumulations)) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "float.sumobserver.sum//R=V": 1.1, "float.sumobserver.sum/A=B/R=V": 1000, "int.sumobserver.sum//R=V": 10, @@ -475,7 +479,7 @@ func TestObserverBatch(t *testing.T) { "float.valueobserver.lastvalue/C=D/R=V": -1, "int.valueobserver.lastvalue//R=V": 1, "int.valueobserver.lastvalue/A=B/R=V": 1, - }, out.Map()) + }) } func TestRecordBatch(t *testing.T) { @@ -501,16 +505,12 @@ func TestRecordBatch(t *testing.T) { sdk.Collect(ctx) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "int64.sum/A=B,C=D/R=V": 1, "float64.sum/A=B,C=D/R=V": 2, "int64.exact/A=B,C=D/R=V": 3, "float64.exact/A=B,C=D/R=V": 4, - }, out.Map()) + }) } // TestRecordPersistence ensures that a direct-called instrument that @@ -583,12 +583,127 @@ func TestSyncInAsync(t *testing.T) { sdk.Collect(ctx) - out := processortest.NewOutput(label.DefaultEncoder()) - for _, rec := range processor.accumulations { - require.NoError(t, out.AddAccumulation(rec)) - } - require.EqualValues(t, map[string]float64{ + expectAndResetOutput(t, processor, map[string]float64{ "counter.sum//R=V": 100, "observer.lastvalue//R=V": 10, - }, out.Map()) + }) +} + +func TestEnricher(t *testing.T) { + enrich := func(context.Context, []label.KeyValue) ([]label.KeyValue, error) { + return nil, nil + } + + testHandler.Reset() + processor := &correctnessProcessor{ + t: t, + testSelector: &testSelector{selector: processortest.AggregatorSelector()}, + } + accum := metricsdk.NewAccumulator( + processor, + testResource, + func(ctx context.Context, kvs []label.KeyValue) ([]label.KeyValue, error) { + return enrich(ctx, kvs) + }, + ) + + meter := metric.WrapMeterImpl(accum, "test") + + bg := context.Background() + ctx := baggage.ContextWithValues( + bg, + label.String("Corr1", "Val1"), + label.String("Corr2", "Val2"), + ) + + counter := Must(meter).NewInt64Counter("name.sum") + recorder := Must(meter).NewFloat64ValueRecorder("name.lastvalue") + + counter.Add(ctx, 1) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum//R=V": 1, + "name.lastvalue/E=F/R=V": 10, + }) + + // This enriches with all baggage keys + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + baggage.ForEach(ctx, func(kv label.KeyValue) bool { + input = append(input, kv) + return true + }) + return input, nil + } + + counter.Add(ctx, 1) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Corr1=Val1,Corr2=Val2/R=V": 1, + "name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10, + }) + + // This enriches by erasing all labels + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return []label.KeyValue{}, nil + } + + counter.Add(ctx, 1, label.String("Y", "Z")) + recorder.Record(ctx, 10, label.String("E", "F")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum//R=V": 1, + "name.lastvalue//R=V": 10, + }) + + // This enriches by including the first input and all baggage labels. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + var output []label.KeyValue + if len(input) > 0 { + output = append(output, input[0]) + } + baggage.ForEach(ctx, func(kv label.KeyValue) bool { + output = append(output, kv) + return true + }) + return output, nil + } + + counter.Add(ctx, 1, label.String("Y", "Z"), label.String("X", "Y")) + recorder.Record(ctx, 10, label.String("E", "F"), label.String("G", "H")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Corr1=Val1,Corr2=Val2,Y=Z/R=V": 1, + "name.lastvalue/Corr1=Val1,Corr2=Val2,E=F/R=V": 10, + }) + + // This enriches by APPENDING a duplicate label. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return append(input, label.String("Extra", "Baggage")), nil + } + + counter.Add(ctx, 1, label.String("Extra", "Call-site")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Extra=Baggage/R=V": 1, + }) + + // This enriches by APPENDING a duplicate label. + enrich = func(ctx context.Context, input []label.KeyValue) ([]label.KeyValue, error) { + return append([]label.KeyValue{label.String("Extra", "Baggage")}, input...), nil + } + + counter.Add(ctx, 1, label.String("Extra", "Call-site")) + + _ = accum.Collect(bg) + expectAndResetOutput(t, processor, map[string]float64{ + "name.sum/Extra=Call-site/R=V": 1, + }) + } diff --git a/sdk/metric/processor/basic/basic_test.go b/sdk/metric/processor/basic/basic_test.go index 016de9677bd..ec4f79b9c53 100644 --- a/sdk/metric/processor/basic/basic_test.go +++ b/sdk/metric/processor/basic/basic_test.go @@ -474,7 +474,7 @@ func TestSumObserverEndToEnd(t *testing.T) { processorTest.AggregatorSelector(), eselector, ) - accum := sdk.NewAccumulator(proc, resource.Empty()) + accum := sdk.NewAccumulator(proc, resource.Empty(), nil) meter := metric.WrapMeterImpl(accum, "testing") var calls int64 diff --git a/sdk/metric/processor/processortest/test_test.go b/sdk/metric/processor/processortest/test_test.go index c4541d2656a..dc19da9a561 100644 --- a/sdk/metric/processor/processortest/test_test.go +++ b/sdk/metric/processor/processortest/test_test.go @@ -33,6 +33,7 @@ func generateTestData(proc export.Processor) { accum := metricsdk.NewAccumulator( proc, resource.NewWithAttributes(label.String("R", "V")), + nil, ) meter := metric.WrapMeterImpl(accum, "testing") diff --git a/sdk/metric/processor/reducer/reducer_test.go b/sdk/metric/processor/reducer/reducer_test.go index 138f3825dde..5aaa2edc111 100644 --- a/sdk/metric/processor/reducer/reducer_test.go +++ b/sdk/metric/processor/reducer/reducer_test.go @@ -76,6 +76,7 @@ func TestFilterProcessor(t *testing.T) { accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, processorTest.Checkpointer(testProc)), resource.NewWithAttributes(label.String("R", "V")), + nil, ) generateData(accum) @@ -93,6 +94,7 @@ func TestFilterBasicProcessor(t *testing.T) { accum := metricsdk.NewAccumulator( reducer.New(testFilter{}, basicProc), resource.NewWithAttributes(label.String("R", "V")), + nil, ) exporter := processorTest.NewExporter(basicProc, label.DefaultEncoder()) diff --git a/sdk/metric/sdk.go b/sdk/metric/sdk.go index 64bfa9b1dd4..9bce81fd098 100644 --- a/sdk/metric/sdk.go +++ b/sdk/metric/sdk.go @@ -67,6 +67,9 @@ type ( // resource is applied to all records in this Accumulator. resource *resource.Resource + + // enricher (optional) extracts labels from baggage. + enricher export.Enricher } syncInstrument struct { @@ -123,7 +126,7 @@ type ( } instrument struct { - meter *Accumulator + meter *Accumulator // TODO rename 'accumulator' descriptor metric.Descriptor } @@ -287,9 +290,12 @@ func (s *syncInstrument) Bind(kvs []label.KeyValue) metric.BoundSyncImpl { } func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs []label.KeyValue) { + // Introduce labels from baggage. + kvs = s.meter.enrich(ctx, kvs) + h := s.acquireHandle(kvs, nil) defer h.Unbind() - h.RecordOne(ctx, num) + h.recordEvent(ctx, num) } // NewAccumulator constructs a new Accumulator for the given @@ -301,11 +307,12 @@ func (s *syncInstrument) RecordOne(ctx context.Context, num number.Number, kvs [ // processor will call Collect() when it receives a request to scrape // current metric values. A push-based processor should configure its // own periodic collection. -func NewAccumulator(processor export.Processor, resource *resource.Resource) *Accumulator { +func NewAccumulator(processor export.Processor, resource *resource.Resource, enricher export.Enricher) *Accumulator { return &Accumulator{ processor: processor, asyncInstruments: internal.NewAsyncInstrumentState(), resource: resource, + enricher: enricher, } } @@ -352,6 +359,20 @@ func (m *Accumulator) Collect(ctx context.Context) int { return checkpointed } +func (m *Accumulator) enrich(ctx context.Context, kvs []label.KeyValue) []label.KeyValue { + if m.enricher == nil { + return kvs + } + out, err := m.enricher(ctx, kvs) + + // Return the enricher result if it is non-nil and no error. + if err == nil && out != nil { + return out + } + otel.Handle(err) + return kvs +} + func (m *Accumulator) collectSyncInstruments() int { checkpointed := 0 @@ -474,6 +495,9 @@ func (m *Accumulator) checkpointAsync(a *asyncInstrument) int { // RecordBatch enters a batch of metric events. func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, measurements ...metric.Measurement) { + // Introduce labels from baggage. + kvs = m.enrich(ctx, kvs) + // Labels will be computed the first time acquireHandle is // called. Subsequent calls to acquireHandle will re-use the // previously computed value instead of recomputing the @@ -492,12 +516,27 @@ func (m *Accumulator) RecordBatch(ctx context.Context, kvs []label.KeyValue, mea } defer h.Unbind() - h.RecordOne(ctx, meas.Number()) + h.recordEvent(ctx, meas.Number()) } } // RecordOne implements metric.SyncImpl. func (r *record) RecordOne(ctx context.Context, num number.Number) { + if r.inst.meter.enricher == nil { + r.recordEvent(ctx, num) + return + } + + // Note: When there is an enricher, the bound instrument loses + // performance when labels are introduced. The ToSlice() below + // could be stored in the instrument when enricher != nil, to + // avoid this cost. + + // Call the unbound instrument path when the enricher is in use. + r.inst.RecordOne(ctx, num, r.labels.ToSlice()) +} + +func (r *record) recordEvent(ctx context.Context, num number.Number) { if r.current == nil { // The instrument is disabled according to the AggregatorSelector. return @@ -506,6 +545,7 @@ func (r *record) RecordOne(ctx context.Context, num number.Number) { otel.Handle(err) return } + if err := r.current.Update(ctx, num, &r.inst.descriptor); err != nil { otel.Handle(err) return diff --git a/sdk/metric/stress_test.go b/sdk/metric/stress_test.go index 4878435bde3..e27e6d89403 100644 --- a/sdk/metric/stress_test.go +++ b/sdk/metric/stress_test.go @@ -294,7 +294,7 @@ func stressTest(t *testing.T, impl testImpl) { } cc := concurrency() - sdk := NewAccumulator(fixture, nil) + sdk := NewAccumulator(fixture, nil, nil) meter := metric.WrapMeterImpl(sdk, "stress_test") fixture.wg.Add(cc + 1)