From b4611c8a6dca510dc1e87f5c793d31535550c812 Mon Sep 17 00:00:00 2001
From: Tigran Najaryan
Date: Thu, 17 Oct 2024 17:42:02 -0400
Subject: [PATCH] Support multiple entity refs in the Resource

---
 attribute/set.go                              |  10 +
 .../internal/tracetransform/resource.go       |  18 +-
 go.sum                                        |   1 +
 sdk/resource/builtin.go                       |  11 +-
 sdk/resource/config.go                        |  34 +-
 sdk/resource/entity.go                        |  17 +
 sdk/resource/entityref.go                     |  48 ++
 sdk/resource/internal/entity.go               |  62 ---
 sdk/resource/resource.go                      | 405 ++++++++++++++----
 9 files changed, 436 insertions(+), 170 deletions(-)
 create mode 100644 sdk/resource/entity.go
 create mode 100644 sdk/resource/entityref.go
 delete mode 100644 sdk/resource/internal/entity.go

diff --git a/attribute/set.go b/attribute/set.go
index 7cb3e720dde..5e181e15134 100644
--- a/attribute/set.go
+++ b/attribute/set.go
@@ -307,6 +307,16 @@ func filteredToFront(slice []KeyValue, keep Filter) int {
 	return j
 }
 
