diff --git a/receiver/awsxrayreceiver/go.mod b/receiver/awsxrayreceiver/go.mod index 5ff86e4c38eba..56b686cec35db 100644 --- a/receiver/awsxrayreceiver/go.mod +++ b/receiver/awsxrayreceiver/go.mod @@ -24,6 +24,8 @@ require ( go.opentelemetry.io/collector/receiver v0.119.0 go.opentelemetry.io/collector/receiver/receivertest v0.119.0 go.opentelemetry.io/collector/semconv v0.119.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 ) @@ -57,10 +59,8 @@ require ( go.opentelemetry.io/collector/featuregate v1.25.0 // indirect go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect go.opentelemetry.io/otel/sdk v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.opentelemetry.io/otel/trace v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.34.0 // indirect diff --git a/receiver/awsxrayreceiver/internal/udppoller/poller_test.go b/receiver/awsxrayreceiver/internal/udppoller/poller_test.go index d5077e81f1f54..71b6ef65491da 100644 --- a/receiver/awsxrayreceiver/internal/udppoller/poller_test.go +++ b/receiver/awsxrayreceiver/internal/udppoller/poller_test.go @@ -21,6 +21,9 @@ import ( "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receiverhelper" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -145,7 +148,7 @@ func TestSuccessfullyPollPacket(t *testing.T) { } }, 10*time.Second, 5*time.Millisecond, "poller should return parsed segment") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 2, 0)) + assertReceiverTraces(t, tt, receiverID, 2, 0) } func TestIncompletePacketNoSeparator(t *testing.T) { @@ -175,7 +178,7 @@ func TestIncompletePacketNoSeparator(t *testing.T) { fmt.Sprintf("unable to split incoming data as header and segment, incoming bytes: %v", rawData)) == 0 }, 10*time.Second, 5*time.Millisecond, "poller should reject segment") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestIncompletePacketNoBody(t *testing.T) { @@ -200,7 +203,7 @@ func TestIncompletePacketNoBody(t *testing.T) { lastEntry.Context[1].Integer == 1 }, 10*time.Second, 5*time.Millisecond, "poller should log missing body") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestNonJsonHeader(t *testing.T) { @@ -230,7 +233,7 @@ func TestNonJsonHeader(t *testing.T) { "invalid character 'o'") }, 10*time.Second, 5*time.Millisecond, "poller should reject segment") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestJsonInvalidHeader(t *testing.T) { @@ -266,7 +269,7 @@ func TestJsonInvalidHeader(t *testing.T) { ) }, 10*time.Second, 5*time.Millisecond, "poller should reject segment") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestSocketReadIrrecoverableNetError(t *testing.T) { @@ -303,7 +306,7 @@ func TestSocketReadIrrecoverableNetError(t *testing.T) { errors.Unwrap(lastEntry.Context[0].Interface.(error)).Error() == randErrStr.String() }, 10*time.Second, 5*time.Millisecond, "poller should exit due to irrecoverable net read error") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestSocketReadTimeOutNetError(t *testing.T) { @@ -341,7 +344,7 @@ func TestSocketReadTimeOutNetError(t *testing.T) { errors.Unwrap(lastEntry.Context[0].Interface.(error)).Error() == randErrStr.String() }, 10*time.Second, 5*time.Millisecond, "poller should encounter net read error") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } func TestSocketGenericReadError(t *testing.T) { @@ -377,7 +380,7 @@ func TestSocketGenericReadError(t *testing.T) { errors.Unwrap(lastEntry.Context[0].Interface.(error)).Error() == randErrStr.String() }, 10*time.Second, 5*time.Millisecond, "poller should encounter generic socket read error") - assert.NoError(t, tt.CheckReceiverTraces(Transport, 0, 1)) + assertReceiverTraces(t, tt, receiverID, 0, 1) } type mockNetError struct { @@ -486,3 +489,47 @@ func logSetup() (*zap.Logger, *observer.ObservedLogs) { core, recorded := observer.New(zapcore.InfoLevel) return zap.New(core), recorded } + +func assertReceiverTraces(t *testing.T, tt componenttest.TestTelemetry, id component.ID, accepted, refused int64) { + got, err := tt.GetMetric("otelcol_receiver_accepted_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_spans", + Description: "Number of spans successfully pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", Transport)), + Value: accepted, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_spans", + Description: "Number of spans that could not be pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", Transport)), + Value: refused, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} diff --git a/receiver/awsxrayreceiver/receiver_test.go b/receiver/awsxrayreceiver/receiver_test.go index 2114fc4b9f9ee..bfb9886b4dd6b 100644 --- a/receiver/awsxrayreceiver/receiver_test.go +++ b/receiver/awsxrayreceiver/receiver_test.go @@ -23,6 +23,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/zap" "go.uber.org/zap/zapcore" "go.uber.org/zap/zaptest/observer" @@ -111,7 +114,7 @@ func TestSegmentsPassedToConsumer(t *testing.T) { return len(got) == 1 }, 10*time.Second, 5*time.Millisecond, "consumer should eventually get the X-Ray span") - assert.NoError(t, tt.CheckReceiverTraces(udppoller.Transport, 18, 0)) + assertReceiverTraces(t, tt, receiverID, 18, 0) } func TestTranslatorErrorsOut(t *testing.T) { @@ -138,7 +141,7 @@ func TestTranslatorErrorsOut(t *testing.T) { "X-Ray segment to OT traces conversion failed") }, 10*time.Second, 5*time.Millisecond, "poller should log warning because consumer errored out") - assert.NoError(t, tt.CheckReceiverTraces(udppoller.Transport, 1, 1)) + assertReceiverTraces(t, tt, receiverID, 1, 1) } func TestSegmentsConsumerErrorsOut(t *testing.T) { @@ -168,7 +171,7 @@ func TestSegmentsConsumerErrorsOut(t *testing.T) { "Trace consumer errored out") }, 10*time.Second, 5*time.Millisecond, "poller should log warning because consumer errored out") - assert.NoError(t, tt.CheckReceiverTraces(udppoller.Transport, 1, 1)) + assertReceiverTraces(t, tt, receiverID, 1, 1) } func TestPollerCloseError(t *testing.T) { @@ -338,3 +341,47 @@ func logSetup() (*zap.Logger, *observer.ObservedLogs) { core, recorded := observer.New(zapcore.InfoLevel) return zap.New(core), recorded } + +func assertReceiverTraces(t *testing.T, tt componenttest.TestTelemetry, id component.ID, accepted, refused int64) { + got, err := tt.GetMetric("otelcol_receiver_accepted_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_spans", + Description: "Number of spans successfully pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", udppoller.Transport)), + Value: accepted, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_spans", + Description: "Number of spans that could not be pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", udppoller.Transport)), + Value: refused, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} diff --git a/receiver/carbonreceiver/go.mod b/receiver/carbonreceiver/go.mod index 8992b1a41e71c..1ed124ef66e72 100644 --- a/receiver/carbonreceiver/go.mod +++ b/receiver/carbonreceiver/go.mod @@ -15,7 +15,9 @@ require ( go.opentelemetry.io/collector/pdata v1.25.0 go.opentelemetry.io/collector/receiver v0.119.0 go.opentelemetry.io/collector/receiver/receivertest v0.119.0 + go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/zap v1.27.0 @@ -46,9 +48,7 @@ require ( go.opentelemetry.io/collector/pdata/pprofile v0.119.0 // indirect go.opentelemetry.io/collector/pipeline v0.119.0 // indirect go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect - go.opentelemetry.io/otel v1.34.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect golang.org/x/net v0.33.0 // indirect golang.org/x/sys v0.29.0 // indirect diff --git a/receiver/carbonreceiver/reporter_test.go b/receiver/carbonreceiver/reporter_test.go index eb874bed46b98..3ee809485e4cc 100644 --- a/receiver/carbonreceiver/reporter_test.go +++ b/receiver/carbonreceiver/reporter_test.go @@ -8,10 +8,14 @@ import ( "errors" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/receiver" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/carbonreceiver/internal/metadata" ) @@ -31,12 +35,56 @@ func TestReporterObservability(t *testing.T) { reporter.OnMetricsProcessed(ctx, 17, nil) - require.NoError(t, tt.CheckReceiverMetrics("tcp", 17, 0)) + assertReceiverMetrics(t, tt, receiverID, 17, 0) // Below just exercise the error paths. err = errors.New("fake error for tests") reporter.OnTranslationError(ctx, err) reporter.OnMetricsProcessed(ctx, 10, err) - require.NoError(t, tt.CheckReceiverMetrics("tcp", 17, 10)) + assertReceiverMetrics(t, tt, receiverID, 17, 10) +} + +func assertReceiverMetrics(t *testing.T, tt componenttest.TestTelemetry, id component.ID, accepted, refused int64) { + got, err := tt.GetMetric("otelcol_receiver_accepted_metric_points") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_metric_points", + Description: "Number of metric points successfully pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", "tcp")), + Value: accepted, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_metric_points") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_metric_points", + Description: "Number of metric points that could not be pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", "tcp")), + Value: refused, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } diff --git a/receiver/opencensusreceiver/go.mod b/receiver/opencensusreceiver/go.mod index 0cd61f7f0fdd9..fa689bcbfd4a4 100644 --- a/receiver/opencensusreceiver/go.mod +++ b/receiver/opencensusreceiver/go.mod @@ -27,6 +27,7 @@ require ( go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 google.golang.org/grpc v1.70.0 @@ -74,7 +75,6 @@ require ( go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect go.opentelemetry.io/collector/semconv v0.119.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect go.uber.org/multierr v1.11.0 // indirect go.uber.org/zap v1.27.0 // indirect golang.org/x/net v0.34.0 // indirect diff --git a/receiver/opencensusreceiver/internal/octrace/observability_test.go b/receiver/opencensusreceiver/internal/octrace/observability_test.go index 42728adb131ea..6ea9ca8ce0a0a 100644 --- a/receiver/opencensusreceiver/internal/octrace/observability_test.go +++ b/receiver/opencensusreceiver/internal/octrace/observability_test.go @@ -19,6 +19,9 @@ import ( "go.opentelemetry.io/collector/consumer/consumertest" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" sdktrace "go.opentelemetry.io/otel/sdk/trace" nooptrace "go.opentelemetry.io/otel/trace/noop" ) @@ -56,7 +59,7 @@ func TestEnsureRecordedMetrics(t *testing.T) { } flush(traceSvcDoneFn) - require.NoError(t, tt.CheckReceiverTraces("grpc", int64(n), 0)) + assertReceiverTraces(t, tt, receiverID, int64(n), 0) } func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) { @@ -79,7 +82,7 @@ func TestEnsureRecordedMetrics_zeroLengthSpansSender(t *testing.T) { } flush(traceSvcDoneFn) - require.NoError(t, tt.CheckReceiverTraces("grpc", 0, 0)) + assertReceiverTraces(t, tt, receiverID, 0, 0) } func TestExportSpanLinkingMaintainsParentLink(t *testing.T) { @@ -144,3 +147,47 @@ func flush(traceSvcDoneFn func()) { // Give it some more time to complete the RPC trace and export. <-time.After(40 * time.Millisecond) } + +func assertReceiverTraces(t *testing.T, tt componenttest.TestTelemetry, id component.ID, accepted, refused int64) { + got, err := tt.GetMetric("otelcol_receiver_accepted_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_spans", + Description: "Number of spans successfully pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", "grpc")), + Value: accepted, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_spans", + Description: "Number of spans that could not be pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", id.String()), + attribute.String("transport", "grpc")), + Value: refused, + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} diff --git a/receiver/opencensusreceiver/opencensus_test.go b/receiver/opencensusreceiver/opencensus_test.go index 6781d846abb15..7ec34c5dfa626 100644 --- a/receiver/opencensusreceiver/opencensus_test.go +++ b/receiver/opencensusreceiver/opencensus_test.go @@ -39,6 +39,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "google.golang.org/grpc" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials/insecure" @@ -585,7 +588,48 @@ func TestOCReceiverTrace_HandleNextConsumerResponse(t *testing.T) { } require.Len(t, sink.AllTraces(), tt.expectedReceivedBatches) - require.NoError(t, testTel.CheckReceiverTraces("grpc", int64(tt.expectedReceivedBatches), int64(tt.expectedIngestionBlockedRPCs))) + + got, err := testTel.GetMetric("otelcol_receiver_accepted_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_spans", + Description: "Number of spans successfully pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", exporter.receiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(tt.expectedReceivedBatches), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = testTel.GetMetric("otelcol_receiver_refused_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_spans", + Description: "Number of spans that could not be pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", exporter.receiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(tt.expectedIngestionBlockedRPCs), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) }) } } @@ -743,7 +787,48 @@ func TestOCReceiverMetrics_HandleNextConsumerResponse(t *testing.T) { } require.Len(t, sink.AllMetrics(), tt.expectedReceivedBatches) - require.NoError(t, testTel.CheckReceiverMetrics("grpc", int64(tt.expectedReceivedBatches), int64(tt.expectedIngestionBlockedRPCs))) + + got, err := testTel.GetMetric("otelcol_receiver_accepted_metric_points") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_metric_points", + Description: "Number of metric points successfully pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", exporter.receiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(tt.expectedReceivedBatches), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = testTel.GetMetric("otelcol_receiver_refused_metric_points") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_metric_points", + Description: "Number of metric points that could not be pushed into the pipeline. [alpha]", + Unit: "{datapoints}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", exporter.receiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(tt.expectedIngestionBlockedRPCs), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) }) } } diff --git a/receiver/otelarrowreceiver/go.mod b/receiver/otelarrowreceiver/go.mod index f3da90789df8e..d22e3afe28cad 100644 --- a/receiver/otelarrowreceiver/go.mod +++ b/receiver/otelarrowreceiver/go.mod @@ -26,6 +26,7 @@ require ( go.opentelemetry.io/collector/receiver/receivertest v0.119.0 go.opentelemetry.io/otel v1.34.0 go.opentelemetry.io/otel/sdk v1.34.0 + go.opentelemetry.io/otel/sdk/metric v1.34.0 go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/goleak v1.3.0 go.uber.org/mock v0.5.0 @@ -78,7 +79,6 @@ require ( go.opentelemetry.io/collector/receiver/xreceiver v0.119.0 // indirect go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.59.0 // indirect go.opentelemetry.io/otel/metric v1.34.0 // indirect - go.opentelemetry.io/otel/sdk/metric v1.34.0 // indirect golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect golang.org/x/mod v0.18.0 // indirect golang.org/x/sync v0.10.0 // indirect diff --git a/receiver/otelarrowreceiver/otelarrow_test.go b/receiver/otelarrowreceiver/otelarrow_test.go index 9a576dc136339..7f3521b29b19d 100644 --- a/receiver/otelarrowreceiver/otelarrow_test.go +++ b/receiver/otelarrowreceiver/otelarrow_test.go @@ -34,6 +34,9 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace/ptraceotlp" "go.opentelemetry.io/collector/receiver" "go.opentelemetry.io/collector/receiver/receivertest" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" "go.uber.org/mock/gomock" "go.uber.org/zap" "go.uber.org/zap/zapcore" @@ -133,7 +136,48 @@ func TestOTelArrowReceiverGRPCTracesIngestTest(t *testing.T) { require.Len(t, sink.AllTraces(), expectedReceivedBatches) expectedIngestionBlockedRPCs := 1 - require.NoError(t, tt.CheckReceiverTraces("grpc", int64(expectedReceivedBatches), int64(expectedIngestionBlockedRPCs))) + + got, err := tt.GetMetric("otelcol_receiver_accepted_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_accepted_spans", + Description: "Number of spans successfully pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", testReceiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(expectedReceivedBatches), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + got, err = tt.GetMetric("otelcol_receiver_refused_spans") + assert.NoError(t, err) + metricdatatest.AssertEqual(t, + metricdata.Metrics{ + Name: "otelcol_receiver_refused_spans", + Description: "Number of spans that could not be pushed into the pipeline. [alpha]", + Unit: "{spans}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("receiver", testReceiverID.String()), + attribute.String("transport", "grpc")), + Value: int64(expectedIngestionBlockedRPCs), + }, + }, + }, + }, got, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } func TestGRPCInvalidTLSCredentials(t *testing.T) {