Skip to content

Commit

Permalink
Merge branch 'main' into view-deprecate
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Nov 8, 2022
2 parents f5a9033 + c21b6b6 commit a6b8e10
Show file tree
Hide file tree
Showing 3 changed files with 362 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Cumulative metrics from the OpenCensus bridge (`go.opentelemetry.io/otel/bridge/opencensus`) are defined as monotonic sums, instead of non-monotonic. (#3389)
- Asynchronous counters (`Counter` and `UpDownCounter`) from the metric SDK now produce delta sums when configured with delta temporality. (#3398)
- Exported `Status` codes in the `go.opentelemetry.io/otel/exporters/zipkin` exporter are now exported as all upper case values. (#3340)
- Reenabled Attribute Filters in the Metric SDK. (#3396)
- Do not report empty partial-success responses in the `go.opentelemetry.io/otel/exporters/otlp` exporters. (#3438, #3432)

## [1.11.1/0.33.0] 2022-10-19
Expand Down
353 changes: 353 additions & 0 deletions sdk/metric/meter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -474,3 +474,356 @@ func TestMetersProvideScope(t *testing.T) {
assert.NoError(t, err)
metricdatatest.AssertEqual(t, want, got, metricdatatest.IgnoreTimestamp())
}

func TestAttributeFilter(t *testing.T) {
one := 1.0
two := 2.0
testcases := []struct {
name string
register func(t *testing.T, mtr metric.Meter) error
wantMetric metricdata.Metrics
}{
{
name: "AsyncFloat64Counter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncFloat64().Counter("afcounter")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "afcounter",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
{
name: "AsyncFloat64UpDownCounter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncFloat64().UpDownCounter("afupdowncounter")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "afupdowncounter",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0, // TODO (#3439): This should be 3.0.
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
},
{
name: "AsyncFloat64Gauge",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncFloat64().Gauge("afgauge")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "afgauge",
Data: metricdata.Gauge[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 2.0,
},
},
},
},
},
{
name: "AsyncInt64Counter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncInt64().Counter("aicounter")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "aicounter",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
{
name: "AsyncInt64UpDownCounter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncInt64().UpDownCounter("aiupdowncounter")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "aiupdowncounter",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20, // TODO (#3439): This should be 30.
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
},
{
name: "AsyncInt64Gauge",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.AsyncInt64().Gauge("aigauge")
if err != nil {
return err
}
return mtr.RegisterCallback([]instrument.Asynchronous{ctr}, func(ctx context.Context) {
ctr.Observe(ctx, 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Observe(ctx, 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
})
},
wantMetric: metricdata.Metrics{
Name: "aigauge",
Data: metricdata.Gauge[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 20,
},
},
},
},
},
{
name: "SyncFloat64Counter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncFloat64().Counter("sfcounter")
if err != nil {
return err
}

ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "sfcounter",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
{
name: "SyncFloat64UpDownCounter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncFloat64().UpDownCounter("sfupdowncounter")
if err != nil {
return err
}

ctr.Add(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Add(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "sfupdowncounter",
Data: metricdata.Sum[float64]{
DataPoints: []metricdata.DataPoint[float64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
},
{
name: "SyncFloat64Histogram",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncFloat64().Histogram("sfhistogram")
if err != nil {
return err
}

ctr.Record(context.Background(), 1.0, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Record(context.Background(), 2.0, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "sfhistogram",
Data: metricdata.Histogram{
DataPoints: []metricdata.HistogramDataPoint{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Count: 2,
Min: &one,
Max: &two,
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
},
{
name: "SyncInt64Counter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncInt64().Counter("sicounter")
if err != nil {
return err
}

ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "sicounter",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: true,
},
},
},
{
name: "SyncInt64UpDownCounter",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncInt64().UpDownCounter("siupdowncounter")
if err != nil {
return err
}

ctr.Add(context.Background(), 10, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Add(context.Background(), 20, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "siupdowncounter",
Data: metricdata.Sum[int64]{
DataPoints: []metricdata.DataPoint[int64]{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Value: 30,
},
},
Temporality: metricdata.CumulativeTemporality,
IsMonotonic: false,
},
},
},
{
name: "SyncInt64Histogram",
register: func(t *testing.T, mtr metric.Meter) error {
ctr, err := mtr.SyncInt64().Histogram("sihistogram")
if err != nil {
return err
}

ctr.Record(context.Background(), 1, attribute.String("foo", "bar"), attribute.Int("version", 1))
ctr.Record(context.Background(), 2, attribute.String("foo", "bar"), attribute.Int("version", 2))
return nil
},
wantMetric: metricdata.Metrics{
Name: "sihistogram",
Data: metricdata.Histogram{
DataPoints: []metricdata.HistogramDataPoint{
{
Attributes: attribute.NewSet(attribute.String("foo", "bar")),
Bounds: []float64{0, 5, 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2500, 5000, 7500, 10000},
BucketCounts: []uint64{0, 2, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Count: 2,
Min: &one,
Max: &two,
Sum: 3.0,
},
},
Temporality: metricdata.CumulativeTemporality,
},
},
},
}

for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
rdr := NewManualReader()
mtr := NewMeterProvider(
WithReader(rdr),
WithView(NewView(
Instrument{Name: "*"},
Stream{AttributeFilter: func(kv attribute.KeyValue) bool {
return kv.Key == "foo"
}},
)),
).Meter("TestAttributeFilter")

err := tt.register(t, mtr)
require.NoError(t, err)

m, err := rdr.Collect(context.Background())
assert.NoError(t, err)

require.Len(t, m.ScopeMetrics, 1)
require.Len(t, m.ScopeMetrics[0].Metrics, 1)

metricdatatest.AssertEqual(t, tt.wantMetric, m.ScopeMetrics[0].Metrics[0], metricdatatest.IgnoreTimestamp())
})
}
}
8 changes: 8 additions & 0 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"strings"
"sync"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/internal/global"
"go.opentelemetry.io/otel/metric/unit"
"go.opentelemetry.io/otel/sdk/instrumentation"
Expand Down Expand Up @@ -272,6 +273,13 @@ func (i *inserter[N]) cachedAggregator(inst Stream) (internal.Aggregator[N], err
if agg == nil { // Drop aggregator.
return nil, nil
}
if inst.AttributeFilter != nil {
agg = internal.NewFilter(agg, func(s attribute.Set) attribute.Set {
s, _ = s.Filter(inst.AttributeFilter)
return s
})
}

i.pipeline.addSync(inst.Scope, instrumentSync{
name: inst.Name,
description: inst.Description,
Expand Down

0 comments on commit a6b8e10

Please sign in to comment.