diff --git a/.chloggen/fix-issue-40400.yaml b/.chloggen/fix-issue-40400.yaml new file mode 100644 index 0000000000000..f4a5b8af4f0e3 --- /dev/null +++ b/.chloggen/fix-issue-40400.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: spanmetricsconnector + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Supports adding the `collector.instance.id` attribute to data points generated by the spanmetrics connector. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [40400] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This feature currently in alpha stage, user should enable it by feature-gate `--feature-gates=+connector.spanmetrics.includeCollectorInstanceID` + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/connector/spanmetricsconnector/README.md b/connector/spanmetricsconnector/README.md index 975b23f269f32..32cb665db33dd 100644 --- a/connector/spanmetricsconnector/README.md +++ b/connector/spanmetricsconnector/README.md @@ -55,7 +55,12 @@ across all spans: - `span.name` - `span.kind` - `status.code` +- `collector.instance.id` +The `collector.instance.id` dimension is intended to add a unique UUID to all metrics, ensuring that the spanmetrics connector +does not violate the **Single Writer Principle** when spanmetrics is used in a multi-deployment model. +Currently, `collector.instance.id` must be manually enabled via the feature gate: `connector.spanmetrics.includeServiceInstanceID`. +More detail, please see [Known Limitation: the Single Writer Principle](#known-limitation-the-single-writer-principle) ## Span to Metrics processor to Span to metrics connector @@ -244,9 +249,12 @@ For more example configuration covering various other use cases, please visit th [Connectors README]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md -## Known Limitation: Violation of the Single Writer Principle +## Known Limitation: the Single Writer Principle -The `spanmetricsconnector` currently does not guarantee adherence to the [Single Writer Principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer), which is a core requirement in the OpenTelemetry metrics data model. Depending on how the collector is configured, multiple components may write to the same metric stream. This can result in inconsistent data, metric conflicts, or dropped series in metric backends. +Proper configuration of the `spanmetricsconnector` ensures compliance with the [Single Writer Principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer), +which is a core requirement in the OpenTelemetry metrics data model. Misconfiguration, however, +may allow multiple components to write to the same metric stream, resulting in data inconsistency, +metric conflicts, or the dropping of time series by metric backends. ### Why this happens @@ -260,8 +268,16 @@ This issue typically arises when: To reduce the risk of conflicting writes: -* Avoid using multiple instances of the `spanmetricsconnector` unless metrics are partitioned (e.g., by attribute filtering) so each stream has a single writer -* If multiple pipelines are used, ensure each produces uniquely identified metrics (e.g., inject attributes using a processor) +* Add `resource_metrics_key_attributes` to your configuration. +``` +connectors: + spanmetrics: + resource_metrics_key_attributes: + - service.name + - telemetry.sdk.language + - telemetry.sdk.name +``` +* Manually enable the feature gate: `connector.spanmetrics.includeServiceInstanceID` to produce uniquely identified metrics. * For exporters like Prometheus, which rely on the single writer assumption, use a dedicated pipeline with a single `spanmetricsconnector` instance More context is available in [GitHub issue #21101](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21101). diff --git a/connector/spanmetricsconnector/config.go b/connector/spanmetricsconnector/config.go index 38e3543abd6af..73f0943fee313 100644 --- a/connector/spanmetricsconnector/config.go +++ b/connector/spanmetricsconnector/config.go @@ -41,6 +41,7 @@ type Config struct { // - span.kind // - span.kind // - status.code + // - collector.instance.id This dimensions never added unless enable feature-gate connector.spanmetrics.includeCollectorInstanceID // The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes: // https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go. Dimensions []Dimension `mapstructure:"dimensions"` @@ -194,7 +195,12 @@ func (c Config) GetDeltaTimestampCacheSize() int { // validateDimensions checks duplicates for reserved dimensions and additional dimensions. func validateDimensions(dimensions []Dimension) error { labelNames := make(map[string]struct{}) - for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey, spanNameKey} { + intervalLabels := []string{serviceNameKey, spanKindKey, statusCodeKey, spanNameKey} + if includeCollectorInstanceID.IsEnabled() { + intervalLabels = append(intervalLabels, collectorInstanceKey) + } + + for _, key := range intervalLabels { labelNames[key] = struct{}{} } diff --git a/connector/spanmetricsconnector/connector.go b/connector/spanmetricsconnector/connector.go index 0d779292c692f..26ecc414e2b68 100644 --- a/connector/spanmetricsconnector/connector.go +++ b/connector/spanmetricsconnector/connector.go @@ -32,6 +32,7 @@ const ( spanNameKey = "span.name" // OpenTelemetry non-standard constant. spanKindKey = "span.kind" // OpenTelemetry non-standard constant. statusCodeKey = "status.code" // OpenTelemetry non-standard constant. + collectorInstanceKey = "collector.instance.id" // OpenTelemetry non-standard constant. instrumentationScopeNameKey = "span.instrumentation.scope.name" // OpenTelemetry non-standard constant. instrumentationScopeVersionKey = "span.instrumentation.scope.version" // OpenTelemetry non-standard constant. metricKeySeparator = string(byte(0)) @@ -86,6 +87,7 @@ type connectorImp struct { // Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics. lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp] + instanceID string } type resourceMetrics struct { @@ -112,7 +114,7 @@ func newDimensions(cfgDims []Dimension) []utilattri.Dimension { return dims } -func newConnector(logger *zap.Logger, config component.Config, clock clockwork.Clock) (*connectorImp, error) { +func newConnector(logger *zap.Logger, config component.Config, clock clockwork.Clock, instanceID string) (*connectorImp, error) { logger.Info("Building spanmetrics connector") cfg := config.(*Config) if cfg.DimensionsCacheSize != 0 { @@ -155,6 +157,7 @@ func newConnector(logger *zap.Logger, config component.Config, clock clockwork.C callsDimensions: newDimensions(cfg.CallsDimensions), durationDimensions: newDimensions(cfg.Histogram.Dimensions), events: cfg.Events, + instanceID: instanceID, }, nil } @@ -529,7 +532,7 @@ func (p *connectorImp) buildAttributes( instrumentationScope pcommon.InstrumentationScope, ) pcommon.Map { attr := pcommon.NewMap() - attr.EnsureCapacity(4 + len(dimensions)) + attr.EnsureCapacity(5 + len(dimensions)) if !contains(p.config.ExcludeDimensions, serviceNameKey) { attr.PutStr(serviceNameKey, serviceName) } @@ -542,6 +545,11 @@ func (p *connectorImp) buildAttributes( if !contains(p.config.ExcludeDimensions, statusCodeKey) { attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code())) } + if includeCollectorInstanceID.IsEnabled() { + if !contains(p.config.ExcludeDimensions, collectorInstanceKey) { + attr.PutStr(collectorInstanceKey, p.instanceID) + } + } if contains(p.config.IncludeInstrumentationScope, instrumentationScope.Name()) && instrumentationScope.Name() != "" { attr.PutStr(instrumentationScopeNameKey, instrumentationScope.Name()) diff --git a/connector/spanmetricsconnector/connector_test.go b/connector/spanmetricsconnector/connector_test.go index 249fe8cd27d9d..325b50090fe94 100644 --- a/connector/spanmetricsconnector/connector_test.go +++ b/connector/spanmetricsconnector/connector_test.go @@ -51,6 +51,8 @@ const ( sampleRegion = "us-east-1" sampleDuration = float64(11) + + instanceID = "0044953a-2946-449f-a5c8-2971f2a63928" ) // metricID represents the minimum attributes that uniquely identifies a metric in our tests. @@ -490,7 +492,7 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC MetricsFlushInterval: time.Nanosecond, } - c, err := newConnector(zap.NewNop(), cfg, clock) + c, err := newConnector(zap.NewNop(), cfg, clock, instanceID) if err != nil { return nil, err } @@ -505,7 +507,7 @@ func stringp(str string) *string { func TestBuildKeySameServiceNameCharSequence(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) span0 := ptrace.NewSpan() @@ -525,7 +527,7 @@ func TestBuildKeyExcludeDimensionsAll(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.ExcludeDimensions = []string{"span.kind", "service.name", "span.name", "status.code"} - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) span0 := ptrace.NewSpan() @@ -538,7 +540,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.ExcludeDimensions = []string{"span.kind", "service.name.wrong.name", "span.name", "status.code"} - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) span0 := ptrace.NewSpan() @@ -550,7 +552,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) { func TestBuildKeyWithDimensions(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) defaultFoo := pcommon.NewValueStr("bar") @@ -691,7 +693,7 @@ func TestConnectorCapabilities(t *testing.T) { cfg := factory.CreateDefaultConfig().(*Config) // Test - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) // Override the default no-op consumer for testing. c.metricsConsumer = new(consumertest.MetricsSink) assert.NoError(t, err) @@ -1521,7 +1523,7 @@ func TestSpanMetrics_Events(t *testing.T) { factory := NewFactory() cfg := factory.CreateDefaultConfig().(*Config) cfg.Events = tt.eventsConfig - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) err = c.ConsumeTraces(t.Context(), buildSampleTrace()) require.NoError(t, err) @@ -1875,7 +1877,7 @@ func TestSeparateDimensions(t *testing.T) { cfg.Dimensions = []Dimension{{Name: stringAttrName, Default: nil}} cfg.CallsDimensions = []Dimension{{Name: intAttrName, Default: stringp("0")}} cfg.Histogram.Dimensions = []Dimension{{Name: doubleAttrName, Default: stringp("0.0")}} - c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) err = c.ConsumeTraces(t.Context(), buildSampleTrace()) require.NoError(t, err) @@ -2032,7 +2034,7 @@ func TestConnectorWithCardinalityLimit(t *testing.T) { {Name: "region"}, } - connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) require.NotNil(t, connector) @@ -2170,7 +2172,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) { {Name: "event.name"}, } - connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock()) + connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID) require.NoError(t, err) require.NotNil(t, connector) @@ -2253,3 +2255,81 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) { assert.Equal(t, 2, normalCount, "expected 2 normal metrics") assert.Equal(t, 1, overflowCount, "expected 1 overflow metric") } + +func TestBuildAttributesWithFeatureGate(t *testing.T) { + tests := []struct { + name string + instrumentationScope pcommon.InstrumentationScope + config Config + want map[string]string + includeCollectorInstanceID bool + }{ + { + name: "disable includeCollectorInstanceID feature-gate", + instrumentationScope: func() pcommon.InstrumentationScope { + scope := pcommon.NewInstrumentationScope() + scope.SetName("express") + scope.SetVersion("1.0.0") + return scope + }(), + config: Config{ + IncludeInstrumentationScope: []string{"express"}, + }, + want: map[string]string{ + serviceNameKey: "test_service", + spanNameKey: "test_span", + spanKindKey: "SPAN_KIND_INTERNAL", + statusCodeKey: "STATUS_CODE_UNSET", + instrumentationScopeNameKey: "express", + instrumentationScopeVersionKey: "1.0.0", + }, + }, + { + name: "enable includeCollectorInstanceID feature-gate", + instrumentationScope: func() pcommon.InstrumentationScope { + scope := pcommon.NewInstrumentationScope() + scope.SetName("express") + scope.SetVersion("1.0.0") + return scope + }(), + config: Config{ + IncludeInstrumentationScope: []string{"express"}, + }, + want: map[string]string{ + serviceNameKey: "test_service", + spanNameKey: "test_span", + spanKindKey: "SPAN_KIND_INTERNAL", + statusCodeKey: "STATUS_CODE_UNSET", + instrumentationScopeNameKey: "express", + instrumentationScopeVersionKey: "1.0.0", + collectorInstanceKey: instanceID, + }, + includeCollectorInstanceID: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + p := &connectorImp{config: tt.config, instanceID: instanceID} + if tt.includeCollectorInstanceID { + require.NoError(t, featuregate.GlobalRegistry().Set(includeCollectorInstanceID.ID(), true)) + } + defer func() { + require.NoError(t, featuregate.GlobalRegistry().Set(includeCollectorInstanceID.ID(), false)) + }() + + span := ptrace.NewSpan() + span.SetName("test_span") + span.SetKind(ptrace.SpanKindInternal) + + attrs := p.buildAttributes("test_service", span, pcommon.NewMap(), nil, tt.instrumentationScope) + + assert.Equal(t, len(tt.want), attrs.Len()) + for k, v := range tt.want { + val, ok := attrs.Get(k) + assert.True(t, ok) + assert.Equal(t, v, val.Str()) + } + }) + } +} diff --git a/connector/spanmetricsconnector/factory.go b/connector/spanmetricsconnector/factory.go index f27b35c2e59a9..6dea8ab1ef626 100644 --- a/connector/spanmetricsconnector/factory.go +++ b/connector/spanmetricsconnector/factory.go @@ -9,21 +9,27 @@ import ( "context" "time" + "github.com/google/uuid" "github.com/jonboulle/clockwork" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/connector" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/featuregate" + "go.opentelemetry.io/collector/pdata/pcommon" "github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metadata" ) const ( - DefaultNamespace = "traces.span.metrics" - legacyMetricNamesFeatureGateID = "connector.spanmetrics.legacyMetricNames" + DefaultNamespace = "traces.span.metrics" + legacyMetricNamesFeatureGateID = "connector.spanmetrics.legacyMetricNames" + includeCollectorInstanceIDGateID = "connector.spanmetrics.includeCollectorInstanceID" ) -var legacyMetricNamesFeatureGate *featuregate.Gate +var ( + legacyMetricNamesFeatureGate *featuregate.Gate + includeCollectorInstanceID *featuregate.Gate +) func init() { // TODO: Remove this feature gate when the legacy metric names are removed. @@ -33,6 +39,12 @@ func init() { featuregate.WithRegisterDescription("When enabled, connector uses legacy metric names."), featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33227"), ) + includeCollectorInstanceID = featuregate.GlobalRegistry().MustRegister( + includeCollectorInstanceIDGateID, + featuregate.StageAlpha, + featuregate.WithRegisterDescription("When enabled, connector add collector.instance.id to default dimensions."), + featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40400"), + ) } // NewFactory creates a factory for the spanmetrics connector. @@ -59,7 +71,18 @@ func createDefaultConfig() component.Config { } func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) { - c, err := newConnector(params.Logger, cfg, clockwork.FromContext(ctx)) + instanceID, ok := params.Resource.Attributes().Get(collectorInstanceKey) + // This never happens: the OpenTelemetry Collector automatically adds this attribute. + // See: https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/internal/resource/config.go#L31 + // + // The fallback logic below exists solely for lifecycle tests in generated_component_test.go, + // where the mocked telemetry setting does not include the service.instance.id attribute. + if !ok { + instanceUUID, _ := uuid.NewRandom() + instanceID = pcommon.NewValueStr(instanceUUID.String()) + } + + c, err := newConnector(params.Logger, cfg, clockwork.FromContext(ctx), instanceID.AsString()) if err != nil { return nil, err } diff --git a/connector/spanmetricsconnector/factory_test.go b/connector/spanmetricsconnector/factory_test.go index e6633f00a775e..845e847a9347d 100644 --- a/connector/spanmetricsconnector/factory_test.go +++ b/connector/spanmetricsconnector/factory_test.go @@ -49,6 +49,7 @@ func TestNewConnector(t *testing.T) { factory := NewFactory() creationParams := connectortest.NewNopSettings(metadata.Type) + creationParams.Resource.Attributes().PutStr(collectorInstanceKey, instanceID) cfg := factory.CreateDefaultConfig().(*Config) cfg.Histogram.Explicit = configoptional.Some(ExplicitHistogramConfig{ Buckets: tc.durationHistogramBuckets, diff --git a/connector/spanmetricsconnector/go.mod b/connector/spanmetricsconnector/go.mod index db1b9d753ce6d..3402123fa7530 100644 --- a/connector/spanmetricsconnector/go.mod +++ b/connector/spanmetricsconnector/go.mod @@ -3,6 +3,7 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanm go 1.24.0 require ( + github.com/google/uuid v1.6.0 github.com/hashicorp/golang-lru/v2 v2.0.7 github.com/jonboulle/clockwork v0.5.0 github.com/lightstep/go-expohisto v1.0.0 @@ -36,7 +37,6 @@ require ( github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/hashicorp/go-version v1.7.0 // indirect github.com/json-iterator/go v1.1.12 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect