Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/fix-issue-40400.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: spanmetricsconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Supports adding the `collector.instance.id` attribute to data points generated by the spanmetrics connector.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [40400]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
This feature currently in alpha stage, user should enable it by feature-gate `--feature-gates=+connector.spanmetrics.includeCollectorInstanceID`

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [user]
24 changes: 20 additions & 4 deletions connector/spanmetricsconnector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,12 @@ across all spans:
- `span.name`
- `span.kind`
- `status.code`
- `collector.instance.id`

The `collector.instance.id` dimension is intended to add a unique UUID to all metrics, ensuring that the spanmetrics connector
does not violate the **Single Writer Principle** when spanmetrics is used in a multi-deployment model.
Currently, `collector.instance.id` must be manually enabled via the feature gate: `connector.spanmetrics.includeServiceInstanceID`.
More detail, please see [Known Limitation: the Single Writer Principle](#known-limitation-the-single-writer-principle)

## Span to Metrics processor to Span to metrics connector

Expand Down Expand Up @@ -244,9 +249,12 @@ For more example configuration covering various other use cases, please visit th

[Connectors README]: https://github.com/open-telemetry/opentelemetry-collector/blob/main/connector/README.md

## Known Limitation: Violation of the Single Writer Principle
## Known Limitation: the Single Writer Principle

The `spanmetricsconnector` currently does not guarantee adherence to the [Single Writer Principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer), which is a core requirement in the OpenTelemetry metrics data model. Depending on how the collector is configured, multiple components may write to the same metric stream. This can result in inconsistent data, metric conflicts, or dropped series in metric backends.
Proper configuration of the `spanmetricsconnector` ensures compliance with the [Single Writer Principle](https://opentelemetry.io/docs/specs/otel/metrics/data-model/#single-writer),
which is a core requirement in the OpenTelemetry metrics data model. Misconfiguration, however,
may allow multiple components to write to the same metric stream, resulting in data inconsistency,
metric conflicts, or the dropping of time series by metric backends.

### Why this happens

Expand All @@ -260,8 +268,16 @@ This issue typically arises when:

To reduce the risk of conflicting writes:

* Avoid using multiple instances of the `spanmetricsconnector` unless metrics are partitioned (e.g., by attribute filtering) so each stream has a single writer
* If multiple pipelines are used, ensure each produces uniquely identified metrics (e.g., inject attributes using a processor)
* Add `resource_metrics_key_attributes` to your configuration.
```
connectors:
spanmetrics:
resource_metrics_key_attributes:
- service.name
- telemetry.sdk.language
- telemetry.sdk.name
```
* Manually enable the feature gate: `connector.spanmetrics.includeServiceInstanceID` to produce uniquely identified metrics.
* For exporters like Prometheus, which rely on the single writer assumption, use a dedicated pipeline with a single `spanmetricsconnector` instance

More context is available in [GitHub issue #21101](https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/21101).
Expand Down
8 changes: 7 additions & 1 deletion connector/spanmetricsconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ type Config struct {
// - span.kind
// - span.kind
// - status.code
// - collector.instance.id This dimensions never added unless enable feature-gate connector.spanmetrics.includeCollectorInstanceID
// The dimensions will be fetched from the span's attributes. Examples of some conventionally used attributes:
// https://github.com/open-telemetry/opentelemetry-collector/blob/main/model/semconv/opentelemetry.go.
Dimensions []Dimension `mapstructure:"dimensions"`
Expand Down Expand Up @@ -194,7 +195,12 @@ func (c Config) GetDeltaTimestampCacheSize() int {
// validateDimensions checks duplicates for reserved dimensions and additional dimensions.
func validateDimensions(dimensions []Dimension) error {
labelNames := make(map[string]struct{})
for _, key := range []string{serviceNameKey, spanKindKey, statusCodeKey, spanNameKey} {
intervalLabels := []string{serviceNameKey, spanKindKey, statusCodeKey, spanNameKey}
if includeCollectorInstanceID.IsEnabled() {
intervalLabels = append(intervalLabels, collectorInstanceKey)
}

for _, key := range intervalLabels {
labelNames[key] = struct{}{}
}

Expand Down
12 changes: 10 additions & 2 deletions connector/spanmetricsconnector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ const (
spanNameKey = "span.name" // OpenTelemetry non-standard constant.
spanKindKey = "span.kind" // OpenTelemetry non-standard constant.
statusCodeKey = "status.code" // OpenTelemetry non-standard constant.
collectorInstanceKey = "collector.instance.id" // OpenTelemetry non-standard constant.
Comment thread
Frapschen marked this conversation as resolved.
instrumentationScopeNameKey = "span.instrumentation.scope.name" // OpenTelemetry non-standard constant.
instrumentationScopeVersionKey = "span.instrumentation.scope.version" // OpenTelemetry non-standard constant.
metricKeySeparator = string(byte(0))
Expand Down Expand Up @@ -86,6 +87,7 @@ type connectorImp struct {

// Tracks the last TimestampUnixNano for delta metrics so that they represent an uninterrupted series. Unused for cumulative span metrics.
lastDeltaTimestamps *simplelru.LRU[metrics.Key, pcommon.Timestamp]
instanceID string
}

type resourceMetrics struct {
Expand All @@ -112,7 +114,7 @@ func newDimensions(cfgDims []Dimension) []utilattri.Dimension {
return dims
}

func newConnector(logger *zap.Logger, config component.Config, clock clockwork.Clock) (*connectorImp, error) {
func newConnector(logger *zap.Logger, config component.Config, clock clockwork.Clock, instanceID string) (*connectorImp, error) {
logger.Info("Building spanmetrics connector")
cfg := config.(*Config)
if cfg.DimensionsCacheSize != 0 {
Expand Down Expand Up @@ -155,6 +157,7 @@ func newConnector(logger *zap.Logger, config component.Config, clock clockwork.C
callsDimensions: newDimensions(cfg.CallsDimensions),
durationDimensions: newDimensions(cfg.Histogram.Dimensions),
events: cfg.Events,
instanceID: instanceID,
}, nil
}

Expand Down Expand Up @@ -529,7 +532,7 @@ func (p *connectorImp) buildAttributes(
instrumentationScope pcommon.InstrumentationScope,
) pcommon.Map {
attr := pcommon.NewMap()
attr.EnsureCapacity(4 + len(dimensions))
attr.EnsureCapacity(5 + len(dimensions))
if !contains(p.config.ExcludeDimensions, serviceNameKey) {
attr.PutStr(serviceNameKey, serviceName)
}
Expand All @@ -542,6 +545,11 @@ func (p *connectorImp) buildAttributes(
if !contains(p.config.ExcludeDimensions, statusCodeKey) {
attr.PutStr(statusCodeKey, traceutil.StatusCodeStr(span.Status().Code()))
}
if includeCollectorInstanceID.IsEnabled() {
if !contains(p.config.ExcludeDimensions, collectorInstanceKey) {
attr.PutStr(collectorInstanceKey, p.instanceID)
}
}

if contains(p.config.IncludeInstrumentationScope, instrumentationScope.Name()) && instrumentationScope.Name() != "" {
attr.PutStr(instrumentationScopeNameKey, instrumentationScope.Name())
Expand Down
100 changes: 90 additions & 10 deletions connector/spanmetricsconnector/connector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@ const (

sampleRegion = "us-east-1"
sampleDuration = float64(11)

instanceID = "0044953a-2946-449f-a5c8-2971f2a63928"
)

// metricID represents the minimum attributes that uniquely identifies a metric in our tests.
Expand Down Expand Up @@ -490,7 +492,7 @@ func newConnectorImp(defaultNullValue *string, histogramConfig func() HistogramC
MetricsFlushInterval: time.Nanosecond,
}

c, err := newConnector(zap.NewNop(), cfg, clock)
c, err := newConnector(zap.NewNop(), cfg, clock, instanceID)
if err != nil {
return nil, err
}
Expand All @@ -505,7 +507,7 @@ func stringp(str string) *string {
func TestBuildKeySameServiceNameCharSequence(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -525,7 +527,7 @@ func TestBuildKeyExcludeDimensionsAll(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -538,7 +540,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.ExcludeDimensions = []string{"span.kind", "service.name.wrong.name", "span.name", "status.code"}
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)

span0 := ptrace.NewSpan()
Expand All @@ -550,7 +552,7 @@ func TestBuildKeyExcludeWrongDimensions(t *testing.T) {
func TestBuildKeyWithDimensions(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)

defaultFoo := pcommon.NewValueStr("bar")
Expand Down Expand Up @@ -691,7 +693,7 @@ func TestConnectorCapabilities(t *testing.T) {
cfg := factory.CreateDefaultConfig().(*Config)

// Test
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
// Override the default no-op consumer for testing.
c.metricsConsumer = new(consumertest.MetricsSink)
assert.NoError(t, err)
Expand Down Expand Up @@ -1521,7 +1523,7 @@ func TestSpanMetrics_Events(t *testing.T) {
factory := NewFactory()
cfg := factory.CreateDefaultConfig().(*Config)
cfg.Events = tt.eventsConfig
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)
err = c.ConsumeTraces(t.Context(), buildSampleTrace())
require.NoError(t, err)
Expand Down Expand Up @@ -1875,7 +1877,7 @@ func TestSeparateDimensions(t *testing.T) {
cfg.Dimensions = []Dimension{{Name: stringAttrName, Default: nil}}
cfg.CallsDimensions = []Dimension{{Name: intAttrName, Default: stringp("0")}}
cfg.Histogram.Dimensions = []Dimension{{Name: doubleAttrName, Default: stringp("0.0")}}
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
c, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)
err = c.ConsumeTraces(t.Context(), buildSampleTrace())
require.NoError(t, err)
Expand Down Expand Up @@ -2032,7 +2034,7 @@ func TestConnectorWithCardinalityLimit(t *testing.T) {
{Name: "region"},
}

connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)

require.NotNil(t, connector)
Expand Down Expand Up @@ -2170,7 +2172,7 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
{Name: "event.name"},
}

connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock())
connector, err := newConnector(zaptest.NewLogger(t), cfg, clockwork.NewFakeClock(), instanceID)
require.NoError(t, err)
require.NotNil(t, connector)

Expand Down Expand Up @@ -2253,3 +2255,81 @@ func TestConnectorWithCardinalityLimitForEvents(t *testing.T) {
assert.Equal(t, 2, normalCount, "expected 2 normal metrics")
assert.Equal(t, 1, overflowCount, "expected 1 overflow metric")
}

func TestBuildAttributesWithFeatureGate(t *testing.T) {
tests := []struct {
name string
instrumentationScope pcommon.InstrumentationScope
config Config
want map[string]string
includeCollectorInstanceID bool
}{
{
name: "disable includeCollectorInstanceID feature-gate",
instrumentationScope: func() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("express")
scope.SetVersion("1.0.0")
return scope
}(),
config: Config{
IncludeInstrumentationScope: []string{"express"},
},
want: map[string]string{
serviceNameKey: "test_service",
spanNameKey: "test_span",
spanKindKey: "SPAN_KIND_INTERNAL",
statusCodeKey: "STATUS_CODE_UNSET",
instrumentationScopeNameKey: "express",
instrumentationScopeVersionKey: "1.0.0",
},
},
{
name: "enable includeCollectorInstanceID feature-gate",
instrumentationScope: func() pcommon.InstrumentationScope {
scope := pcommon.NewInstrumentationScope()
scope.SetName("express")
scope.SetVersion("1.0.0")
return scope
}(),
config: Config{
IncludeInstrumentationScope: []string{"express"},
},
want: map[string]string{
serviceNameKey: "test_service",
spanNameKey: "test_span",
spanKindKey: "SPAN_KIND_INTERNAL",
statusCodeKey: "STATUS_CODE_UNSET",
instrumentationScopeNameKey: "express",
instrumentationScopeVersionKey: "1.0.0",
collectorInstanceKey: instanceID,
},
includeCollectorInstanceID: true,
},
}

for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := &connectorImp{config: tt.config, instanceID: instanceID}
if tt.includeCollectorInstanceID {
require.NoError(t, featuregate.GlobalRegistry().Set(includeCollectorInstanceID.ID(), true))
}
defer func() {
require.NoError(t, featuregate.GlobalRegistry().Set(includeCollectorInstanceID.ID(), false))
}()

span := ptrace.NewSpan()
span.SetName("test_span")
span.SetKind(ptrace.SpanKindInternal)

attrs := p.buildAttributes("test_service", span, pcommon.NewMap(), nil, tt.instrumentationScope)

assert.Equal(t, len(tt.want), attrs.Len())
for k, v := range tt.want {
val, ok := attrs.Get(k)
assert.True(t, ok)
assert.Equal(t, v, val.Str())
}
})
}
}
31 changes: 27 additions & 4 deletions connector/spanmetricsconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,21 +9,27 @@ import (
"context"
"time"

"github.com/google/uuid"
"github.com/jonboulle/clockwork"
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/featuregate"
"go.opentelemetry.io/collector/pdata/pcommon"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/spanmetricsconnector/internal/metadata"
)

const (
DefaultNamespace = "traces.span.metrics"
legacyMetricNamesFeatureGateID = "connector.spanmetrics.legacyMetricNames"
DefaultNamespace = "traces.span.metrics"
legacyMetricNamesFeatureGateID = "connector.spanmetrics.legacyMetricNames"
includeCollectorInstanceIDGateID = "connector.spanmetrics.includeCollectorInstanceID"
)

var legacyMetricNamesFeatureGate *featuregate.Gate
var (
legacyMetricNamesFeatureGate *featuregate.Gate
includeCollectorInstanceID *featuregate.Gate
)

func init() {
// TODO: Remove this feature gate when the legacy metric names are removed.
Expand All @@ -33,6 +39,12 @@ func init() {
featuregate.WithRegisterDescription("When enabled, connector uses legacy metric names."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/33227"),
)
includeCollectorInstanceID = featuregate.GlobalRegistry().MustRegister(
includeCollectorInstanceIDGateID,
featuregate.StageAlpha,
featuregate.WithRegisterDescription("When enabled, connector add collector.instance.id to default dimensions."),
featuregate.WithRegisterReferenceURL("https://github.com/open-telemetry/opentelemetry-collector-contrib/issues/40400"),
)
}

// NewFactory creates a factory for the spanmetrics connector.
Expand All @@ -59,7 +71,18 @@ func createDefaultConfig() component.Config {
}

func createTracesToMetricsConnector(ctx context.Context, params connector.Settings, cfg component.Config, nextConsumer consumer.Metrics) (connector.Traces, error) {
c, err := newConnector(params.Logger, cfg, clockwork.FromContext(ctx))
instanceID, ok := params.Resource.Attributes().Get(collectorInstanceKey)
// This never happens: the OpenTelemetry Collector automatically adds this attribute.
// See: https://github.com/open-telemetry/opentelemetry-collector/blob/main/service/internal/resource/config.go#L31
//
// The fallback logic below exists solely for lifecycle tests in generated_component_test.go,
// where the mocked telemetry setting does not include the service.instance.id attribute.
if !ok {
instanceUUID, _ := uuid.NewRandom()
instanceID = pcommon.NewValueStr(instanceUUID.String())
}

c, err := newConnector(params.Logger, cfg, clockwork.FromContext(ctx), instanceID.AsString())
if err != nil {
return nil, err
}
Expand Down
Loading
Loading