+// Clone returns a Set with the same attributes as l, using a Filter that keeps everything.
+func (l *Set) Clone() Set {
+	s, _ := l.Filter(
+		func(KeyValue) bool {
+			return true
+		},
+	)
+	return s
+}
+
 // Filter returns a filtered copy of this Set. See the documentation for
 // NewSetWithSortableFiltered for more details.
 func (l *Set) Filter(re Filter) (Set, []KeyValue) {
diff --git a/exporters/otlp/otlptrace/internal/tracetransform/resource.go b/exporters/otlp/otlptrace/internal/tracetransform/resource.go
index 21c15de7174..343df01a83d 100644
--- a/exporters/otlp/otlptrace/internal/tracetransform/resource.go
+++ b/exporters/otlp/otlptrace/internal/tracetransform/resource.go
@@ -27,11 +27,21 @@ func Resource(r *resource.Resource) *resourcepb.Resource {
 	}
 
 	attrs := Iterator(r.Iter())
-	entityId := Iterator(r.EntityId().Iter())
 
-	return &resourcepb.Resource{
+	out := &resourcepb.Resource{
 		Attributes: attrs,
-		EntityType: r.EntityType(),
-		EntityId:   entityId,
 	}
+
+	for _, entity := range r.EntityRefs() {
+		out.Entities = append(
+			out.Entities, &resourcepb.ResourceEntityRef{
+				SchemaUrl:     entity.SchemaUrl,
+				Type:          entity.Type,
+				IdAttrKeys:    entity.IdKeys,
+				DescrAttrKeys: entity.AttrsKeys,
+			},
+		)
+	}
+
+	return out
 }
diff --git a/go.sum b/go.sum
index 10051f76c68..96fd3ecbc2e 100644
--- a/go.sum
+++ b/go.sum
@@ -9,6 +9,7 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
 github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
+github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
 github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk=
 github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo=
 gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
diff --git a/sdk/resource/builtin.go b/sdk/resource/builtin.go
index 94f93abcb43..9eb5e5f9d1d 100644
--- a/sdk/resource/builtin.go
+++ b/sdk/resource/builtin.go
@@ -22,7 +22,6 @@ import (
 
 	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/sdk"
-	"go.opentelemetry.io/otel/sdk/resource/internal"
 	semconv "go.opentelemetry.io/otel/semconv/v1.24.0"
 )
 
@@ -100,12 +99,12 @@ func (sd stringDetector) Detect(ctx context.Context) (*Resource, error) {
 		return nil, fmt.Errorf("invalid attribute: %q -> %q", a.Key, a.Value.Emit())
 	}
 	id := attribute.NewSet(sd.K.String(value))
-	entity := internal.EntityData{
-		Type:  sd.entityType,
-		Id:    id,
-		Attrs: id,
+	entity := Entity{
+		Type:      sd.entityType,
+		Id:        id,
+		SchemaURL: sd.schemaURL,
 	}
-	return NewWithEntity(sd.schemaURL, &entity), nil
+	return NewWithEntities([]Entity{entity})
 }
 
 // Detect implements Detector.
diff --git a/sdk/resource/config.go b/sdk/resource/config.go
index 613809f42d2..92ea072450c 100644
--- a/sdk/resource/config.go
+++ b/sdk/resource/config.go
@@ -27,8 +27,8 @@ type config struct {
 	// SchemaURL to associate with the Resource.
 	schemaURL string
 
-	entityType string
-	entityId   []attribute.KeyValue
+	//entityType string
+	//entityId   []attribute.KeyValue
 }
 
 // Option is the interface that applies a configuration option.
@@ -96,21 +96,21 @@ func (o schemaURLOption) apply(cfg config) config {
 	return cfg
 }
 
-// WithEntity sets the schema URL for the configured resource.
-func WithEntity(entityType string, entityId ...attribute.KeyValue) Option {
-	return entityOption{entityType, entityId}
-}
-
-type entityOption struct {
-	entityType string
-	entityId   []attribute.KeyValue
-}
-
-func (o entityOption) apply(cfg config) config {
-	cfg.entityType = o.entityType
-	cfg.entityId = o.entityId
-	return cfg
-}
+//// WithEntity sets the schema URL for the configured resource.
+//func WithEntity(entityType string, entityId ...attribute.KeyValue) Option {
+//	return entityOption{entityType, entityId}
+//}
+//
+//type entityOption struct {
+//	entityType string
+//	entityId   []attribute.KeyValue
+//}
+//
+//func (o entityOption) apply(cfg config) config {
+//	cfg.entityType = o.entityType
+//	cfg.entityId = o.entityId
+//	return cfg
+//}
 
 // WithOS adds all the OS attributes to the configured Resource.
 // See individual WithOS* functions to configure specific attributes.
diff --git a/sdk/resource/entity.go b/sdk/resource/entity.go
new file mode 100644
index 00000000000..bd84d4e8f02
--- /dev/null
+++ b/sdk/resource/entity.go
@@ -0,0 +1,17 @@
+package resource
+
+import "go.opentelemetry.io/otel/attribute"
+
+// Entity describes a single entity that a Resource can be built from; see
+// NewWithEntities.
+type Entity struct {
+	// Type is the entity type, e.g. "service" or "k8s.pod".
+	Type string
+	// Id holds the attributes that identify the entity.
+	Id attribute.Set
+	// Attrs holds the attributes that describe the entity. NewWithEntities
+	// rejects an Entity that uses the same key in both Id and Attrs.
+	Attrs attribute.Set
+	// SchemaURL is the schema URL associated with the entity.
+	SchemaURL string
+}
diff --git a/sdk/resource/entityref.go b/sdk/resource/entityref.go
new file mode 100644
index 00000000000..624d123bad4
--- /dev/null
+++ b/sdk/resource/entityref.go
@@ -0,0 +1,48 @@
+package resource
+
+import (
+	"sort"
+
+	"go.opentelemetry.io/otel/attribute"
+)
+
+// resourceEntityRef references an entity by the keys of the Resource
+// attributes that identify and describe it.
+type resourceEntityRef struct {
+	SchemaUrl string
+
+	// Defines the entity type, e.g. "service", "k8s.pod", etc.
+	Type string
+
+	// Set of Resource attribute keys that identify the entity.
+	Id map[attribute.Key]bool
+
+	// Set of Resource attribute keys that describe the entity.
+	Attrs map[attribute.Key]bool
+
+	// Id and Attrs materialized as slices for faster exporting.
+	IdKeys    []string
+	AttrsKeys []string
+}
+
+// materialize rebuilds IdKeys and AttrsKeys from Id and Attrs. The slices are
+// replaced rather than appended to, so calling it again does not duplicate
+// keys, and the keys are sorted so the exported order is deterministic.
+func (r *resourceEntityRef) materialize() {
+	r.IdKeys = sortedEntityKeys(r.Id)
+	r.AttrsKeys = sortedEntityKeys(r.Attrs)
+}
+
+// sortedEntityKeys returns the keys of m as strings in ascending order, or nil
+// when m is empty.
+func sortedEntityKeys(m map[attribute.Key]bool) []string {
+	if len(m) == 0 {
+		return nil
+	}
+	keys := make([]string, 0, len(m))
+	for k := range m {
+		keys = append(keys, string(k))
+	}
+	sort.Strings(keys)
+	return keys
+}
diff --git a/sdk/resource/internal/entity.go b/sdk/resource/internal/entity.go
deleted file mode 100644
index ec8d29041b5..00000000000
--- a/sdk/resource/internal/entity.go
+++ /dev/null
@@ -1,62 +0,0 @@
-package internal
-
-import "go.opentelemetry.io/otel/attribute"
-
-type EntityData struct {
-	// Defines the producing entity type of this resource, e.g "service", "k8s.pod", etc.
-	// Empty for legacy Resources that are not entity-aware.
-	Type string
-	// Set of attributes that identify the entity.
-	// Note that a copy of identifying attributes will be also recorded in the Attrs field.
-	Id attribute.Set
-
-	Attrs attribute.Set
-}
-
-func MergeEntities(a, b *EntityData) *EntityData {
-	// Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key()
-	// Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...)
-	mergedAttrs := mergeAttrs(&b.Attrs, &a.Attrs)
-
-	var mergedType string
-	var mergedId attribute.Set
-
-	if a.Type == b.Type {
-		mergedType = a.Type
-		mergedId = mergeAttrs(&b.Id, &a.Id)
-	} else {
-		if a.Type == "" {
-			mergedType = b.Type
-			mergedId = b.Id
-		} else if b.Type == "" {
-			mergedType = a.Type
-			mergedId = a.Id
-		} else {
-			// Different non-empty entities.
-			mergedId = a.Id
-			// TODO: merge the id of the updating Entity into the non-identifying
-			// attributes of the old Resource, attributes from the updating Entity
-			// take precedence.
-			panic("not implemented")
-		}
-	}
-
-	return &EntityData{
-		Type:  mergedType,
-		Id:    mergedId,
-		Attrs: mergedAttrs,
-	}
-}
-
-func mergeAttrs(a, b *attribute.Set) attribute.Set {
-	if a.Len()+b.Len() == 0 {
-		return *attribute.EmptySet()
-	}
-
-	mi := attribute.NewMergeIterator(a, b)
-	combine := make([]attribute.KeyValue, 0, a.Len()+b.Len())
-	for mi.Next() {
-		combine = append(combine, mi.Attribute())
-	}
-	return attribute.NewSet(combine...)
-}
diff --git a/sdk/resource/resource.go b/sdk/resource/resource.go
index c3e3e8ac209..b7fd2f94f6d 100644
--- a/sdk/resource/resource.go
+++ b/sdk/resource/resource.go
@@ -18,11 +18,11 @@ import (
 	"context"
 	"encoding/json"
 	"errors"
+	"fmt"
 	"sync"
 
 	"go.opentelemetry.io/otel"
 	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/sdk/resource/internal"
 )
 
 // Resource describes an entity about which identifying information
@@ -33,10 +33,10 @@ import (
 // (`*resource.Resource`). The `nil` value is equivalent to an empty
 // Resource.
 type Resource struct {
-	schemaURL string
-
-	// Producing entity.
-	entity internal.EntityData
+	attrs      map[attribute.Key]attribute.Value
+	attrSet    attribute.Set
+	schemaURL  string
+	entityRefs []resourceEntityRef
 }
 
 var (
@@ -53,16 +53,7 @@ func New(ctx context.Context, opts ...Option) (*Resource, error) {
 		cfg = opt.apply(cfg)
 	}
 
-	entityId, _ := attribute.NewSetWithFiltered(
-		cfg.entityId, func(kv attribute.KeyValue) bool {
-			return kv.Valid()
-		},
-	)
-
-	r := &Resource{
-		schemaURL: cfg.schemaURL,
-		entity:    internal.EntityData{Type: cfg.entityType, Id: entityId},
-	}
+	r := &Resource{schemaURL: cfg.schemaURL}
 	return r, detect(ctx, r, cfg.detectors)
 }
 
@@ -76,28 +67,57 @@ func NewWithAttributes(schemaURL string, attrs ...attribute.KeyValue) *Resource
 	return resource
 }
 
-// NewWithEntity creates a resource from entity and attrs and associates the resource with a
-// schema URL. If attrs or entityId contains duplicate keys, the last value will be used. If attrs or entityId
-// contains any invalid items those items will be dropped. The attrs and entityId are assumed to be
-// in a schema identified by schemaURL.
-func NewWithEntity(
-	schemaURL string, entity *internal.EntityData,
-) *Resource {
-	resource := NewSchemaless(entity.Attrs.ToSlice()...)
-	resource.schemaURL = schemaURL
-	resource.entity = *entity
-	//resource.entity.Type = entityType
-
-	// Ensure attributes comply with the specification:
-	// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute
-	//id, _ := attribute.NewSetWithFiltered(
-	//	entityId, func(kv attribute.KeyValue) bool {
-	//		return kv.Valid()
-	//	},
-	//)
-	//
-	//resource.entity.Id = id
-	return resource
+// NewWithEntities creates a Resource from entities. Every Entity contributes
+// its valid Id and Attrs attributes, its SchemaURL and one entity ref; the
+// contributions are combined in order with Merge. It returns an error if an
+// Entity uses a key in both Id and Attrs or if Merge fails.
+func NewWithEntities(
+	entities []Entity,
+) (*Resource, error) {
+	resource := &Resource{}
+
+	for _, entity := range entities {
+		entityRef := resourceEntityRef{
+			Type:      entity.Type,
+			Id:        map[attribute.Key]bool{},
+			Attrs:     map[attribute.Key]bool{},
+			SchemaUrl: entity.SchemaURL,
+		}
+		b := &Resource{
+			schemaURL:  entity.SchemaURL,
+			attrs:      map[attribute.Key]attribute.Value{},
+			entityRefs: []resourceEntityRef{entityRef},
+		}
+		ids := entity.Id.Iter()
+		for ids.Next() {
+			attr := ids.Attribute()
+			if !attr.Valid() {
+				continue
+			}
+			entityRef.Id[attr.Key] = true
+			b.attrs[attr.Key] = attr.Value
+		}
+		attrs := entity.Attrs.Iter()
+		for attrs.Next() {
+			attr := attrs.Attribute()
+			if !attr.Valid() {
+				continue
+			}
+			if _, exists := b.attrs[attr.Key]; exists {
+				return nil, fmt.Errorf("invalid Entity, key %q is both an Id and Attr", attr.Key)
+			}
+			entityRef.Attrs[attr.Key] = true
+			b.attrs[attr.Key] = attr.Value
+		}
+
+		var err error
+		resource, err = Merge(resource, b)
+		if err != nil {
+			return nil, err
+		}
+	}
+
+	return resource, nil
 }
 
 // NewSchemaless creates a resource from attrs. If attrs contains duplicate keys,
@@ -109,20 +129,42 @@ func NewSchemaless(attrs ...attribute.KeyValue) *Resource {
 		return &Resource{}
 	}
 
-	// Ensure attributes comply with the specification:
-	// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute
-	s, _ := attribute.NewSetWithFiltered(
-		attrs, func(kv attribute.KeyValue) bool {
-			return kv.Valid()
-		},
-	)
+	m := map[attribute.Key]attribute.Value{}
+	for _, attr := range attrs {
+		// Ensure attributes comply with the specification:
+		// https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute
+		if attr.Valid() {
+			m[attr.Key] = attr.Value
+		}
+	}
 
 	// If attrs only contains invalid entries do not allocate a new resource.
-	if s.Len() == 0 {
+	if len(m) == 0 {
 		return &Resource{}
 	}
 
-	return &Resource{entity: internal.EntityData{Id: attribute.NewSet(), Attrs: s}} //nolint
+	r := &Resource{attrs: m}
+	r.materializeAttrs()
+	return r
+}
+
+// mapAttrsToSlice returns the entries of m as key-values in unspecified (map iteration) order.
+func mapAttrsToSlice(m map[attribute.Key]attribute.Value) []attribute.KeyValue {
+	var kv []attribute.KeyValue
+	for k, v := range m {
+		kv = append(
+			kv, attribute.KeyValue{
+				Key:   k,
+				Value: v,
+			},
+		)
+	}
+	return kv
+}
+
+// mapAttrsToSet returns the entries of m as an attribute.Set.
+func mapAttrsToSet(m map[attribute.Key]attribute.Value) attribute.Set {
+	return attribute.NewSet(mapAttrsToSlice(m)...)
 }
 
 // String implements the Stringer interface and provides a
@@ -134,7 +176,8 @@ func (r *Resource) String() string {
 	if r == nil {
 		return ""
 	}
-	return r.entity.Attrs.Encoded(attribute.DefaultEncoder())
+	s := mapAttrsToSet(r.attrs)
+	return s.Encoded(attribute.DefaultEncoder())
 }
 
 // MarshalLog is the marshaling function used by the logging system to represent this Resource.
@@ -143,7 +184,7 @@ func (r *Resource) MarshalLog() interface{} {
 		Attributes attribute.Set
 		SchemaURL  string
 	}{
-		Attributes: r.entity.Attrs,
+		Attributes: r.attrSet, // built from r.attrs by materializeAttrs; no need to rebuild it per call
 		SchemaURL:  r.schemaURL,
 	}
 }
@@ -154,7 +195,7 @@ func (r *Resource) Attributes() []attribute.KeyValue {
 	if r == nil {
 		r = Empty()
 	}
-	return r.entity.Attrs.ToSlice()
+	return r.attrSet.ToSlice() // Set order, as before this change; ranging over r.attrs is random
 }
 
 // SchemaURL returns the schema URL associated with Resource r.
@@ -165,27 +206,13 @@ func (r *Resource) SchemaURL() string {
 	return r.schemaURL
 }
 
-func (r *Resource) EntityId() *attribute.Set {
-	if r == nil {
-		return attribute.EmptySet()
-	}
-	return &r.entity.Id
-}
-
-func (r *Resource) EntityType() string {
-	if r == nil {
-		return ""
-	}
-	return r.entity.Type
-}
-
 // Iter returns an iterator of the Resource attributes.
 // This is ideal to use if you do not want a copy of the attributes.
 func (r *Resource) Iter() attribute.Iterator {
 	if r == nil {
 		r = Empty()
 	}
-	return r.entity.Attrs.Iter()
+	return r.attrSet.Iter()
 }
 
 // Equal returns true when a Resource is equivalent to this Resource.
@@ -233,12 +260,113 @@ func Merge(a, b *Resource) (*Resource, error) {
 		return Empty(), errMergeConflictSchemaURL
 	}
 
-	mergedEntity := internal.MergeEntities(&a.entity, &b.entity)
-	merged := NewWithEntity(schemaURL, mergedEntity)
+	merged := &Resource{
+		attrs:      cloneAttrs(a.attrs),
+		schemaURL:  schemaURL,
+		entityRefs: make([]resourceEntityRef, 0, len(a.entityRefs)+len(b.entityRefs)),
+	}
+
+	// Deep-copy a's entity refs: merging mutates their key sets in place and
+	// the inputs must not be modified.
+	entityTypes := make(map[string]resourceEntityRef, len(a.entityRefs))
+	for _, er := range a.entityRefs {
+		merged.entityRefs = append(merged.entityRefs, cloneEntityRef(er))
+		entityTypes[er.Type] = er
+	}
+
+	// Reconcile entities before b's attributes are applied wholesale, so that
+	// mergeEntity compares the existing entity Id against b's rather than b's
+	// against itself.
+	for _, er := range b.entityRefs {
+		existingEr, exists := entityTypes[er.Type]
+		if !exists {
+			merged.entityRefs = append(merged.entityRefs, cloneEntityRef(er))
+			entityTypes[er.Type] = er
+			// Make the new entity's attributes resolvable in merged right away.
+			for k := range er.Id {
+				merged.attrs[k] = b.attrs[k]
+			}
+			for k := range er.Attrs {
+				merged.attrs[k] = b.attrs[k]
+			}
+			continue
+		}
+		if err := mergeEntity(merged, existingEr, b, er); err != nil {
+			// Match the schema URL conflict above: never return a nil Resource.
+			return Empty(), err
+		}
+	}
+
+	// b's attributes overwrite a's (last value wins).
+	for k, v := range b.attrs {
+		merged.attrs[k] = v
+	}
+
+	merged.materializeAttrs()
 
 	return merged, nil
 }
 
