From 7f149b12e8ac01da00dd17efc7f676adc9ef165f Mon Sep 17 00:00:00 2001 From: Tigran Najaryan Date: Thu, 17 Oct 2024 17:42:02 -0400 Subject: [PATCH] Support multiple entity refs in the Resource --- attribute/set.go | 9 + example/dice/otel.go | 25 +- .../internal/tracetransform/resource.go | 18 +- go.sum | 1 + sdk/resource/builtin.go | 65 +-- sdk/resource/config.go | 40 +- sdk/resource/entity.go | 10 + sdk/resource/entityref.go | 49 ++ sdk/resource/host_id.go | 14 +- sdk/resource/internal/entity.go | 62 --- sdk/resource/process.go | 43 +- sdk/resource/resource.go | 423 ++++++++++++++---- 12 files changed, 551 insertions(+), 208 deletions(-) create mode 100644 sdk/resource/entity.go create mode 100644 sdk/resource/entityref.go delete mode 100644 sdk/resource/internal/entity.go diff --git a/attribute/set.go b/attribute/set.go index 7cb3e720dde..5e181e15134 100644 --- a/attribute/set.go +++ b/attribute/set.go @@ -307,6 +307,15 @@ func filteredToFront(slice []KeyValue, keep Filter) int { return j } +func (l *Set) Clone() Set { + s, _ := l.Filter( + func(KeyValue) bool { + return true + }, + ) + return s +} + // Filter returns a filtered copy of this Set. See the documentation for // NewSetWithSortableFiltered for more details. func (l *Set) Filter(re Filter) (Set, []KeyValue) { diff --git a/example/dice/otel.go b/example/dice/otel.go index 8185c1412f8..23f111ca45a 100644 --- a/example/dice/otel.go +++ b/example/dice/otel.go @@ -20,6 +20,7 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/exporters/stdout/stdoutmetric" "go.opentelemetry.io/otel/exporters/stdout/stdouttrace" "go.opentelemetry.io/otel/propagation" @@ -86,14 +87,28 @@ func setupOTelSDK(ctx context.Context, serviceName, serviceVersion string) ( } func newResource(serviceName, serviceVersion string) (*resource.Resource, error) { - return resource.Merge( - resource.Default(), - resource.NewWithAttributes( + res, err := resource.New( + context.Background(), + resource.WithHost(), + resource.WithHostID(), + resource.WithProcess(), + resource.WithEntity( semconv.SchemaURL, - semconv.ServiceName(serviceName), - semconv.ServiceVersion(serviceVersion), + "service", + []attribute.KeyValue{ + semconv.ServiceName(serviceName), + semconv.ServiceVersion(serviceVersion), + }, + nil, ), ) + if err != nil { + return nil, err + } + return resource.Merge( + resource.Default(), + res, + ) } func newPropagator() propagation.TextMapPropagator { diff --git a/exporters/otlp/otlptrace/internal/tracetransform/resource.go b/exporters/otlp/otlptrace/internal/tracetransform/resource.go index 21c15de7174..7513afc93b0 100644 --- a/exporters/otlp/otlptrace/internal/tracetransform/resource.go +++ b/exporters/otlp/otlptrace/internal/tracetransform/resource.go @@ -27,11 +27,21 @@ func Resource(r *resource.Resource) *resourcepb.Resource { } attrs := Iterator(r.Iter()) - entityId := Iterator(r.EntityId().Iter()) - return &resourcepb.Resource{ + out := &resourcepb.Resource{ Attributes: attrs, - EntityType: r.EntityType(), - EntityId: entityId, } + + for _, entity := range r.EntityRefs() { + out.Entities = append( + out.Entities, &resourcepb.ResourceEntityRef{ + SchemaUrl: entity.SchemaUrl(), + Type: entity.Type(), + IdAttrKeys: entity.Id(), + DescrAttrKeys: entity.Attrs(), + }, + ) + } + + return out } diff --git a/go.sum b/go.sum index 10051f76c68..96fd3ecbc2e 100644 --- a/go.sum +++ b/go.sum @@ -9,6 +9,7 @@ github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/sdk/resource/builtin.go b/sdk/resource/builtin.go index 94f93abcb43..502a3d8d67a 100644 --- a/sdk/resource/builtin.go +++ b/sdk/resource/builtin.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/sdk" - "go.opentelemetry.io/otel/sdk/resource/internal" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) @@ -42,10 +41,9 @@ type ( host struct{} stringDetector struct { - schemaURL string - K attribute.Key - F func() (string, error) - entityType string + schemaURL string + K attribute.Key + F func() (string, error) } defaultServiceNameDetector struct{} @@ -70,7 +68,19 @@ func (telemetrySDK) Detect(context.Context) (*Resource, error) { // Detect returns a *Resource that describes the host being run on. func (host) Detect(ctx context.Context) (*Resource, error) { - return StringDetector(semconv.SchemaURL, semconv.HostNameKey, os.Hostname).Detect(ctx) + name, err := os.Hostname() + if err != nil { + return nil, err + } + return NewWithEntities( + []Entity{ + { + Type: "host", + Attrs: attribute.NewSet(semconv.HostName(name)), + SchemaURL: semconv.SchemaURL, + }, + }, + ) } // StringDetector returns a Detector that will produce a *Resource @@ -80,14 +90,6 @@ func StringDetector(schemaURL string, k attribute.Key, f func() (string, error)) return stringDetector{schemaURL: schemaURL, K: k, F: f} } -// StringDetectorWithEntity returns a Detector that will produce a *Resource -// containing the string as a value corresponding to k. The Id of entity of the -// resource will also be set to the same key/value pair. -// The resulting Resource will have the specified schemaURL. -func StringDetectorWithEntity(schemaURL string, entityType string, k attribute.Key, f func() (string, error)) Detector { - return stringDetector{schemaURL: schemaURL, K: k, F: f, entityType: entityType} -} - // Detect returns a *Resource that describes the string as a value // corresponding to attribute.Key as well as the specific schemaURL. func (sd stringDetector) Detect(ctx context.Context) (*Resource, error) { @@ -99,27 +101,26 @@ func (sd stringDetector) Detect(ctx context.Context) (*Resource, error) { if !a.Valid() { return nil, fmt.Errorf("invalid attribute: %q -> %q", a.Key, a.Value.Emit()) } - id := attribute.NewSet(sd.K.String(value)) - entity := internal.EntityData{ - Type: sd.entityType, - Id: id, - Attrs: id, - } - return NewWithEntity(sd.schemaURL, &entity), nil + return NewWithAttributes(sd.schemaURL, sd.K.String(value)), nil } // Detect implements Detector. func (defaultServiceNameDetector) Detect(ctx context.Context) (*Resource, error) { - return StringDetectorWithEntity( - semconv.SchemaURL, - "service", - semconv.ServiceNameKey, - func() (string, error) { - executable, err := os.Executable() - if err != nil { - return "unknown_service:go", nil - } - return "unknown_service:" + filepath.Base(executable), nil + serviceName := "" + executable, err := os.Executable() + if err != nil { + serviceName = "unknown_service:go" + } else { + serviceName = "unknown_service:" + filepath.Base(executable) + } + + return NewWithEntities( + []Entity{ + { + Type: "service", + Id: attribute.NewSet(semconv.ServiceName(serviceName)), + SchemaURL: semconv.SchemaURL, + }, }, - ).Detect(ctx) + ) } diff --git a/sdk/resource/config.go b/sdk/resource/config.go index 613809f42d2..b9d0c1bf638 100644 --- a/sdk/resource/config.go +++ b/sdk/resource/config.go @@ -27,8 +27,8 @@ type config struct { // SchemaURL to associate with the Resource. schemaURL string - entityType string - entityId []attribute.KeyValue + //entityType string + //entityId []attribute.KeyValue } // Option is the interface that applies a configuration option. @@ -96,20 +96,36 @@ func (o schemaURLOption) apply(cfg config) config { return cfg } -// WithEntity sets the schema URL for the configured resource. -func WithEntity(entityType string, entityId ...attribute.KeyValue) Option { - return entityOption{entityType, entityId} +type entityDetector struct { + SchemaURL string + Type string + Id []attribute.KeyValue + Attrs []attribute.KeyValue } -type entityOption struct { - entityType string - entityId []attribute.KeyValue +func (e entityDetector) Detect(ctx context.Context) (*Resource, error) { + return NewWithEntities( + []Entity{ + { + Type: e.Type, + Id: attribute.NewSet(e.Id...), + Attrs: attribute.NewSet(e.Attrs...), + SchemaURL: e.SchemaURL, + }, + }, + ) } -func (o entityOption) apply(cfg config) config { - cfg.entityType = o.entityType - cfg.entityId = o.entityId - return cfg +// WithEntity sets the schema URL for the configured resource. +func WithEntity(schemaURL string, entityType string, id, attrs []attribute.KeyValue) Option { + return WithDetectors( + entityDetector{ + SchemaURL: schemaURL, + Type: entityType, + Id: id, + Attrs: attrs, + }, + ) } // WithOS adds all the OS attributes to the configured Resource. diff --git a/sdk/resource/entity.go b/sdk/resource/entity.go new file mode 100644 index 00000000000..bd84d4e8f02 --- /dev/null +++ b/sdk/resource/entity.go @@ -0,0 +1,10 @@ +package resource + +import "go.opentelemetry.io/otel/attribute" + +type Entity struct { + Type string + Id attribute.Set + Attrs attribute.Set + SchemaURL string +} diff --git a/sdk/resource/entityref.go b/sdk/resource/entityref.go new file mode 100644 index 00000000000..2b062c7147b --- /dev/null +++ b/sdk/resource/entityref.go @@ -0,0 +1,49 @@ +package resource + +import "go.opentelemetry.io/otel/attribute" + +type resourceEntityRef struct { + schemaUrl string + + // Defines the entity type, e.g "service", "k8s.pod", etc. + typ string + + // Set of Resource attribute keys that identify the entity. + id map[attribute.Key]bool + + // Set of Resource attribute keys that describe the entity. + attrs map[attribute.Key]bool + + // id and attrs are cached as slices for faster exporting. + idAsSlice []string + attrsAsSlice []string +} + +// updateCache must be called after id or attrs are modified to make +// sure the cache is up to date. +func (r *resourceEntityRef) updateCache() { + r.idAsSlice = r.idAsSlice[:0] + for k := range r.id { + r.idAsSlice = append(r.idAsSlice, string(k)) + } + r.attrsAsSlice = r.attrsAsSlice[:0] + for k := range r.attrs { + r.attrsAsSlice = append(r.attrsAsSlice, string(k)) + } +} + +func (r *resourceEntityRef) SchemaUrl() string { + return r.schemaUrl +} + +func (r *resourceEntityRef) Type() string { + return r.typ +} + +func (r *resourceEntityRef) Id() []string { + return r.idAsSlice +} + +func (r *resourceEntityRef) Attrs() []string { + return r.attrsAsSlice +} diff --git a/sdk/resource/host_id.go b/sdk/resource/host_id.go index f579329c2c6..512b516027e 100644 --- a/sdk/resource/host_id.go +++ b/sdk/resource/host_id.go @@ -19,6 +19,7 @@ import ( "errors" "strings" + "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) @@ -113,8 +114,13 @@ func (hostIDDetector) Detect(ctx context.Context) (*Resource, error) { return nil, err } - return NewWithAttributes( - semconv.SchemaURL, - semconv.HostID(hostID), - ), nil + return NewWithEntities( + []Entity{ + { + Type: "host", + Id: attribute.NewSet(semconv.HostID(hostID)), + SchemaURL: semconv.SchemaURL, + }, + }, + ) } diff --git a/sdk/resource/internal/entity.go b/sdk/resource/internal/entity.go deleted file mode 100644 index ec8d29041b5..00000000000 --- a/sdk/resource/internal/entity.go +++ /dev/null @@ -1,62 +0,0 @@ -package internal - -import "go.opentelemetry.io/otel/attribute" - -type EntityData struct { - // Defines the producing entity type of this resource, e.g "service", "k8s.pod", etc. - // Empty for legacy Resources that are not entity-aware. - Type string - // Set of attributes that identify the entity. - // Note that a copy of identifying attributes will be also recorded in the Attrs field. - Id attribute.Set - - Attrs attribute.Set -} - -func MergeEntities(a, b *EntityData) *EntityData { - // Note: 'b' attributes will overwrite 'a' with last-value-wins in attribute.Key() - // Meaning this is equivalent to: append(a.Attributes(), b.Attributes()...) - mergedAttrs := mergeAttrs(&b.Attrs, &a.Attrs) - - var mergedType string - var mergedId attribute.Set - - if a.Type == b.Type { - mergedType = a.Type - mergedId = mergeAttrs(&b.Id, &a.Id) - } else { - if a.Type == "" { - mergedType = b.Type - mergedId = b.Id - } else if b.Type == "" { - mergedType = a.Type - mergedId = a.Id - } else { - // Different non-empty entities. - mergedId = a.Id - // TODO: merge the id of the updating Entity into the non-identifying - // attributes of the old Resource, attributes from the updating Entity - // take precedence. - panic("not implemented") - } - } - - return &EntityData{ - Type: mergedType, - Id: mergedId, - Attrs: mergedAttrs, - } -} - -func mergeAttrs(a, b *attribute.Set) attribute.Set { - if a.Len()+b.Len() == 0 { - return *attribute.EmptySet() - } - - mi := attribute.NewMergeIterator(a, b) - combine := make([]attribute.KeyValue, 0, a.Len()+b.Len()) - for mi.Next() { - combine = append(combine, mi.Attribute()) - } - return attribute.NewSet(combine...) -} diff --git a/sdk/resource/process.go b/sdk/resource/process.go index 739ea4512ac..3f141f93c11 100644 --- a/sdk/resource/process.go +++ b/sdk/resource/process.go @@ -22,6 +22,7 @@ import ( "path/filepath" "runtime" + "go.opentelemetry.io/otel/attribute" semconv "go.opentelemetry.io/otel/semconv/v1.24.0" ) @@ -124,14 +125,34 @@ type ( // Detect returns a *Resource that describes the process identifier (PID) of the // executing process. func (processPIDDetector) Detect(ctx context.Context) (*Resource, error) { - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessPID(pid())), nil + return NewWithEntities( + []Entity{ + { + Type: "process", + Id: attribute.NewSet(semconv.ProcessPID(pid())), + SchemaURL: semconv.SchemaURL, + }, + }, + ) +} + +func newProcessWithDescr(attrs ...attribute.KeyValue) (*Resource, error) { + return NewWithEntities( + []Entity{ + { + Type: "process", + Attrs: attribute.NewSet(attrs...), + SchemaURL: semconv.SchemaURL, + }, + }, + ) } // Detect returns a *Resource that describes the name of the process executable. func (processExecutableNameDetector) Detect(ctx context.Context) (*Resource, error) { executableName := filepath.Base(commandArgs()[0]) - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessExecutableName(executableName)), nil + return newProcessWithDescr(semconv.ProcessExecutableName(executableName)) } // Detect returns a *Resource that describes the full path of the process executable. @@ -141,13 +162,13 @@ func (processExecutablePathDetector) Detect(ctx context.Context) (*Resource, err return nil, err } - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessExecutablePath(executablePath)), nil + return newProcessWithDescr(semconv.ProcessExecutablePath(executablePath)) } // Detect returns a *Resource that describes all the command arguments as received // by the process. func (processCommandArgsDetector) Detect(ctx context.Context) (*Resource, error) { - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessCommandArgs(commandArgs()...)), nil + return newProcessWithDescr(semconv.ProcessCommandArgs(commandArgs()...)) } // Detect returns a *Resource that describes the username of the user that owns the @@ -158,27 +179,25 @@ func (processOwnerDetector) Detect(ctx context.Context) (*Resource, error) { return nil, err } - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessOwner(owner.Username)), nil + return newProcessWithDescr(semconv.ProcessOwner(owner.Username)) } // Detect returns a *Resource that describes the name of the compiler used to compile // this process image. func (processRuntimeNameDetector) Detect(ctx context.Context) (*Resource, error) { - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessRuntimeName(runtimeName())), nil + return newProcessWithDescr(semconv.ProcessRuntimeName(runtimeName())) } // Detect returns a *Resource that describes the version of the runtime of this process. func (processRuntimeVersionDetector) Detect(ctx context.Context) (*Resource, error) { - return NewWithAttributes(semconv.SchemaURL, semconv.ProcessRuntimeVersion(runtimeVersion())), nil + return newProcessWithDescr(semconv.ProcessRuntimeVersion(runtimeVersion())) } // Detect returns a *Resource that describes the runtime of this process. func (processRuntimeDescriptionDetector) Detect(ctx context.Context) (*Resource, error) { runtimeDescription := fmt.Sprintf( - "go version %s %s/%s", runtimeVersion(), runtimeOS(), runtimeArch()) + "go version %s %s/%s", runtimeVersion(), runtimeOS(), runtimeArch(), + ) - return NewWithAttributes( - semconv.SchemaURL, - semconv.ProcessRuntimeDescription(runtimeDescription), - ), nil + return newProcessWithDescr(semconv.ProcessRuntimeDescription(runtimeDescription)) } diff --git a/sdk/resource/resource.go b/sdk/resource/resource.go index c3e3e8ac209..10a386c0993 100644 --- a/sdk/resource/resource.go +++ b/sdk/resource/resource.go @@ -18,11 +18,11 @@ import ( "context" "encoding/json" "errors" + "fmt" "sync" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/sdk/resource/internal" ) // Resource describes an entity about which identifying information @@ -33,10 +33,13 @@ import ( // (`*resource.Resource`). The `nil` value is equivalent to an empty // Resource. type Resource struct { - schemaURL string + attrs map[attribute.Key]attribute.Value - // Producing entity. - entity internal.EntityData + // attrSet is cached attribute.Set representation of attrs. + attrSet attribute.Set + + schemaURL string + entityRefs []resourceEntityRef } var ( @@ -53,16 +56,7 @@ func New(ctx context.Context, opts ...Option) (*Resource, error) { cfg = opt.apply(cfg) } - entityId, _ := attribute.NewSetWithFiltered( - cfg.entityId, func(kv attribute.KeyValue) bool { - return kv.Valid() - }, - ) - - r := &Resource{ - schemaURL: cfg.schemaURL, - entity: internal.EntityData{Type: cfg.entityType, Id: entityId}, - } + r := &Resource{schemaURL: cfg.schemaURL} return r, detect(ctx, r, cfg.detectors) } @@ -76,28 +70,59 @@ func NewWithAttributes(schemaURL string, attrs ...attribute.KeyValue) *Resource return resource } -// NewWithEntity creates a resource from entity and attrs and associates the resource with a +// NewWithEntities creates a resource from entity and attrs and associates the resource with a // schema URL. If attrs or entityId contains duplicate keys, the last value will be used. If attrs or entityId // contains any invalid items those items will be dropped. The attrs and entityId are assumed to be // in a schema identified by schemaURL. -func NewWithEntity( - schemaURL string, entity *internal.EntityData, -) *Resource { - resource := NewSchemaless(entity.Attrs.ToSlice()...) - resource.schemaURL = schemaURL - resource.entity = *entity - //resource.entity.Type = entityType - - // Ensure attributes comply with the specification: - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute - //id, _ := attribute.NewSetWithFiltered( - // entityId, func(kv attribute.KeyValue) bool { - // return kv.Valid() - // }, - //) - // - //resource.entity.Id = id - return resource +func NewWithEntities( + entities []Entity, +) (*Resource, error) { + resource := &Resource{} + + for _, entity := range entities { + b := &Resource{ + schemaURL: entity.SchemaURL, + attrs: map[attribute.Key]attribute.Value{}, + entityRefs: []resourceEntityRef{{}}, + } + + entityRef := &b.entityRefs[0] + entityRef.typ = entity.Type + entityRef.id = map[attribute.Key]bool{} + entityRef.attrs = map[attribute.Key]bool{} + entityRef.schemaUrl = entity.SchemaURL + + ids := entity.Id.Iter() + for ids.Next() { + attr := ids.Attribute() + if !attr.Valid() { + continue + } + entityRef.id[attr.Key] = true + b.attrs[attr.Key] = attr.Value + } + attrs := entity.Attrs.Iter() + for attrs.Next() { + attr := attrs.Attribute() + if !attr.Valid() { + continue + } + if _, exists := b.attrs[attr.Key]; exists { + return nil, fmt.Errorf("invalid Entity, key %q is both an id and Attr", attr.Key) + } + entityRef.attrs[attr.Key] = true + b.attrs[attr.Key] = attr.Value + } + entityRef.updateCache() + + var err error + resource, err = Merge(resource, b) + if err != nil { + return nil, err + } + } + + return resource, nil } // NewSchemaless creates a resource from attrs. If attrs contains duplicate keys, @@ -109,20 +134,40 @@ func NewSchemaless(attrs ...attribute.KeyValue) *Resource { return &Resource{} } - // Ensure attributes comply with the specification: - // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute - s, _ := attribute.NewSetWithFiltered( - attrs, func(kv attribute.KeyValue) bool { - return kv.Valid() - }, - ) + m := map[attribute.Key]attribute.Value{} + for _, attr := range attrs { + // Ensure attributes comply with the specification: + // https://github.com/open-telemetry/opentelemetry-specification/blob/v1.20.0/specification/common/README.md#attribute + if attr.Valid() { + m[attr.Key] = attr.Value + } + } // If attrs only contains invalid entries do not allocate a new resource. - if s.Len() == 0 { + if len(m) == 0 { return &Resource{} } - return &Resource{entity: internal.EntityData{Id: attribute.NewSet(), Attrs: s}} //nolint + r := &Resource{attrs: m} + r.updateCache() + return r +} + +func mapAttrsToSlice(m map[attribute.Key]attribute.Value) []attribute.KeyValue { + var kv []attribute.KeyValue + for k, v := range m { + kv = append( + kv, attribute.KeyValue{ + Key: k, + Value: v, + }, + ) + } + return kv +} + +func mapAttrsToSet(m map[attribute.Key]attribute.Value) attribute.Set { + return attribute.NewSet(mapAttrsToSlice(m)...) } // String implements the Stringer interface and provides a @@ -134,7 +179,8 @@ func (r *Resource) String() string { if r == nil { return "" } - return r.entity.Attrs.Encoded(attribute.DefaultEncoder()) + s := mapAttrsToSet(r.attrs) + return s.Encoded(attribute.DefaultEncoder()) } // MarshalLog is the marshaling function used by the logging system to represent this Resource. @@ -143,7 +189,7 @@ func (r *Resource) MarshalLog() interface{} { Attributes attribute.Set SchemaURL string }{ - Attributes: r.entity.Attrs, + Attributes: mapAttrsToSet(r.attrs), SchemaURL: r.schemaURL, } } @@ -154,7 +200,7 @@ func (r *Resource) Attributes() []attribute.KeyValue { if r == nil { r = Empty() } - return r.entity.Attrs.ToSlice() + return mapAttrsToSlice(r.attrs) } // SchemaURL returns the schema URL associated with Resource r. @@ -165,27 +211,13 @@ func (r *Resource) SchemaURL() string { return r.schemaURL } -func (r *Resource) EntityId() *attribute.Set { - if r == nil { - return attribute.EmptySet() - } - return &r.entity.Id -} - -func (r *Resource) EntityType() string { - if r == nil { - return "" - } - return r.entity.Type -} - // Iter returns an iterator of the Resource attributes. // This is ideal to use if you do not want a copy of the attributes. func (r *Resource) Iter() attribute.Iterator { if r == nil { r = Empty() } - return r.entity.Attrs.Iter() + return r.attrSet.Iter() } // Equal returns true when a Resource is equivalent to this Resource. @@ -233,12 +265,101 @@ func Merge(a, b *Resource) (*Resource, error) { return Empty(), errMergeConflictSchemaURL } - mergedEntity := internal.MergeEntities(&a.entity, &b.entity) - merged := NewWithEntity(schemaURL, mergedEntity) + merged := &Resource{ + attrs: cloneAttrs(a.attrs), + schemaURL: schemaURL, + entityRefs: make([]resourceEntityRef, len(a.entityRefs)), + } + for k, v := range b.attrs { + merged.attrs[k] = v + } + + copy(merged.entityRefs, a.entityRefs) + + entityTypes := map[string]resourceEntityRef{} + for _, er := range a.entityRefs { + entityTypes[er.typ] = er + } + + for _, er := range b.entityRefs { + if existingEr, exists := entityTypes[er.typ]; !exists { + merged.entityRefs = append(merged.entityRefs, er) + entityTypes[er.typ] = er + + for k := range er.id { + merged.attrs[k] = b.attrs[k] + } + for k := range er.attrs { + merged.attrs[k] = b.attrs[k] + } + + } else { + err := mergeEntity(merged, existingEr, b, er) + if err != nil { + return nil, err + } + } + } + + merged.updateCache() return merged, nil } +func cloneAttrs(attrs map[attribute.Key]attribute.Value) map[attribute.Key]attribute.Value { + m := map[attribute.Key]attribute.Value{} + for k, v := range attrs { + m[k] = v + } + return m +} + +func mergeEntity( + intoRes *Resource, intoEnt resourceEntityRef, fromRes *Resource, fromEnt resourceEntityRef, +) error { + intoId, err := intoRes.getEntityId(intoEnt) + if err != nil { + return err + } + + fromId, err := fromRes.getEntityId(fromEnt) + if err != nil { + return err + } + + if true || intoId == fromId { + // id is the same. + if intoEnt.schemaUrl == fromEnt.schemaUrl { + // SchemaURL is the same too. + // Merge descriptive attributes. + attrs, err := fromRes.getEntityDescr(fromEnt) + if err != nil { + return err + } + + err = intoRes.mergeEntity(intoEnt, fromId, attrs) + if err != nil { + return err + } + } else { + // SchemaURL is different. + // Overwrite entity + err = intoRes.overwriteEntity(intoEnt, fromRes, fromEnt) + if err != nil { + return err + } + } + } else { + // id is different + // Overwrite entity + err = intoRes.overwriteEntity(intoEnt, fromRes, fromEnt) + if err != nil { + return err + } + } + return nil +} + // Empty returns an instance of Resource with no attributes. It is // equivalent to a `nil` Resource. func Empty() *Resource { @@ -292,7 +413,7 @@ func (r *Resource) Set() *attribute.Set { if r == nil { r = Empty() } - return &r.entity.Attrs + return &r.attrSet } // MarshalJSON encodes the resource attributes as a JSON list of { "Key": @@ -302,23 +423,30 @@ func (r *Resource) MarshalJSON() ([]byte, error) { r = Empty() } + type entityRef struct { + Type string + Id any + Attrs any + SchemaURL string + } + rjson := struct { Attributes any SchemaURL string - Entity struct { - Type string - Id any - } + EntityRefs []entityRef }{ - Attributes: r.entity.Attrs.MarshalableToJSON(), + Attributes: r.attrSet.MarshalableToJSON(), SchemaURL: r.schemaURL, - Entity: struct { - Type string - Id any - }{ - Type: r.entity.Type, - Id: r.entity.Id.MarshalableToJSON(), - }, + } + for _, er := range r.entityRefs { + rjson.EntityRefs = append( + rjson.EntityRefs, entityRef{ + Type: er.typ, + Id: er.idAsSlice, + Attrs: er.attrsAsSlice, + SchemaURL: er.schemaUrl, + }, + ) } return json.Marshal(rjson) @@ -329,7 +457,7 @@ func (r *Resource) Len() int { if r == nil { return 0 } - return r.entity.Attrs.Len() + return len(r.attrs) } // Encoded returns an encoded representation of the resource. @@ -337,5 +465,146 @@ func (r *Resource) Encoded(enc attribute.Encoder) string { if r == nil { return "" } - return r.entity.Attrs.Encoded(enc) + return r.attrSet.Encoded(enc) +} + +func (r *Resource) getEntityId(entity resourceEntityRef) (attribute.Set, error) { + return r.getAttrsByKeys(entity.id) +} + +func (r *Resource) getEntityDescr(entity resourceEntityRef) (attribute.Set, error) { + return r.getAttrsByKeys(entity.attrs) +} + +func (r *Resource) getAttrsByKeys(keys map[attribute.Key]bool) (attribute.Set, error) { + var id []attribute.KeyValue + for key := range keys { + val, exists := r.attrs[key] + if !exists { + return attribute.NewSet(), fmt.Errorf( + "invalid resourceEntityRef, key %s not found in Resource attrs", key, + ) + } + id = append(id, attribute.KeyValue{Key: attribute.Key(key), Value: val}) + } + return attribute.NewSet(id...), nil +} + +func (r *Resource) mergeEntity(entity resourceEntityRef, id, attrs attribute.Set) error { + idx := r.findEntity(entity) + if idx < 0 { + return errors.New("invalid resourceEntityRef") + } + updateEnt := &r.entityRefs[idx] + + iter := id.Iter() + for iter.Next() { + attr := iter.Attribute() + r.attrs[attr.Key] = attr.Value + updateEnt.id[attr.Key] = true + } + + iter = attrs.Iter() + for iter.Next() { + attr := iter.Attribute() + r.attrs[attr.Key] = attr.Value + updateEnt.attrs[attr.Key] = true + } + return nil +} + +func (r *Resource) mergeEntityDescr(entity resourceEntityRef, attrs attribute.Set) error { + idx := r.findEntity(entity) + if idx < 0 { + return errors.New("invalid resourceEntityRef") + } + updateEnt := &r.entityRefs[idx] + + iter := attrs.Iter() + for iter.Next() { + idAttr := iter.Attribute() + r.attrs[idAttr.Key] = idAttr.Value + updateEnt.attrs[idAttr.Key] = true + } + return nil +} + +func (r *Resource) findEntity(entity resourceEntityRef) int { + for i, e := range r.entityRefs { + if e.typ == entity.typ && equalEntityIdKeys(e.id, entity.id) { + return i + } + } + return -1 +} + +func (r *Resource) overwriteEntity( + intoEnt resourceEntityRef, fromRes *Resource, fromEnt resourceEntityRef, +) error { + idx := r.findEntity(intoEnt) + if idx < 0 { + return errors.New("invalid resourceEntityRef") + } + updateEnt := &r.entityRefs[idx] + + updateEnt.typ = fromEnt.typ + updateEnt.schemaUrl = fromEnt.schemaUrl + + id, err := fromRes.getEntityId(fromEnt) + if err != nil { + return err + } + + r.setEntityId(updateEnt, id) + attrs, err := fromRes.getEntityDescr(fromEnt) + if err != nil { + return err + } + r.setEntityDescr(updateEnt, attrs) + + return nil +} + +func (r *Resource) setEntityId(ent *resourceEntityRef, id attribute.Set) { + iter := id.Iter() + ent.id = map[attribute.Key]bool{} + for iter.Next() { + attr := iter.Attribute() + ent.id[attr.Key] = true + r.attrs[attr.Key] = attr.Value + } +} + +func (r *Resource) setEntityDescr(ent *resourceEntityRef, attrs attribute.Set) { + iter := attrs.Iter() + ent.attrs = map[attribute.Key]bool{} + for iter.Next() { + attr := iter.Attribute() + ent.attrs[attr.Key] = true + r.attrs[attr.Key] = attr.Value + } + +} + +func (r *Resource) updateCache() { + r.attrSet = mapAttrsToSet(r.attrs) + for i := range r.entityRefs { + r.entityRefs[i].updateCache() + } +} + +func (r *Resource) EntityRefs() []resourceEntityRef { + return r.entityRefs +} + +func equalEntityIdKeys(id1 map[attribute.Key]bool, id2 map[attribute.Key]bool) bool { + if len(id1) != len(id2) { + return false + } + for k := range id1 { + if !id2[k] { + return false + } + } + return true }