diff --git a/.chloggen/consumererror-downstream.yaml b/.chloggen/consumererror-downstream.yaml new file mode 100644 index 000000000000..6510e641d348 --- /dev/null +++ b/.chloggen/consumererror-downstream.yaml @@ -0,0 +1,31 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: consumererror + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add new "Downstream" error marker + +# One or more tracking issues or pull requests related to the change +issues: [13234] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This new error wrapper type indicates that the error returned by a component's + `Consume` method is not an internal failure of the component, but instead + was passed through from another component further downstream. + This is used internally by the new pipeline instrumentation feature to + determine the `outcome` of a component call. This wrapper is not intended to + be used by components directly. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/.chloggen/obsconsumer-downstream-refused.yaml b/.chloggen/obsconsumer-downstream-refused.yaml new file mode 100644 index 000000000000..ca644ac4e769 --- /dev/null +++ b/.chloggen/obsconsumer-downstream-refused.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: 'enhancement' + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: service + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New pipeline instrumentation now differentiates internal failures from downstream errors + +# One or more tracking issues or pull requests related to the change +issues: [13234] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + With the telemetry.newPipelineTelemetry feature gate enabled, the "received" and "produced" + metrics related to a component now distinguish between two types of errors: + - "outcome = failure" indicates that the component returned an internal error; + - "outcome = refused" indicates that the component successfully emitted data, but returned an + error coming from a downstream component processing that data. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/consumer/consumererror/downstream.go b/consumer/consumererror/downstream.go new file mode 100644 index 000000000000..ab1581a474a6 --- /dev/null +++ b/consumer/consumererror/downstream.go @@ -0,0 +1,38 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror // import "go.opentelemetry.io/collector/consumer/consumererror" + +import "errors" + +type downstreamError struct { + inner error +} + +var _ error = downstreamError{} + +func (de downstreamError) Error() string { + return de.inner.Error() +} + +func (de downstreamError) Unwrap() error { + return de.inner +} + +// NewDownstream wraps an error to indicate that it is a downstream error, i.e. an +// error that does not come from the current component, but from one further downstream. +// This is used by pipeline instrumentation to determine whether an operation's outcome +// was an internal failure, or if it successfully produced data that was later refused. +// This wrapper is not intended to be used manually inside components. +func NewDownstream(err error) error { + return downstreamError{ + inner: err, + } +} + +// IsDownstream checks if an error was wrapped with the NewDownstream function, +// or if it contains one such error in its Unwrap() tree. +func IsDownstream(err error) bool { + var de downstreamError + return errors.As(err, &de) +} diff --git a/consumer/consumererror/downstream_test.go b/consumer/consumererror/downstream_test.go new file mode 100644 index 000000000000..6f4153e1a27e --- /dev/null +++ b/consumer/consumererror/downstream_test.go @@ -0,0 +1,44 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package consumererror + +import ( + "errors" + "testing" + + "github.com/stretchr/testify/assert" +) + +//nolint:testifylint // Testing properties of errors, no reason to use require +func TestDownstream(t *testing.T) { + err1 := errors.New("test error") + assert.False(t, IsDownstream(err1)) + err2 := errors.New("test error 2") + assert.False(t, IsDownstream(err2)) + + errDownstream1 := NewDownstream(err1) + assert.True(t, IsDownstream(errDownstream1)) + assert.Equal(t, err1.Error(), errDownstream1.Error()) + assert.ErrorIs(t, errDownstream1, err1) + assert.NotErrorIs(t, errDownstream1, err2) + + // we can access downstream wrappers through other wrappers + errWrapDownstream := NewRetryableError(errDownstream1) + assert.True(t, IsDownstream(errWrapDownstream)) + errorStruct := new(Error) + assert.ErrorAs(t, errWrapDownstream, &errorStruct) + + // we can access other wrappers through downstream wrappers + errDownstreamWrap := NewDownstream(NewRetryableError(err1)) + assert.True(t, IsDownstream(errDownstreamWrap)) + assert.ErrorAs(t, errDownstreamWrap, &errorStruct) + + // downstream + downstream = downstream + errJoin2 := errors.Join(errDownstream1, NewDownstream(err2)) + assert.True(t, IsDownstream(errJoin2)) + + // downstream + not downstream = downstream + errJoin1 := errors.Join(errDownstream1, err2) + assert.True(t, IsDownstream(errJoin1)) +} diff --git a/service/go.mod b/service/go.mod index 250433da94d4..7d2afc8e01d1 100644 --- a/service/go.mod +++ b/service/go.mod @@ -20,6 +20,7 @@ require ( go.opentelemetry.io/collector/connector/connectortest v0.130.1 go.opentelemetry.io/collector/connector/xconnector v0.130.1 go.opentelemetry.io/collector/consumer v1.36.1 + go.opentelemetry.io/collector/consumer/consumererror v0.130.1 go.opentelemetry.io/collector/consumer/consumertest v0.130.1 go.opentelemetry.io/collector/consumer/xconsumer v0.130.1 go.opentelemetry.io/collector/exporter v0.130.1 @@ -111,7 +112,6 @@ require ( go.opentelemetry.io/collector/config/configopaque v1.36.1 // indirect go.opentelemetry.io/collector/config/configoptional v0.130.1 // indirect go.opentelemetry.io/collector/config/configtls v1.36.1 // indirect - go.opentelemetry.io/collector/consumer/consumererror v0.130.1 // indirect go.opentelemetry.io/collector/extension/extensionauth v1.36.1 // indirect go.opentelemetry.io/collector/extension/extensionmiddleware v0.130.1 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect diff --git a/service/internal/obsconsumer/consumer_test.go b/service/internal/obsconsumer/consumer_test.go new file mode 100644 index 000000000000..0c12bc7cd023 --- /dev/null +++ b/service/internal/obsconsumer/consumer_test.go @@ -0,0 +1,198 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package obsconsumer_test + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + sdkmetric "go.opentelemetry.io/otel/sdk/metric" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/consumer/xconsumer" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pprofile" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/service/internal/obsconsumer" +) + +type failingConsumer struct { + err error +} + +var ( + _ consumer.Metrics = (*failingConsumer)(nil) + _ consumer.Logs = (*failingConsumer)(nil) + _ consumer.Traces = (*failingConsumer)(nil) + _ xconsumer.Profiles = (*failingConsumer)(nil) +) + +func (*failingConsumer) Capabilities() consumer.Capabilities { + return consumer.Capabilities{} +} + +func (fc *failingConsumer) ConsumeMetrics(_ context.Context, _ pmetric.Metrics) error { + return fc.err +} + +func (fc *failingConsumer) ConsumeLogs(_ context.Context, _ plog.Logs) error { + return fc.err +} + +func (fc *failingConsumer) ConsumeTraces(_ context.Context, _ ptrace.Traces) error { + return fc.err +} + +func (fc *failingConsumer) ConsumeProfiles(_ context.Context, _ pprofile.Profiles) error { + return fc.err +} + +func TestConsumeRefused(t *testing.T) { + setGateForTest(t, true) + + ctx := context.Background() + originalErr := errors.New("test error") + expectedErr := consumererror.NewDownstream(originalErr) + mockConsumer := &failingConsumer{err: originalErr} + + // Use delta temporality so sums don't accumulate across tests + reader := sdkmetric.NewManualReader(sdkmetric.WithTemporalitySelector(func(_ sdkmetric.InstrumentKind) metricdata.Temporality { + return metricdata.DeltaTemporality + })) + mp := sdkmetric.NewMeterProvider(sdkmetric.WithReader(reader)) + meter := mp.Meter("test") + + receivedItemsCounter, err := meter.Int64Counter("received.items") + require.NoError(t, err) + receivedSizeCounter, err := meter.Int64Counter("received.size") + require.NoError(t, err) + + producedItemsCounter, err := meter.Int64Counter("produced.items") + require.NoError(t, err) + producedSizeConter, err := meter.Int64Counter("produced.size") + require.NoError(t, err) + + type testCase struct { + name string + testConsumer func() error + } + + testCases := []testCase{ + { + name: "metrics", + testConsumer: func() error { + consumer1 := obsconsumer.NewMetrics(mockConsumer, receivedItemsCounter, receivedSizeCounter) + consumer2 := obsconsumer.NewMetrics(consumer1, producedItemsCounter, producedSizeConter) + md := pmetric.NewMetrics() + md.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + return consumer2.ConsumeMetrics(ctx, md) + }, + }, + { + name: "logs", + testConsumer: func() error { + consumer1 := obsconsumer.NewLogs(mockConsumer, receivedItemsCounter, receivedSizeCounter) + consumer2 := obsconsumer.NewLogs(consumer1, producedItemsCounter, producedSizeConter) + ld := plog.NewLogs() + ld.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty() + return consumer2.ConsumeLogs(ctx, ld) + }, + }, + { + name: "traces", + testConsumer: func() error { + consumer1 := obsconsumer.NewTraces(mockConsumer, receivedItemsCounter, receivedSizeCounter) + consumer2 := obsconsumer.NewTraces(consumer1, producedItemsCounter, producedSizeConter) + td := ptrace.NewTraces() + td.ResourceSpans().AppendEmpty().ScopeSpans().AppendEmpty().Spans().AppendEmpty() + return consumer2.ConsumeTraces(ctx, td) + }, + }, + { + name: "profiles", + testConsumer: func() error { + consumer1 := obsconsumer.NewProfiles(mockConsumer, receivedItemsCounter, receivedSizeCounter) + consumer2 := obsconsumer.NewProfiles(consumer1, producedItemsCounter, producedSizeConter) + pd := pprofile.NewProfiles() + pd.ResourceProfiles().AppendEmpty().ScopeProfiles().AppendEmpty().Profiles().AppendEmpty().Sample().AppendEmpty() + return consumer2.ConsumeProfiles(ctx, pd) + }, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + err := tc.testConsumer() + assert.Equal(t, expectedErr, err) + + var rm metricdata.ResourceMetrics + err = reader.Collect(ctx, &rm) + require.NoError(t, err) + require.Len(t, rm.ScopeMetrics, 1) + require.Len(t, rm.ScopeMetrics[0].Metrics, 4) + + var receivedItemMetric, receivedSizeMetric metricdata.Metrics + var producedItemMetric, producedSizeMetric metricdata.Metrics + for _, m := range rm.ScopeMetrics[0].Metrics { + switch m.Name { + case "received.items": + receivedItemMetric = m + case "received.size": + receivedSizeMetric = m + case "produced.items": + producedItemMetric = m + case "produced.size": + producedSizeMetric = m + } + } + require.NotNil(t, receivedItemMetric) + require.NotNil(t, receivedSizeMetric) + require.NotNil(t, producedItemMetric) + require.NotNil(t, producedSizeMetric) + + data := receivedItemMetric.Data.(metricdata.Sum[int64]) + require.Len(t, data.DataPoints, 1) + require.Equal(t, int64(1), data.DataPoints[0].Value) + attrs := data.DataPoints[0].Attributes + require.Equal(t, 1, attrs.Len()) + val, ok := attrs.Value(attribute.Key(obsconsumer.ComponentOutcome)) + require.True(t, ok) + require.Equal(t, "failure", val.Emit()) + + data = receivedSizeMetric.Data.(metricdata.Sum[int64]) + require.Len(t, data.DataPoints, 1) + require.Positive(t, data.DataPoints[0].Value) + attrs = data.DataPoints[0].Attributes + require.Equal(t, 1, attrs.Len()) + val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome)) + require.True(t, ok) + require.Equal(t, "failure", val.Emit()) + + data = producedItemMetric.Data.(metricdata.Sum[int64]) + require.Len(t, data.DataPoints, 1) + require.Equal(t, int64(1), data.DataPoints[0].Value) + attrs = data.DataPoints[0].Attributes + require.Equal(t, 1, attrs.Len()) + val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome)) + require.True(t, ok) + require.Equal(t, "refused", val.Emit()) + + data = producedSizeMetric.Data.(metricdata.Sum[int64]) + require.Len(t, data.DataPoints, 1) + require.Positive(t, data.DataPoints[0].Value) + attrs = data.DataPoints[0].Attributes + require.Equal(t, 1, attrs.Len()) + val, ok = attrs.Value(attribute.Key(obsconsumer.ComponentOutcome)) + require.True(t, ok) + require.Equal(t, "refused", val.Emit()) + }) + } +} diff --git a/service/internal/obsconsumer/logs.go b/service/internal/obsconsumer/logs.go index 53d129f94c01..8a0c9b91ee26 100644 --- a/service/internal/obsconsumer/logs.go +++ b/service/internal/obsconsumer/logs.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/pdata/plog" ) @@ -62,7 +63,12 @@ func (c obsLogs) ConsumeLogs(ctx context.Context, ld plog.Logs) error { err := c.consumer.ConsumeLogs(ctx, ld) if err != nil { - attrs = &c.withFailureAttrs + if consumererror.IsDownstream(err) { + attrs = &c.withRefusedAttrs + } else { + attrs = &c.withFailureAttrs + err = consumererror.NewDownstream(err) + } } return err } diff --git a/service/internal/obsconsumer/logs_test.go b/service/internal/obsconsumer/logs_test.go index ac56ea6da37c..336f9a4275b3 100644 --- a/service/internal/obsconsumer/logs_test.go +++ b/service/internal/obsconsumer/logs_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/service/internal/obsconsumer" @@ -161,6 +162,7 @@ func TestLogsConsumeFailure(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockLogsConsumer{err: expectedErr} reader := sdkmetric.NewManualReader() @@ -180,7 +182,7 @@ func TestLogsConsumeFailure(t *testing.T) { sl.LogRecords().AppendEmpty() err = consumer.ConsumeLogs(ctx, ld) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm) @@ -298,6 +300,7 @@ func TestLogsMultipleItemsMixedOutcomes(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockLogsConsumer{} reader := sdkmetric.NewManualReader() @@ -328,7 +331,7 @@ func TestLogsMultipleItemsMixedOutcomes(t *testing.T) { sl := r.ScopeLogs().AppendEmpty() sl.LogRecords().AppendEmpty() err = consumer.ConsumeLogs(ctx, ld2) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) // Third batch: 2 successful items mockConsumer.err = nil @@ -348,7 +351,7 @@ func TestLogsMultipleItemsMixedOutcomes(t *testing.T) { sl = r.ScopeLogs().AppendEmpty() sl.LogRecords().AppendEmpty() err = consumer.ConsumeLogs(ctx, ld4) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm) diff --git a/service/internal/obsconsumer/metrics.go b/service/internal/obsconsumer/metrics.go index 002e31c55068..309af21e5a6e 100644 --- a/service/internal/obsconsumer/metrics.go +++ b/service/internal/obsconsumer/metrics.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/pdata/pmetric" ) @@ -62,7 +63,12 @@ func (c obsMetrics) ConsumeMetrics(ctx context.Context, md pmetric.Metrics) erro err := c.consumer.ConsumeMetrics(ctx, md) if err != nil { - attrs = &c.withFailureAttrs + if consumererror.IsDownstream(err) { + attrs = &c.withRefusedAttrs + } else { + attrs = &c.withFailureAttrs + err = consumererror.NewDownstream(err) + } } return err } diff --git a/service/internal/obsconsumer/metrics_test.go b/service/internal/obsconsumer/metrics_test.go index f4473c37f935..e7258af7ef01 100644 --- a/service/internal/obsconsumer/metrics_test.go +++ b/service/internal/obsconsumer/metrics_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/service/internal/obsconsumer" @@ -162,6 +163,7 @@ func TestMetricsConsumeFailure(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockMetricsConsumer{err: expectedErr} reader := sdkmetric.NewManualReader() @@ -182,7 +184,7 @@ func TestMetricsConsumeFailure(t *testing.T) { m.SetEmptyGauge().DataPoints().AppendEmpty() err = consumer.ConsumeMetrics(ctx, md) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var metrics metricdata.ResourceMetrics err = reader.Collect(ctx, &metrics) @@ -303,6 +305,7 @@ func TestMetricsMultipleItemsMixedOutcomes(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockMetricsConsumer{} reader := sdkmetric.NewManualReader() @@ -335,7 +338,7 @@ func TestMetricsMultipleItemsMixedOutcomes(t *testing.T) { m := sm.Metrics().AppendEmpty() m.SetEmptyGauge().DataPoints().AppendEmpty() err = consumer.ConsumeMetrics(ctx, md2) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) // Third batch: 2 successful items mockConsumer.err = nil @@ -357,7 +360,7 @@ func TestMetricsMultipleItemsMixedOutcomes(t *testing.T) { m = sm.Metrics().AppendEmpty() m.SetEmptyGauge().DataPoints().AppendEmpty() err = consumer.ConsumeMetrics(ctx, md4) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var metrics metricdata.ResourceMetrics err = reader.Collect(ctx, &metrics) diff --git a/service/internal/obsconsumer/option.go b/service/internal/obsconsumer/option.go index 40f7c8a0f6e9..d146ead031f9 100644 --- a/service/internal/obsconsumer/option.go +++ b/service/internal/obsconsumer/option.go @@ -29,6 +29,7 @@ func WithStaticDataPointAttribute(attr attribute.KeyValue) Option { type compiledOptions struct { withSuccessAttrs metric.AddOption withFailureAttrs metric.AddOption + withRefusedAttrs metric.AddOption } func (o *options) compile() compiledOptions { @@ -40,8 +41,13 @@ func (o *options) compile() compiledOptions { failureAttrs = append(failureAttrs, attribute.String(ComponentOutcome, "failure")) failureAttrs = append(failureAttrs, o.staticDataPointAttributes...) + refusedAttrs := make([]attribute.KeyValue, 0, 1+len(o.staticDataPointAttributes)) + refusedAttrs = append(refusedAttrs, attribute.String(ComponentOutcome, "refused")) + refusedAttrs = append(refusedAttrs, o.staticDataPointAttributes...) + return compiledOptions{ withSuccessAttrs: metric.WithAttributes(successAttrs...), withFailureAttrs: metric.WithAttributes(failureAttrs...), + withRefusedAttrs: metric.WithAttributes(refusedAttrs...), } } diff --git a/service/internal/obsconsumer/profiles.go b/service/internal/obsconsumer/profiles.go index 950626906c1d..715eedd9925e 100644 --- a/service/internal/obsconsumer/profiles.go +++ b/service/internal/obsconsumer/profiles.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/xconsumer" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/pdata/pprofile" @@ -63,7 +64,12 @@ func (c obsProfiles) ConsumeProfiles(ctx context.Context, pd pprofile.Profiles) err := c.consumer.ConsumeProfiles(ctx, pd) if err != nil { - attrs = &c.withFailureAttrs + if consumererror.IsDownstream(err) { + attrs = &c.withRefusedAttrs + } else { + attrs = &c.withFailureAttrs + err = consumererror.NewDownstream(err) + } } return err } diff --git a/service/internal/obsconsumer/profiles_test.go b/service/internal/obsconsumer/profiles_test.go index 4b6d7b005edb..14488fc980b4 100644 --- a/service/internal/obsconsumer/profiles_test.go +++ b/service/internal/obsconsumer/profiles_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/service/internal/obsconsumer" @@ -162,6 +163,7 @@ func TestProfilesConsumeFailure(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockProfilesConsumer{err: expectedErr} reader := sdkmetric.NewManualReader() @@ -181,7 +183,7 @@ func TestProfilesConsumeFailure(t *testing.T) { sp.Profiles().AppendEmpty().Sample().AppendEmpty() err = consumer.ConsumeProfiles(ctx, pd) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm) @@ -299,6 +301,7 @@ func TestProfilesMultipleItemsMixedOutcomes(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockProfilesConsumer{} reader := sdkmetric.NewManualReader() @@ -329,7 +332,7 @@ func TestProfilesMultipleItemsMixedOutcomes(t *testing.T) { sp := r.ScopeProfiles().AppendEmpty() sp.Profiles().AppendEmpty().Sample().AppendEmpty() err = consumer.ConsumeProfiles(ctx, pd2) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) // Third batch: 2 successful items mockConsumer.err = nil @@ -349,7 +352,7 @@ func TestProfilesMultipleItemsMixedOutcomes(t *testing.T) { sp = r.ScopeProfiles().AppendEmpty() sp.Profiles().AppendEmpty().Sample().AppendEmpty() err = consumer.ConsumeProfiles(ctx, pd4) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm) diff --git a/service/internal/obsconsumer/traces.go b/service/internal/obsconsumer/traces.go index 23af6debb765..2b7482b23bd4 100644 --- a/service/internal/obsconsumer/traces.go +++ b/service/internal/obsconsumer/traces.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/otel/metric" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/internal/telemetry" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -62,7 +63,12 @@ func (c obsTraces) ConsumeTraces(ctx context.Context, td ptrace.Traces) error { err := c.consumer.ConsumeTraces(ctx, td) if err != nil { - attrs = &c.withFailureAttrs + if consumererror.IsDownstream(err) { + attrs = &c.withRefusedAttrs + } else { + attrs = &c.withFailureAttrs + err = consumererror.NewDownstream(err) + } } return err } diff --git a/service/internal/obsconsumer/traces_test.go b/service/internal/obsconsumer/traces_test.go index 2ed712713202..bd042e692cd3 100644 --- a/service/internal/obsconsumer/traces_test.go +++ b/service/internal/obsconsumer/traces_test.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/service/internal/obsconsumer" @@ -162,6 +163,7 @@ func TestTracesConsumeFailure(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockTracesConsumer{err: expectedErr} reader := sdkmetric.NewManualReader() @@ -181,7 +183,7 @@ func TestTracesConsumeFailure(t *testing.T) { ss.Spans().AppendEmpty() err = consumer.ConsumeTraces(ctx, td) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm) @@ -299,6 +301,7 @@ func TestTracesMultipleItemsMixedOutcomes(t *testing.T) { ctx := context.Background() expectedErr := errors.New("test error") + downstreamErr := consumererror.NewDownstream(expectedErr) mockConsumer := &mockTracesConsumer{} reader := sdkmetric.NewManualReader() @@ -329,7 +332,7 @@ func TestTracesMultipleItemsMixedOutcomes(t *testing.T) { ss := r.ScopeSpans().AppendEmpty() ss.Spans().AppendEmpty() err = consumer.ConsumeTraces(ctx, td2) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) // Third batch: 2 successful items mockConsumer.err = nil @@ -349,7 +352,7 @@ func TestTracesMultipleItemsMixedOutcomes(t *testing.T) { ss = r.ScopeSpans().AppendEmpty() ss.Spans().AppendEmpty() err = consumer.ConsumeTraces(ctx, td4) - assert.Equal(t, expectedErr, err) + assert.Equal(t, downstreamErr, err) var rm metricdata.ResourceMetrics err = reader.Collect(ctx, &rm)