+// cloneAttrs returns a copy of attrs that shares no storage with it.
+func cloneAttrs(attrs map[attribute.Key]attribute.Value) map[attribute.Key]attribute.Value {
+	m := make(map[attribute.Key]attribute.Value, len(attrs))
+	for k, v := range attrs {
+		m[k] = v
+	}
+	return m
+}
+
+// cloneKeySet returns a non-nil copy of keys that shares no storage with it.
+func cloneKeySet(keys map[attribute.Key]bool) map[attribute.Key]bool {
+	out := make(map[attribute.Key]bool, len(keys))
+	for k, v := range keys {
+		out[k] = v
+	}
+	return out
+}
+
+// cloneEntityRef returns a copy of er with its own Id and Attrs key sets. The
+// materialized IdKeys and AttrsKeys are left empty for materializeAttrs to
+// rebuild.
+func cloneEntityRef(er resourceEntityRef) resourceEntityRef {
+	return resourceEntityRef{
+		SchemaUrl: er.SchemaUrl,
+		Type:      er.Type,
+		Id:        cloneKeySet(er.Id),
+		Attrs:     cloneKeySet(er.Attrs),
+	}
+}
+
+// mergeEntity merges the entity fromEnt of fromRes into intoRes, where intoEnt
+// is the entity of the same type already present. If both have the same Id
+// and schema URL, the descriptive attributes of fromEnt are added to intoEnt;
+// otherwise intoEnt is replaced by fromEnt.
+func mergeEntity(
+	intoRes *Resource, intoEnt resourceEntityRef, fromRes *Resource, fromEnt resourceEntityRef,
+) error {
+	intoId, err := intoRes.getEntityId(intoEnt)
+	if err != nil {
+		return err
+	}
+
+	fromId, err := fromRes.getEntityId(fromEnt)
+	if err != nil {
+		return err
+	}
+
+	if intoId == fromId && intoEnt.SchemaUrl == fromEnt.SchemaUrl {
+		// Same entity: merge descriptive attributes.
+		attrs, err := fromRes.getEntityDescr(fromEnt)
+		if err != nil {
+			return err
+		}
+		return intoRes.overwriteEntityDescr(intoEnt, attrs)
+	}
+
+	// Id or schema URL differs: overwrite the entity.
+	return intoRes.overwriteEntity(intoEnt, fromRes, fromEnt)
+}
+
 // Empty returns an instance of Resource with no attributes. It is
 // equivalent to a `nil` Resource.
 func Empty() *Resource {
@@ -292,7 +420,7 @@ func (r *Resource) Set() *attribute.Set {
 	if r == nil {
 		r = Empty()
 	}
-	return &r.entity.Attrs
+	return &r.attrSet
 }
 
 // MarshalJSON encodes the resource attributes as a JSON list of { "Key":
@@ -302,23 +430,30 @@ func (r *Resource) MarshalJSON() ([]byte, error) {
 		r = Empty()
 	}
 
+	type entityRef struct {
+		Type      string
+		Id        any
+		Attrs     any
+		SchemaURL string
+	}
+
 	rjson := struct {
 		Attributes any
 		SchemaURL  string
-		Entity     struct {
-			Type string
-			Id   any
-		}
+		EntityRefs []entityRef
 	}{
-		Attributes: r.entity.Attrs.MarshalableToJSON(),
+		Attributes: r.attrSet.MarshalableToJSON(),
 		SchemaURL:  r.schemaURL,
-		Entity: struct {
-			Type string
-			Id   any
-		}{
-			Type: r.entity.Type,
-			Id:   r.entity.Id.MarshalableToJSON(),
-		},
+	}
+	for _, er := range r.entityRefs {
+		rjson.EntityRefs = append(
+			rjson.EntityRefs, entityRef{
+				Type:      er.Type,
+				Id:        er.Id,
+				Attrs:     er.Attrs,
+				SchemaURL: er.SchemaUrl,
+			},
+		)
 	}
 
 	return json.Marshal(rjson)
@@ -329,7 +464,7 @@ func (r *Resource) Len() int {
 	if r == nil {
 		return 0
 	}
-	return r.entity.Attrs.Len()
+	return len(r.attrs)
 }
 
 // Encoded returns an encoded representation of the resource.
@@ -337,5 +472,149 @@ func (r *Resource) Encoded(enc attribute.Encoder) string {
 	if r == nil {
 		return ""
 	}
-	return r.entity.Attrs.Encoded(enc)
+	return r.attrSet.Encoded(enc)
+}
+
+// getEntityId returns the identifying attributes of entity as found in r.
+func (r *Resource) getEntityId(entity resourceEntityRef) (attribute.Set, error) {
+	return r.getAttrsByKeys(entity.Id)
+}
+
+// getEntityDescr returns the descriptive attributes of entity as found in r.
+func (r *Resource) getEntityDescr(entity resourceEntityRef) (attribute.Set, error) {
+	return r.getAttrsByKeys(entity.Attrs)
+}
+
+// getAttrsByKeys returns the attributes of r stored under keys. It returns an
+// error if any of the keys is missing from r.
+func (r *Resource) getAttrsByKeys(keys map[attribute.Key]bool) (attribute.Set, error) {
+	kvs := make([]attribute.KeyValue, 0, len(keys))
+	for key := range keys {
+		val, exists := r.attrs[key]
+		if !exists {
+			return attribute.NewSet(), fmt.Errorf(
+				"invalid resourceEntityRef, key %s not found in Resource attrs", key,
+			)
+		}
+		kvs = append(kvs, attribute.KeyValue{Key: key, Value: val})
+	}
+	return attribute.NewSet(kvs...), nil
+}
+
+// overwriteEntityDescr stores attrs in r and records their keys as descriptive
+// attributes of entity. It returns an error if entity is not part of r.
+func (r *Resource) overwriteEntityDescr(entity resourceEntityRef, attrs attribute.Set) error {
+	idx := r.findEntity(entity)
+	if idx < 0 {
+		return errors.New("invalid resourceEntityRef")
+	}
+	updateEnt := &r.entityRefs[idx]
+
+	iter := attrs.Iter()
+	for iter.Next() {
+		attr := iter.Attribute()
+		r.attrs[attr.Key] = attr.Value
+		updateEnt.Attrs[attr.Key] = true
+	}
+	return nil
+}
+
+// findEntity returns the index in r.entityRefs of the entity that has the same
+// type and the same set of Id keys as entity, or -1 if there is none.
+func (r *Resource) findEntity(entity resourceEntityRef) int {
+	for i, e := range r.entityRefs {
+		if e.Type == entity.Type && equalEntityIdKeys(e.Id, entity.Id) {
+			return i
+		}
+	}
+	return -1
+}
+
+// overwriteEntity replaces the entity intoEnt of r by fromEnt, copying the
+// attributes of fromEnt from fromRes into r. It returns an error if intoEnt
+// is not part of r or fromRes lacks an attribute referenced by fromEnt.
+func (r *Resource) overwriteEntity(
+	intoEnt resourceEntityRef, fromRes *Resource, fromEnt resourceEntityRef,
+) error {
+	idx := r.findEntity(intoEnt)
+	if idx < 0 {
+		return errors.New("invalid resourceEntityRef")
+	}
+	updateEnt := &r.entityRefs[idx]
+
+	updateEnt.Type = fromEnt.Type
+	updateEnt.SchemaUrl = fromEnt.SchemaUrl
+
+	id, err := fromRes.getEntityId(fromEnt)
+	if err != nil {
+		return err
+	}
+
+	r.setEntityId(updateEnt, id)
+	attrs, err := fromRes.getEntityDescr(fromEnt)
+	if err != nil {
+		return err
+	}
+	r.setEntityDescr(updateEnt, attrs)
+
+	return nil
+}
+
+// setEntityId makes id the identifying attributes of ent and stores them in r.
+func (r *Resource) setEntityId(ent *resourceEntityRef, id attribute.Set) {
+	iter := id.Iter()
+	ent.Id = map[attribute.Key]bool{}
+	for iter.Next() {
+		attr := iter.Attribute()
+		ent.Id[attr.Key] = true
+		r.attrs[attr.Key] = attr.Value
+	}
+}
+
+// setEntityDescr makes attrs the descriptive attributes of ent and stores them
+// in r.
+func (r *Resource) setEntityDescr(ent *resourceEntityRef, attrs attribute.Set) {
+	iter := attrs.Iter()
+	// Reset Attrs, not Id: the identifying keys are managed by setEntityId.
+	ent.Attrs = map[attribute.Key]bool{}
+	for iter.Next() {
+		attr := iter.Attribute()
+		ent.Attrs[attr.Key] = true
+		r.attrs[attr.Key] = attr.Value
+	}
+}
+
+// materializeAttrs rebuilds the derived state of r: the attribute set and the
+// key slices of every entity ref.
+func (r *Resource) materializeAttrs() {
+	r.attrSet = mapAttrsToSet(r.attrs)
+	// Index the slice: ranging by value would materialize a throwaway copy.
+	for i := range r.entityRefs {
+		ref := &r.entityRefs[i]
+		// Drop previously materialized keys so that they are not duplicated.
+		ref.IdKeys, ref.AttrsKeys = nil, nil
+		ref.materialize()
+	}
+}
+
+// EntityRefs returns the entity refs of r, or nil if r is nil. The returned
+// slice is not a copy and must not be modified.
+func (r *Resource) EntityRefs() []resourceEntityRef {
+	if r == nil {
+		return nil
+	}
+	return r.entityRefs
+}
+
+// equalEntityIdKeys reports whether id1 and id2 have the same size and every key of id1 is set in id2.
+func equalEntityIdKeys(id1 map[attribute.Key]bool, id2 map[attribute.Key]bool) bool {
+	if len(id1) != len(id2) {
+		return false
+	}
+	for k := range id1 {
+		if !id2[k] {
+			return false
+		}
+	}
+	return true
 }