diff --git a/.chloggen/custom_receiverhelper.yaml b/.chloggen/custom_receiverhelper.yaml new file mode 100644 index 00000000000..17add4e981f --- /dev/null +++ b/.chloggen/custom_receiverhelper.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: receiverhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: New feature flag to make receiverhelper distinguish internal vs. downstream errors using new `otelcol_receiver_failed_x` and `otelcol_receiver_requests` metrics + +# One or more tracking issues or pull requests related to the change +issues: [12207, 12802] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + This is a breaking change for the semantics of the otelcol_receiver_refused_metric_points, otelcol_receiver_refused_log_records and otelcol_receiver_refused_spans metrics. + These new metrics and semantics are enabled through the `receiverhelper.newReceiverMetrics` feature gate. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/otlpreceiver/otlp_test.go b/receiver/otlpreceiver/otlp_test.go index 092445d7641..bd36656a877 100644 --- a/receiver/otlpreceiver/otlp_test.go +++ b/receiver/otlpreceiver/otlp_test.go @@ -44,11 +44,13 @@ import ( "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/pmetric/pmetricotlp" "go.opentelemetry.io/collector/pdata/pprofile" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/pdata/testdata" "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/metadata" + "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" ) @@ -118,8 +120,8 @@ func TestJsonHttp(t *testing.T) { name: "Retryable GRPCError", encoding: "", contentType: "application/json", - err: status.New(codes.Unavailable, "").Err(), - expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + err: status.New(codes.Unavailable, "Service Unavailable").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"}, expectedStatusCode: http.StatusServiceUnavailable, }, } @@ -145,7 +147,8 @@ func TestJsonHttp(t *testing.T) { errStatus := &spb.Status{} require.NoError(t, json.Unmarshal(respBytes, errStatus)) if s, ok := status.FromError(tt.err); ok { - assert.True(t, proto.Equal(errStatus, s.Proto())) + assert.Equal(t, s.Proto().Code, errStatus.Code) + assert.Equal(t, s.Proto().Message, errStatus.Message) } else { fmt.Println(errStatus) assert.True(t, proto.Equal(errStatus, tt.expectedStatus)) @@ -365,15 +368,15 @@ func TestProtoHttp(t *testing.T) { { name: "Permanent GRPCError", encoding: "", - err: status.New(codes.InvalidArgument, "").Err(), - expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: ""}, + err: status.New(codes.InvalidArgument, "Bad Request").Err(), + expectedStatus: &spb.Status{Code: int32(codes.InvalidArgument), Message: "Bad Request"}, expectedStatusCode: http.StatusBadRequest, }, { name: "Retryable GRPCError", encoding: "", - err: status.New(codes.Unavailable, "").Err(), - expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: ""}, + err: status.New(codes.Unavailable, "Service Unavailable").Err(), + expectedStatus: &spb.Status{Code: int32(codes.Unavailable), Message: "Service Unavailable"}, expectedStatusCode: http.StatusServiceUnavailable, }, } @@ -553,6 +556,45 @@ func TestHTTPNewPortAlreadyUsed(t *testing.T) { require.Error(t, r.Start(context.Background(), componenttest.NewNopHost())) } +// TestOTLPReceiverGRPCMetricsIngestTest checks that the metrics receiver +// is returning the proper response (return and metrics) when the next consumer +// in the pipeline reports error. +func TestOTLPReceiverGRPCMetricsIngestTest(t *testing.T) { + // Get a new available port + addr := testutil.GetAvailableLocalAddress(t) + + // Create a sink + sink := &errOrSinkConsumer{MetricsSink: new(consumertest.MetricsSink)} + + // Create a telemetry instance + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + // Create telemetry settings + settings := tt.NewTelemetrySettings() + + recv := newGRPCReceiver(t, settings, addr, sink) + require.NotNil(t, recv) + require.NoError(t, recv.Start(context.Background(), componenttest.NewNopHost())) + t.Cleanup(func() { require.NoError(t, recv.Shutdown(context.Background())) }) + + cc, err := grpc.NewClient(addr, grpc.WithTransportCredentials(insecure.NewCredentials())) + require.NoError(t, err) + defer func() { + assert.NoError(t, cc.Close()) + }() + // Set up the error case + sink.SetConsumeError(errors.New("consumer error")) + + md := testdata.GenerateMetrics(1) + _, err = pmetricotlp.NewGRPCClient(cc).Export(context.Background(), pmetricotlp.NewExportRequestFromMetrics(md)) + errStatus, ok := status.FromError(err) + require.True(t, ok) + assert.Equal(t, codes.Unavailable, errStatus.Code()) + + // Assert receiver metrics including receiver_requests + assertReceiverMetrics(t, tt, otlpReceiverID, "grpc", 0, 2) +} + // TestOTLPReceiverGRPCTracesIngestTest checks that the gRPC trace receiver // is returning the proper response (return and metrics) when the next consumer // in the pipeline reports error. The test changes the responses returned by the @@ -1262,8 +1304,41 @@ func (esc *errOrSinkConsumer) checkData(t *testing.T, data any, dataLen int) { } } -func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, refused int64) { - got, err := tt.GetMetric("otelcol_receiver_accepted_spans") +func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) { + var refused, failed int64 + var outcome string + gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled() + // The errors in the OTLP tests are not downstream, so they should be "failed" when the gate is enabled. + if gateEnabled { + failed = rejected + outcome = "failure" + } else { + // When the gate is disabled, all errors are "refused". + refused = rejected + } + + got, err := tt.GetMetric("otelcol_receiver_failed_spans") + require.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_failed_spans", + Description: "The number of spans that failed to be processed by the receiver due to internal errors. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport)), + Value: failed, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_accepted_spans") require.NoError(t, err) metricdatatest.AssertEqual(t, metricdata.Metrics{ @@ -1304,4 +1379,165 @@ func assertReceiverTraces(t *testing.T, tt *componenttest.Telemetry, id componen }, }, }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert receiver_requests metric + if gateEnabled { + got, err := tt.GetMetric("otelcol_receiver_requests") + require.NoError(t, err) + + // Calculate expected requests based on accepted and refused counts + var expectedRequests []metricdata.DataPoint[int64] + if accepted > 0 { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport), + attribute.String("outcome", "success")), + Value: accepted, + }) + } + if rejected > 0 { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport), + attribute.String("outcome", outcome)), + Value: rejected, + }) + } + + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_requests", + Description: "The number of requests performed.", + Unit: "{requests}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: expectedRequests, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } else { + _, err := tt.GetMetric("otelcol_receiver_requests") + require.Error(t, err) + } +} + +func assertReceiverMetrics(t *testing.T, tt *componenttest.Telemetry, id component.ID, transport string, accepted, rejected int64) { + var refused, failed int64 + var outcome string + gateEnabled := receiverhelper.NewReceiverMetricsGate.IsEnabled() + // The error used in the metrics test is not downstream. + if gateEnabled { + failed = rejected + outcome = "failure" + } else { + // When the gate is disabled, all errors are "refused". + refused = rejected + } + + got, err := tt.GetMetric("otelcol_receiver_failed_metric_points") + require.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_failed_metric_points", + Description: "The number of metric points that failed to be processed by the receiver due to internal errors. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport)), + Value: failed, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_accepted_metric_points") + require.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_metric_points", + Description: "Number of metric points successfully pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport)), + Value: accepted, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_metric_points") + require.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_metric_points", + Description: "Number of metric points that could not be pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport)), + Value: refused, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert receiver_requests metric + if gateEnabled { + got, err := tt.GetMetric("otelcol_receiver_requests") + require.NoError(t, err) + + // Calculate expected requests based on accepted and refused counts + var expectedRequests []metricdata.DataPoint[int64] + if accepted > 0 { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport), + attribute.String("outcome", "success")), + Value: accepted, + }) + } + if rejected > 0 { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", transport), + attribute.String("outcome", outcome)), + Value: 1, // One request failed + }) + } + + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_requests", + Description: "The number of requests performed.", + Unit: "{requests}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: expectedRequests, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } else { + _, err := tt.GetMetric("otelcol_receiver_requests") + require.Error(t, err) + } } diff --git a/receiver/otlpreceiver/otlphttp_test.go b/receiver/otlpreceiver/otlphttp_test.go index eb70c487866..f0639297b50 100644 --- a/receiver/otlpreceiver/otlphttp_test.go +++ b/receiver/otlpreceiver/otlphttp_test.go @@ -23,6 +23,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/internal/testutil" "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" + "go.opentelemetry.io/collector/receiver/otlpreceiver/internal/errors" ) func TestHttpRetryAfter(t *testing.T) { @@ -126,7 +127,10 @@ func TestHttpRetryAfter(t *testing.T) { } else { errStatus := &spb.Status{} require.NoError(t, proto.Unmarshal(respBytes, errStatus)) - s, ok := status.FromError(tt.err) + // The HTTP receiver transforms errors through GetStatusFromError + // We need to get the expected transformed error, not the original + expectedErr := errors.GetStatusFromError(tt.err) + s, ok := status.FromError(expectedErr) require.True(t, ok) assert.True(t, proto.Equal(errStatus, s.Proto())) } diff --git a/receiver/receiverhelper/documentation.md b/receiver/receiverhelper/documentation.md index 4f88fd0956d..79b7b847827 100644 --- a/receiver/receiverhelper/documentation.md +++ b/receiver/receiverhelper/documentation.md @@ -30,6 +30,30 @@ Number of spans successfully pushed into the pipeline. [alpha] | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | +### otelcol_receiver_failed_log_records + +The number of log records that failed to be processed by the receiver due to internal errors. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {records} | Sum | Int | true | + +### otelcol_receiver_failed_metric_points + +The number of metric points that failed to be processed by the receiver due to internal errors. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {datapoints} | Sum | Int | true | + +### otelcol_receiver_failed_spans + +The number of spans that failed to be processed by the receiver due to internal errors. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {spans} | Sum | Int | true | + ### otelcol_receiver_refused_log_records Number of log records that could not be pushed into the pipeline. [alpha] @@ -53,3 +77,17 @@ Number of spans that could not be pushed into the pipeline. [alpha] | Unit | Metric Type | Value Type | Monotonic | | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | + +### otelcol_receiver_requests + +The number of requests performed. [alpha] + +| Unit | Metric Type | Value Type | Monotonic | +| ---- | ----------- | ---------- | --------- | +| {requests} | Sum | Int | true | + +#### Attributes + +| Name | Description | Values | +| ---- | ----------- | ------ | +| outcome | The outcome of receiver requests | Str: ``success``, ``refused``, ``failure`` | diff --git a/receiver/receiverhelper/featuregates.go b/receiver/receiverhelper/featuregates.go new file mode 100644 index 00000000000..3125fc7c742 --- /dev/null +++ b/receiver/receiverhelper/featuregates.go @@ -0,0 +1,14 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package receiverhelper // import "go.opentelemetry.io/collector/receiver/receiverhelper" + +import "go.opentelemetry.io/collector/featuregate" + +// NewReceiverMetricsGate is the feature gate that controls whether to distinguish downstream errors from internal errors in pipeline telemetry. +var NewReceiverMetricsGate = featuregate.GlobalRegistry().MustRegister( + "receiverhelper.newReceiverMetrics", + featuregate.StageAlpha, + featuregate.WithRegisterFromVersion("v0.138.0"), + featuregate.WithRegisterDescription("Controls whether receivers emit new metrics and span attributes to distinguish downstream errors from internal errors. This is a breaking change for the semantics of the otelcol_receiver_refused_metric_points, otelcol_receiver_refused_log_records and otelcol_receiver_refused_spans."), +) diff --git a/receiver/receiverhelper/go.mod b/receiver/receiverhelper/go.mod index 82864a0050f..59a41355c12 100644 --- a/receiver/receiverhelper/go.mod +++ b/receiver/receiverhelper/go.mod @@ -6,6 +6,8 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.38.0 go.opentelemetry.io/collector/component/componenttest v0.132.0 + go.opentelemetry.io/collector/consumer/consumererror v0.132.0 + go.opentelemetry.io/collector/featuregate v1.38.0 go.opentelemetry.io/collector/pipeline v1.38.0 go.opentelemetry.io/collector/receiver v1.38.0 go.opentelemetry.io/otel v1.37.0 @@ -28,9 +30,9 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/consumer v1.38.0 // indirect - go.opentelemetry.io/collector/featuregate v1.38.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.132.0 // indirect go.opentelemetry.io/collector/pdata v1.38.0 // indirect + go.opentelemetry.io/collector/pdata/pprofile v0.132.0 // indirect go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect go.opentelemetry.io/otel/log v0.13.0 // indirect go.opentelemetry.io/otel/sdk v1.37.0 // indirect @@ -68,3 +70,5 @@ replace go.opentelemetry.io/collector/pipeline => ../../pipeline replace go.opentelemetry.io/collector/internal/telemetry => ../../internal/telemetry replace go.opentelemetry.io/collector/featuregate => ../../featuregate + +replace go.opentelemetry.io/collector/consumer/consumererror => ../../consumer/consumererror diff --git a/receiver/receiverhelper/internal/metadata/generated_telemetry.go b/receiver/receiverhelper/internal/metadata/generated_telemetry.go index e3db30b5089..962bc2e90fe 100644 --- a/receiver/receiverhelper/internal/metadata/generated_telemetry.go +++ b/receiver/receiverhelper/internal/metadata/generated_telemetry.go @@ -29,9 +29,13 @@ type TelemetryBuilder struct { ReceiverAcceptedLogRecords metric.Int64Counter ReceiverAcceptedMetricPoints metric.Int64Counter ReceiverAcceptedSpans metric.Int64Counter + ReceiverFailedLogRecords metric.Int64Counter + ReceiverFailedMetricPoints metric.Int64Counter + ReceiverFailedSpans metric.Int64Counter ReceiverRefusedLogRecords metric.Int64Counter ReceiverRefusedMetricPoints metric.Int64Counter ReceiverRefusedSpans metric.Int64Counter + ReceiverRequests metric.Int64Counter } // TelemetryBuilderOption applies changes to default builder. @@ -81,6 +85,24 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ReceiverFailedLogRecords, err = builder.meter.Int64Counter( + "otelcol_receiver_failed_log_records", + metric.WithDescription("The number of log records that failed to be processed by the receiver due to internal errors. [alpha]"), + metric.WithUnit("{records}"), + ) + errs = errors.Join(errs, err) + builder.ReceiverFailedMetricPoints, err = builder.meter.Int64Counter( + "otelcol_receiver_failed_metric_points", + metric.WithDescription("The number of metric points that failed to be processed by the receiver due to internal errors. [alpha]"), + metric.WithUnit("{datapoints}"), + ) + errs = errors.Join(errs, err) + builder.ReceiverFailedSpans, err = builder.meter.Int64Counter( + "otelcol_receiver_failed_spans", + metric.WithDescription("The number of spans that failed to be processed by the receiver due to internal errors. [alpha]"), + metric.WithUnit("{spans}"), + ) + errs = errors.Join(errs, err) builder.ReceiverRefusedLogRecords, err = builder.meter.Int64Counter( "otelcol_receiver_refused_log_records", metric.WithDescription("Number of log records that could not be pushed into the pipeline. [alpha]"), @@ -99,5 +121,11 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ReceiverRequests, err = builder.meter.Int64Counter( + "otelcol_receiver_requests", + metric.WithDescription("The number of requests performed. [alpha]"), + metric.WithUnit("{requests}"), + ) + errs = errors.Join(errs, err) return &builder, errs } diff --git a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go index 29010d9663f..199c88c22e8 100644 --- a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go +++ b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest.go @@ -60,6 +60,54 @@ func AssertEqualReceiverAcceptedSpans(t *testing.T, tt *componenttest.Telemetry, metricdatatest.AssertEqual(t, want, got, opts...) } +func AssertEqualReceiverFailedLogRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_failed_log_records", + Description: "The number of log records that failed to be processed by the receiver due to internal errors. [alpha]", + Unit: "{records}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_failed_log_records") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualReceiverFailedMetricPoints(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_failed_metric_points", + Description: "The number of metric points that failed to be processed by the receiver due to internal errors. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_failed_metric_points") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualReceiverFailedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_failed_spans", + Description: "The number of spans that failed to be processed by the receiver due to internal errors. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_failed_spans") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualReceiverRefusedLogRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_receiver_refused_log_records", @@ -107,3 +155,19 @@ func AssertEqualReceiverRefusedSpans(t *testing.T, tt *componenttest.Telemetry, require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } + +func AssertEqualReceiverRequests(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_receiver_requests", + Description: "The number of requests performed. [alpha]", + Unit: "{requests}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_receiver_requests") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go index 05d2b3a669d..e34c4eceb60 100644 --- a/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/receiver/receiverhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -22,9 +22,13 @@ func TestSetupTelemetry(t *testing.T) { tb.ReceiverAcceptedLogRecords.Add(context.Background(), 1) tb.ReceiverAcceptedMetricPoints.Add(context.Background(), 1) tb.ReceiverAcceptedSpans.Add(context.Background(), 1) + tb.ReceiverFailedLogRecords.Add(context.Background(), 1) + tb.ReceiverFailedMetricPoints.Add(context.Background(), 1) + tb.ReceiverFailedSpans.Add(context.Background(), 1) tb.ReceiverRefusedLogRecords.Add(context.Background(), 1) tb.ReceiverRefusedMetricPoints.Add(context.Background(), 1) tb.ReceiverRefusedSpans.Add(context.Background(), 1) + tb.ReceiverRequests.Add(context.Background(), 1) AssertEqualReceiverAcceptedLogRecords(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) @@ -34,6 +38,15 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualReceiverAcceptedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverFailedLogRecords(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverFailedMetricPoints(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverFailedSpans(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualReceiverRefusedLogRecords(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) @@ -43,6 +56,9 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualReceiverRefusedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualReceiverRequests(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) require.NoError(t, testTel.Shutdown(context.Background())) } diff --git a/receiver/receiverhelper/internal/obsmetrics.go b/receiver/receiverhelper/internal/obsmetrics.go index 3b90592b6b6..c44aa2df47a 100644 --- a/receiver/receiverhelper/internal/obsmetrics.go +++ b/receiver/receiverhelper/internal/obsmetrics.go @@ -18,12 +18,16 @@ const ( AcceptedSpansKey = "accepted_spans" // RefusedSpansKey used to identify spans refused (ie.: not ingested) by the Collector. RefusedSpansKey = "refused_spans" + // FailedSpansKey used to identify spans failed to be processed by the Collector. + FailedSpansKey = "failed_spans" // AcceptedMetricPointsKey used to identify metric points accepted by the Collector. AcceptedMetricPointsKey = "accepted_metric_points" // RefusedMetricPointsKey used to identify metric points refused (ie.: not ingested) by the // Collector. RefusedMetricPointsKey = "refused_metric_points" + // FailedMetricPointKey used to identify metric points failed to be processed by the Collector. + FailedMetricPointsKey = "failed_metric_points" // AcceptedLogRecordsKey used to identify log records accepted by the Collector. AcceptedLogRecordsKey = "accepted_log_records" @@ -31,6 +35,9 @@ const ( // Collector. RefusedLogRecordsKey = "refused_log_records" + // FailedLogRecordsKey used to identify log records failed to be processed by the Collector. + FailedLogRecordsKey = "failed_log_records" + ReceiveTraceDataOperationSuffix = SpanNameSep + "TraceDataReceived" ReceiverMetricsOperationSuffix = SpanNameSep + "MetricsReceived" ReceiverLogsOperationSuffix = SpanNameSep + "LogsReceived" diff --git a/receiver/receiverhelper/metadata.yaml b/receiver/receiverhelper/metadata.yaml index 7232c8559b9..5d19ac537d6 100644 --- a/receiver/receiverhelper/metadata.yaml +++ b/receiver/receiverhelper/metadata.yaml @@ -1,12 +1,10 @@ type: receiverhelper github_project: open-telemetry/opentelemetry-collector - status: disable_codecov_badge: true class: pkg stability: - beta: [traces, metrics, logs] - + beta: [metrics, traces, logs] telemetry: metrics: receiver_accepted_spans: @@ -18,7 +16,6 @@ telemetry: sum: value_type: int monotonic: true - receiver_refused_spans: enabled: true stability: @@ -28,17 +25,24 @@ telemetry: sum: value_type: int monotonic: true - - receiver_accepted_metric_points: + receiver_failed_spans: enabled: true stability: level: alpha + description: The number of spans that failed to be processed by the receiver due to internal errors. + unit: "{spans}" + sum: + value_type: int + monotonic: true + receiver_accepted_metric_points: + stability: + level: alpha + enabled: true description: Number of metric points successfully pushed into the pipeline. unit: "{datapoints}" sum: value_type: int monotonic: true - receiver_refused_metric_points: enabled: true stability: @@ -48,7 +52,15 @@ telemetry: sum: value_type: int monotonic: true - + receiver_failed_metric_points: + enabled: true + stability: + level: alpha + description: The number of metric points that failed to be processed by the receiver due to internal errors. + unit: "{datapoints}" + sum: + value_type: int + monotonic: true receiver_accepted_log_records: enabled: true stability: @@ -58,7 +70,6 @@ telemetry: sum: value_type: int monotonic: true - receiver_refused_log_records: enabled: true stability: @@ -68,3 +79,31 @@ telemetry: sum: value_type: int monotonic: true + receiver_failed_log_records: + enabled: true + stability: + level: alpha + description: The number of log records that failed to be processed by the receiver due to internal errors. + unit: "{records}" + sum: + value_type: int + monotonic: true + receiver_requests: + enabled: true + stability: + level: alpha + description: The number of requests performed. + unit: "{requests}" + sum: + value_type: int + monotonic: true + attributes: + - outcome +attributes: + outcome: + description: The outcome of receiver requests + type: string + enum: + - success + - refused + - failure diff --git a/receiver/receiverhelper/obsreport.go b/receiver/receiverhelper/obsreport.go index 7c3bfb34d78..eb7fa68043a 100644 --- a/receiver/receiverhelper/obsreport.go +++ b/receiver/receiverhelper/obsreport.go @@ -14,6 +14,7 @@ import ( "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pipeline" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper/internal" @@ -160,34 +161,63 @@ func (rec *ObsReport) endOp( ) { numAccepted := numReceivedItems numRefused := 0 + numFailedErrors := 0 if err != nil { numAccepted = 0 - numRefused = numReceivedItems + // If gate is enabled, we distinguish between refused and failed. + if NewReceiverMetricsGate.IsEnabled() { + if consumererror.IsDownstream(err) { + numRefused = numReceivedItems + } else { + numFailedErrors = numReceivedItems + } + } else { + // When the gate is disabled, all errors are considered "refused". + numRefused = numReceivedItems + } } span := trace.SpanFromContext(receiverCtx) - rec.recordMetrics(receiverCtx, signal, numAccepted, numRefused) + rec.recordMetrics(receiverCtx, signal, numAccepted, numRefused, numFailedErrors) + + // The new otelcol_receiver_requests metric is only emitted when the feature gate is enabled. + if NewReceiverMetricsGate.IsEnabled() { + var outcome string + switch { + case err == nil: + outcome = "success" + case consumererror.IsDownstream(err): + outcome = "refused" + default: + outcome = "failure" + } + rec.telemetryBuilder.ReceiverRequests.Add(receiverCtx, 1, rec.otelAttrs, metric.WithAttributeSet(attribute.NewSet(attribute.String("outcome", outcome)))) + } // end span according to errors if span.IsRecording() { - var acceptedItemsKey, refusedItemsKey string + var acceptedItemsKey, refusedItemsKey, failedItemsKey string switch signal { case pipeline.SignalTraces: acceptedItemsKey = internal.AcceptedSpansKey refusedItemsKey = internal.RefusedSpansKey + failedItemsKey = internal.FailedSpansKey case pipeline.SignalMetrics: acceptedItemsKey = internal.AcceptedMetricPointsKey refusedItemsKey = internal.RefusedMetricPointsKey + failedItemsKey = internal.FailedMetricPointsKey case pipeline.SignalLogs: acceptedItemsKey = internal.AcceptedLogRecordsKey refusedItemsKey = internal.RefusedLogRecordsKey + failedItemsKey = internal.FailedLogRecordsKey } span.SetAttributes( attribute.String(internal.FormatKey, format), attribute.Int64(acceptedItemsKey, int64(numAccepted)), attribute.Int64(refusedItemsKey, int64(numRefused)), + attribute.Int64(failedItemsKey, int64(numFailedErrors)), ) if err != nil { span.SetStatus(codes.Error, err.Error()) @@ -196,20 +226,24 @@ func (rec *ObsReport) endOp( span.End() } -func (rec *ObsReport) recordMetrics(receiverCtx context.Context, signal pipeline.Signal, numAccepted, numRefused int) { - var acceptedMeasure, refusedMeasure metric.Int64Counter +func (rec *ObsReport) recordMetrics(receiverCtx context.Context, signal pipeline.Signal, numAccepted, numRefused, numFailedErrors int) { + var acceptedMeasure, refusedMeasure, failedMeasure metric.Int64Counter switch signal { case pipeline.SignalTraces: acceptedMeasure = rec.telemetryBuilder.ReceiverAcceptedSpans refusedMeasure = rec.telemetryBuilder.ReceiverRefusedSpans + failedMeasure = rec.telemetryBuilder.ReceiverFailedSpans case pipeline.SignalMetrics: acceptedMeasure = rec.telemetryBuilder.ReceiverAcceptedMetricPoints refusedMeasure = rec.telemetryBuilder.ReceiverRefusedMetricPoints + failedMeasure = rec.telemetryBuilder.ReceiverFailedMetricPoints case pipeline.SignalLogs: acceptedMeasure = rec.telemetryBuilder.ReceiverAcceptedLogRecords refusedMeasure = rec.telemetryBuilder.ReceiverRefusedLogRecords + failedMeasure = rec.telemetryBuilder.ReceiverFailedLogRecords } acceptedMeasure.Add(receiverCtx, int64(numAccepted), rec.otelAttrs) refusedMeasure.Add(receiverCtx, int64(numRefused), rec.otelAttrs) + failedMeasure.Add(receiverCtx, int64(numFailedErrors), rec.otelAttrs) } diff --git a/receiver/receiverhelper/obsreport_test.go b/receiver/receiverhelper/obsreport_test.go index 212805f363a..47a99981c98 100644 --- a/receiver/receiverhelper/obsreport_test.go +++ b/receiver/receiverhelper/obsreport_test.go @@ -17,6 +17,8 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/featuregate" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper/internal" "go.opentelemetry.io/collector/receiver/receiverhelper/internal/metadatatest" @@ -39,251 +41,440 @@ type testParams struct { } func TestReceiveTraceDataOp(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { - parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() - - params := []testParams{ - {items: 13, err: errFake}, - {items: 42, err: nil}, - } - for i, param := range params { - rec, err := newReceiver(ObsReportSettings{ - ReceiverID: receiverID, - Transport: transport, - ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, - }) - require.NoError(t, err) - ctx := rec.StartTracesOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndTracesOp(ctx, format, params[i].items, param.err) - } - - spans := tt.SpanRecorder.Ended() - require.Len(t, spans, len(params)) - - var acceptedSpans, refusedSpans int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiverID.String()+"/TraceDataReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedSpans += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedSpans += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) - } - } - - metadatatest.AssertEqualReceiverAcceptedSpans(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(acceptedSpans), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) - metadatatest.AssertEqualReceiverRefusedSpans(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(refusedSpans), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) }) + + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 13, err: consumererror.NewDownstream(errFake)}, + {items: 42, err: nil}, + {items: 7, err: errors.New("non-downstream error")}, // Regular error to test numFailedErrors path + } + for i, param := range params { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + ctx := rec.StartTracesOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndTracesOp(ctx, format, params[i].items, param.err) + } + + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + var acceptedSpans, refusedSpans, failedSpans int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiverID.String()+"/TraceDataReceived", span.Name()) + err := params[i].err + if err == nil { + acceptedSpans += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + } else { + isDownstream := consumererror.IsDownstream(err) + if !tc.enabled || (tc.enabled && isDownstream) { + refusedSpans += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(0)}) + } else { + failedSpans += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + } + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, err.Error(), span.Status().Description) + } + } + + metadatatest.AssertEqualReceiverAcceptedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(acceptedSpans), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(refusedSpans), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(failedSpans), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert otelcol_receiver_requests metric with outcome attribute + if tc.enabled { + outcomes := make(map[string]int64) + for _, param := range params { + var outcome string + switch { + case param.err == nil: + outcome = "success" + case consumererror.IsDownstream(param.err): + outcome = "refused" + default: + outcome = "failure" + } + outcomes[outcome]++ + } + var expectedRequests []metricdata.DataPoint[int64] + for outcome, count := range outcomes { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport), + attribute.String("outcome", outcome)), + Value: count, + }) + } + metadatatest.AssertEqualReceiverRequests(t, tt, expectedRequests, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + }) + }) + } } func TestReceiveLogsOp(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { - parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() - - params := []testParams{ - {items: 13, err: errFake}, - {items: 42, err: nil}, - } - for i, param := range params { - rec, err := newReceiver(ObsReportSettings{ - ReceiverID: receiverID, - Transport: transport, - ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, - }) - require.NoError(t, err) - - ctx := rec.StartLogsOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndLogsOp(ctx, format, params[i].items, param.err) - } - - spans := tt.SpanRecorder.Ended() - require.Len(t, spans, len(params)) - - var acceptedLogRecords, refusedLogRecords int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiverID.String()+"/LogsReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedLogRecords += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedLogRecordsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedLogRecords += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedLogRecordsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) - } - } - metadatatest.AssertEqualReceiverAcceptedLogRecords(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(acceptedLogRecords), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) - metadatatest.AssertEqualReceiverRefusedLogRecords(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(refusedLogRecords), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) }) + + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 13, err: consumererror.NewDownstream(errFake)}, + {items: 42, err: nil}, + {items: 7, err: errors.New("non-downstream error")}, + } + for i, param := range params { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + ctx := rec.StartLogsOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndLogsOp(ctx, format, params[i].items, param.err) + } + + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + var acceptedLogRecords, refusedLogRecords, failedLogRecords int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiverID.String()+"/LogsReceived", span.Name()) + err := params[i].err + if err == nil { + acceptedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedLogRecordsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedLogRecordsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + } else { + isDownstream := consumererror.IsDownstream(err) + if !tc.enabled || (tc.enabled && isDownstream) { + refusedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedLogRecordsKey, Value: attribute.Int64Value(0)}) + } else { + failedLogRecords += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedLogRecordsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedLogRecordsKey, Value: attribute.Int64Value(int64(params[i].items))}) + } + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedLogRecordsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, err.Error(), span.Status().Description) + } + } + metadatatest.AssertEqualReceiverAcceptedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(acceptedLogRecords), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(refusedLogRecords), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(failedLogRecords), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert otelcol_receiver_requests metric with outcome attribute + if tc.enabled { + outcomes := make(map[string]int64) + for _, param := range params { + var outcome string + switch { + case param.err == nil: + outcome = "success" + case consumererror.IsDownstream(param.err): + outcome = "refused" + default: + outcome = "failure" + } + outcomes[outcome]++ + } + var expectedRequests []metricdata.DataPoint[int64] + for outcome, count := range outcomes { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport), + attribute.String("outcome", outcome)), + Value: count, + }) + } + metadatatest.AssertEqualReceiverRequests(t, tt, expectedRequests, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + }) + }) + } } func TestReceiveMetricsOp(t *testing.T) { - testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { - parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() - - params := []testParams{ - {items: 23, err: errFake}, - {items: 29, err: nil}, - } - for i, param := range params { - rec, err := newReceiver(ObsReportSettings{ - ReceiverID: receiverID, - Transport: transport, - ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, - }) - require.NoError(t, err) - - ctx := rec.StartMetricsOp(parentCtx) - assert.NotNil(t, ctx) - rec.EndMetricsOp(ctx, format, params[i].items, param.err) - } - - spans := tt.SpanRecorder.Ended() - require.Len(t, spans, len(params)) - - var acceptedMetricPoints, refusedMetricPoints int - for i, span := range spans { - assert.Equal(t, "receiver/"+receiverID.String()+"/MetricsReceived", span.Name()) - switch { - case params[i].err == nil: - acceptedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - refusedMetricPoints += params[i].items - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedMetricPointsKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected param: %v", params[i]) - } - } - - metadatatest.AssertEqualReceiverAcceptedMetricPoints(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(acceptedMetricPoints), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) - metadatatest.AssertEqualReceiverRefusedMetricPoints(t, tt, - []metricdata.DataPoint[int64]{ - { - Attributes: attribute.NewSet( - attribute.String(internal.ReceiverKey, receiverID.String()), - attribute.String(internal.TransportKey, transport)), - Value: int64(refusedMetricPoints), - }, - }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) }) + + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + testTelemetry(t, func(t *testing.T, tt *componenttest.Telemetry) { + parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 13, err: consumererror.NewDownstream(errFake)}, + {items: 42, err: nil}, + {items: 7, err: errors.New("non-downstream error")}, + } + for i, param := range params { + rec, err := newReceiver(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, err) + + ctx := rec.StartMetricsOp(parentCtx) + assert.NotNil(t, ctx) + rec.EndMetricsOp(ctx, format, params[i].items, param.err) + } + + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + var acceptedMetricPoints, refusedMetricPoints, failedMetricPoints int + for i, span := range spans { + assert.Equal(t, "receiver/"+receiverID.String()+"/MetricsReceived", span.Name()) + err := params[i].err + if err == nil { + acceptedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedMetricPointsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + } else { + isDownstream := consumererror.IsDownstream(err) + if !tc.enabled || (tc.enabled && isDownstream) { + refusedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedMetricPointsKey, Value: attribute.Int64Value(0)}) + } else { + failedMetricPoints += params[i].items + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedMetricPointsKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedMetricPointsKey, Value: attribute.Int64Value(int64(params[i].items))}) + } + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedMetricPointsKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, err.Error(), span.Status().Description) + } + } + + metadatatest.AssertEqualReceiverAcceptedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(acceptedMetricPoints), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverRefusedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(refusedMetricPoints), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(failedMetricPoints), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Assert otelcol_receiver_requests metric with outcome attribute + if tc.enabled { + outcomes := make(map[string]int64) + for _, param := range params { + var outcome string + switch { + case param.err == nil: + outcome = "success" + case consumererror.IsDownstream(param.err): + outcome = "refused" + default: + outcome = "failure" + } + outcomes[outcome]++ + } + var expectedRequests []metricdata.DataPoint[int64] + for outcome, count := range outcomes { + expectedRequests = append(expectedRequests, metricdata.DataPoint[int64]{ + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport), + attribute.String("outcome", outcome)), + Value: count, + }) + } + metadatatest.AssertEqualReceiverRequests(t, tt, expectedRequests, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + } + }) + }) + } } func TestReceiveWithLongLivedCtx(t *testing.T) { - tt := componenttest.NewTelemetry() - t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + originalState := NewReceiverMetricsGate.IsEnabled() + t.Cleanup(func() { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), originalState)) + }) - longLivedCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) - defer parentSpan.End() + for _, tc := range []struct { + name string + enabled bool + }{{"gate_enabled", true}, {"gate_disabled", false}} { + t.Run(tc.name, func(t *testing.T) { + require.NoError(t, featuregate.GlobalRegistry().Set(NewReceiverMetricsGate.ID(), tc.enabled)) + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + longLivedCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + params := []testParams{ + {items: 17, err: nil}, + {items: 23, err: consumererror.NewDownstream(errFake)}, + } + for i := range params { + // Use a new context on each operation to simulate distinct operations + // under the same long lived context. + rec, rerr := NewObsReport(ObsReportSettings{ + ReceiverID: receiverID, + Transport: transport, + LongLivedCtx: true, + ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + }) + require.NoError(t, rerr) + ctx := rec.StartTracesOp(longLivedCtx) + assert.NotNil(t, ctx) + rec.EndTracesOp(ctx, format, params[i].items, params[i].err) + } - params := []testParams{ - {items: 17, err: nil}, - {items: 23, err: errFake}, - } - for i := range params { - // Use a new context on each operation to simulate distinct operations - // under the same long lived context. - rec, rerr := NewObsReport(ObsReportSettings{ - ReceiverID: receiverID, - Transport: transport, - LongLivedCtx: true, - ReceiverCreateSettings: receiver.Settings{ID: receiverID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + spans := tt.SpanRecorder.Ended() + require.Len(t, spans, len(params)) + + for i, span := range spans { + assert.False(t, span.Parent().IsValid()) + require.Len(t, span.Links(), 1) + link := span.Links()[0] + assert.Equal(t, parentSpan.SpanContext().TraceID(), link.SpanContext.TraceID()) + assert.Equal(t, parentSpan.SpanContext().SpanID(), link.SpanContext.SpanID()) + assert.Equal(t, "receiver/"+receiverID.String()+"/TraceDataReceived", span.Name()) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.TransportKey, Value: attribute.StringValue(transport)}) + switch { + case params[i].err == nil: + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(0)}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Unset, span.Status().Code) + case consumererror.IsDownstream(params[i].err): + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(0)}) + // For downstream errors + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) + require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.FailedSpansKey, Value: attribute.Int64Value(0)}) + assert.Equal(t, codes.Error, span.Status().Code) + assert.Equal(t, params[i].err.Error(), span.Status().Description) + default: + t.Fatalf("unexpected error: %v", params[i].err) + } + } }) - require.NoError(t, rerr) - ctx := rec.StartTracesOp(longLivedCtx) - assert.NotNil(t, ctx) - rec.EndTracesOp(ctx, format, params[i].items, params[i].err) - } - - spans := tt.SpanRecorder.Ended() - require.Len(t, spans, len(params)) - - for i, span := range spans { - assert.False(t, span.Parent().IsValid()) - require.Len(t, span.Links(), 1) - link := span.Links()[0] - assert.Equal(t, parentSpan.SpanContext().TraceID(), link.SpanContext.TraceID()) - assert.Equal(t, parentSpan.SpanContext().SpanID(), link.SpanContext.SpanID()) - assert.Equal(t, "receiver/"+receiverID.String()+"/TraceDataReceived", span.Name()) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.TransportKey, Value: attribute.StringValue(transport)}) - switch { - case params[i].err == nil: - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(0)}) - assert.Equal(t, codes.Unset, span.Status().Code) - case errors.Is(params[i].err, errFake): - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.AcceptedSpansKey, Value: attribute.Int64Value(0)}) - require.Contains(t, span.Attributes(), attribute.KeyValue{Key: internal.RefusedSpansKey, Value: attribute.Int64Value(int64(params[i].items))}) - assert.Equal(t, codes.Error, span.Status().Code) - assert.Equal(t, params[i].err.Error(), span.Status().Description) - default: - t.Fatalf("unexpected error: %v", params[i].err) - } } } @@ -319,6 +510,15 @@ func TestCheckReceiverTracesViews(t *testing.T) { Value: int64(0), }, }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedSpans(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } func TestCheckReceiverMetricsViews(t *testing.T) { @@ -353,6 +553,15 @@ func TestCheckReceiverMetricsViews(t *testing.T) { Value: int64(0), }, }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } func TestCheckReceiverLogsViews(t *testing.T) { @@ -387,6 +596,15 @@ func TestCheckReceiverLogsViews(t *testing.T) { Value: int64(0), }, }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + metadatatest.AssertEqualReceiverFailedLogRecords(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(internal.ReceiverKey, receiverID.String()), + attribute.String(internal.TransportKey, transport)), + Value: int64(0), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } func testTelemetry(t *testing.T, testFunc func(t *testing.T, tt *componenttest.Telemetry)) {