Skip to content

Commit

Permalink
Merge branch 'main' into inst-name-conflicts
Browse files Browse the repository at this point in the history
  • Loading branch information
MrAlias committed Jul 19, 2023
2 parents 3fd07ec + 84b2e54 commit 91369f3
Show file tree
Hide file tree
Showing 7 changed files with 65 additions and 49 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- Correctly format log messages from the `go.opentelemetry.io/otel/exporters/zipkin` exporter. (#4143)
- Log an error for calls to `NewView` in `go.opentelemetry.io/otel/sdk/metric` that have empty criteria. (#4307)
- Fix `resource.WithHostID()` to not set an empty `host.id`. (#4317)
- Use the instrument identifying fields to cache aggregators and determine duplicate instrument registrations in `go.opentelemetry.io/otel/sdk/metric`. (#4337)
- Detect duplicate instruments for case-insensitive names in `go.opentelemetry.io/otel/sdk/metric`. (#4338)

## [1.16.0/0.39.0] 2023-05-18
Expand Down
18 changes: 6 additions & 12 deletions sdk/metric/instrument.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:generate stringer -type=InstrumentKind -trimprefix=InstrumentKind

package metric // import "go.opentelemetry.io/otel/sdk/metric"

import (
Expand All @@ -25,7 +27,6 @@ import (
"go.opentelemetry.io/otel/sdk/instrumentation"
"go.opentelemetry.io/otel/sdk/metric/aggregation"
"go.opentelemetry.io/otel/sdk/metric/internal/aggregate"
"go.opentelemetry.io/otel/sdk/metric/metricdata"
)

var (
Expand Down Expand Up @@ -172,23 +173,16 @@ func (s Stream) attributeFilter() attribute.Filter {
}
}

// streamID are the identifying properties of a stream.
type streamID struct {
// instID are the identifying properties of a instrument.
type instID struct {
// Name is the name of the stream.
Name string
// Description is the description of the stream.
Description string
// Kind defines the functional group of the instrument.
Kind InstrumentKind
// Unit is the unit of the stream.
Unit string
// Aggregation is the aggregation data type of the stream.
Aggregation string
// Monotonic is the monotonicity of an instruments data type. This field is
// not used for all data types, so a zero value needs to be understood in the
// context of Aggregation.
Monotonic bool
// Temporality is the temporality of a stream's data type. This field is
// not used by some data types.
Temporality metricdata.Temporality
// Number is the number type of the stream.
Number string
}
Expand Down
29 changes: 29 additions & 0 deletions sdk/metric/instrumentkind_string.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion sdk/metric/meter.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type meter struct {
func newMeter(s instrumentation.Scope, p pipelines) *meter {
// viewCache ensures instrument conflicts, including number conflicts, this
// meter is asked to create are logged to the user.
var viewCache cache[string, streamID]
var viewCache cache[string, instID]

return &meter{
scope: s,
Expand Down
40 changes: 16 additions & 24 deletions sdk/metric/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,24 +187,24 @@ type inserter[N int64 | float64] struct {
// cache ensures no duplicate aggregate functions are inserted into the
// reader pipeline and if a new request during an instrument creation asks
// for the same aggregate function input the same instance is returned.
aggregators *cache[streamID, aggVal[N]]
aggregators *cache[instID, aggVal[N]]

// views is a cache that holds instrument identifiers for all the
// instruments a Meter has created, it is provided from the Meter that owns
// this inserter. This cache ensures during the creation of instruments
// with the same name but different options (e.g. description, unit) a
// warning message is logged.
views *cache[string, streamID]
views *cache[string, instID]

pipeline *pipeline
}

func newInserter[N int64 | float64](p *pipeline, vc *cache[string, streamID]) *inserter[N] {
func newInserter[N int64 | float64](p *pipeline, vc *cache[string, instID]) *inserter[N] {
if vc == nil {
vc = &cache[string, streamID]{}
vc = &cache[string, instID]{}
}
return &inserter[N]{
aggregators: &cache[streamID, aggVal[N]]{},
aggregators: &cache[instID, aggVal[N]]{},
views: vc,
pipeline: p,
}
Expand Down Expand Up @@ -320,12 +320,14 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum
)
}

id := i.streamID(kind, stream)
id := i.instID(kind, stream)
// If there is a conflict, the specification says the view should
// still be applied and a warning should be logged.
i.logConflict(id)
cv := i.aggregators.Lookup(id, func() aggVal[N] {
b := aggregate.Builder[N]{Temporality: id.Temporality}
b := aggregate.Builder[N]{
Temporality: i.pipeline.reader.temporality(kind),
}
if len(stream.AllowAttributeKeys) > 0 {
b.Filter = stream.attributeFilter()
}
Expand All @@ -350,11 +352,11 @@ func (i *inserter[N]) cachedAggregator(scope instrumentation.Scope, kind Instrum

// logConflict validates if an instrument with the same name as id has already
// been created. If that instrument conflicts with id, a warning is logged.
func (i *inserter[N]) logConflict(id streamID) {
func (i *inserter[N]) logConflict(id instID) {
// The API specification defines names as case-insensitive. If there is a
// different casing of a name it needs to be a conflict.
name := strings.ToLower(id.Name)
existing := i.views.Lookup(name, func() streamID { return id })
existing := i.views.Lookup(name, func() instID { return id })
if id == existing {
return
}
Expand All @@ -363,31 +365,21 @@ func (i *inserter[N]) logConflict(id streamID) {
"duplicate metric stream definitions",
"names", fmt.Sprintf("%q, %q", existing.Name, id.Name),
"descriptions", fmt.Sprintf("%q, %q", existing.Description, id.Description),
"kinds", fmt.Sprintf("%s, %s", existing.Kind, id.Kind),
"units", fmt.Sprintf("%s, %s", existing.Unit, id.Unit),
"numbers", fmt.Sprintf("%s, %s", existing.Number, id.Number),
"aggregations", fmt.Sprintf("%s, %s", existing.Aggregation, id.Aggregation),
"monotonics", fmt.Sprintf("%t, %t", existing.Monotonic, id.Monotonic),
"temporalities", fmt.Sprintf("%s, %s", existing.Temporality.String(), id.Temporality.String()),
)
}

func (i *inserter[N]) streamID(kind InstrumentKind, stream Stream) streamID {
func (i *inserter[N]) instID(kind InstrumentKind, stream Stream) instID {
var zero N
id := streamID{
return instID{
Name: stream.Name,
Description: stream.Description,
Unit: stream.Unit,
Aggregation: fmt.Sprintf("%T", stream.Aggregation),
Temporality: i.pipeline.reader.temporality(kind),
Kind: kind,
Number: fmt.Sprintf("%T", zero),
}

switch kind {
case InstrumentKindObservableCounter, InstrumentKindCounter, InstrumentKindHistogram:
id.Monotonic = true
}

return id
}

// aggregateFunc returns new aggregate functions matching agg, kind, and
Expand Down Expand Up @@ -529,7 +521,7 @@ type resolver[N int64 | float64] struct {
inserters []*inserter[N]
}

func newResolver[N int64 | float64](p pipelines, vc *cache[string, streamID]) resolver[N] {
func newResolver[N int64 | float64](p pipelines, vc *cache[string, instID]) resolver[N] {
in := make([]*inserter[N], len(p))
for i := range in {
in[i] = newInserter[N](p[i], vc)
Expand Down
14 changes: 7 additions & 7 deletions sdk/metric/pipeline_registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ func testCreateAggregators[N int64 | float64](t *testing.T) {
}
for _, tt := range testcases {
t.Run(tt.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
p := newPipeline(nil, tt.reader, tt.views)
i := newInserter[N](p, &c)
input, err := i.Instrument(tt.inst)
Expand All @@ -371,7 +371,7 @@ func TestCreateAggregators(t *testing.T) {
}

func testInvalidInstrumentShouldPanic[N int64 | float64]() {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](newPipeline(nil, NewManualReader(), []View{defaultView}), &c)
inst := Instrument{
Name: "foo",
Expand All @@ -391,7 +391,7 @@ func TestPipelinesAggregatorForEachReader(t *testing.T) {
require.Len(t, pipes, 2, "created pipelines")

inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](pipes, &c)
aggs, err := r.Aggregators(inst)
require.NoError(t, err, "resolved Aggregators error")
Expand Down Expand Up @@ -468,7 +468,7 @@ func TestPipelineRegistryCreateAggregators(t *testing.T) {

func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[int64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -478,7 +478,7 @@ func testPipelineRegistryResolveIntAggregators(t *testing.T, p pipelines, wantCo

func testPipelineRegistryResolveFloatAggregators(t *testing.T, p pipelines, wantCount int) {
inst := Instrument{Name: "foo", Kind: InstrumentKindCounter}
var c cache[string, streamID]
var c cache[string, instID]
r := newResolver[float64](p, &c)
aggs, err := r.Aggregators(inst)
assert.NoError(t, err)
Expand All @@ -505,7 +505,7 @@ func TestPipelineRegistryCreateAggregatorsIncompatibleInstrument(t *testing.T) {
p := newPipelines(resource.Empty(), readers, views)
inst := Instrument{Name: "foo", Kind: InstrumentKindObservableGauge}

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(inst)
assert.Error(t, err)
Expand Down Expand Up @@ -556,7 +556,7 @@ func TestResolveAggregatorsDuplicateErrors(t *testing.T) {

p := newPipelines(resource.Empty(), readers, views)

var vc cache[string, streamID]
var vc cache[string, instID]
ri := newResolver[int64](p, &vc)
intAggs, err := ri.Aggregators(fooInst)
assert.NoError(t, err)
Expand Down
10 changes: 5 additions & 5 deletions sdk/metric/pipeline_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ func testDefaultViewImplicit[N int64 | float64]() func(t *testing.T) {

for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
var c cache[string, streamID]
var c cache[string, instID]
i := newInserter[N](test.pipe, &c)
got, err := i.Instrument(inst)
require.NoError(t, err)
Expand Down Expand Up @@ -215,15 +215,15 @@ func TestLogConflictName(t *testing.T) {
}(stdr.New(log.New(os.Stderr, "", log.LstdFlags|log.Lshortfile))))

for _, tc := range testcases {
var vc cache[string, streamID]
var vc cache[string, instID]

name := strings.ToLower(tc.existing)
_ = vc.Lookup(name, func() streamID {
return streamID{Name: tc.existing}
_ = vc.Lookup(name, func() instID {
return instID{Name: tc.existing}
})

i := newInserter[int64](newPipeline(nil, nil, nil), &vc)
i.logConflict(streamID{Name: tc.name})
i.logConflict(instID{Name: tc.name})

if tc.conflict {
assert.Containsf(
Expand Down

0 comments on commit 91369f3

Please sign in to comment.