diff --git a/.chloggen/exporterhelper-dropped-items.yaml b/.chloggen/exporterhelper-dropped-items.yaml new file mode 100644 index 00000000000..91e73661d98 --- /dev/null +++ b/.chloggen/exporterhelper-dropped-items.yaml @@ -0,0 +1,34 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/otlp) +component: pkg/exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add support for reporting items that an exporter intentionally drops, so they are no longer counted as successfully sent. + +# One or more tracking issues or pull requests related to the change +issues: [13643] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Exporter authors can return `exporterhelper.NewDroppedItemsErr(n, reason)` + from their push function to report that `n` items were intentionally dropped. + The dropped items are subtracted from the `exporter_sent_` counters + and reported via four new telemetry metrics (`exporter_dropped_log_records`, + `exporter_dropped_metric_points`, `exporter_dropped_profile_samples`, + `exporter_dropped_spans`). At detailed telemetry level the metrics also + include the optional `exporter.dropped.reason` attribute; the attribute is + filtered out at lower levels to bound cardinality. + The spans emitted by exporterhelper now also carry a new `items.dropped` attribute. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. 
+# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user, api] diff --git a/exporter/exporterhelper/documentation.md b/exporter/exporterhelper/documentation.md index 3fca19d9f1e..80a83f5838e 100644 --- a/exporter/exporterhelper/documentation.md +++ b/exporter/exporterhelper/documentation.md @@ -6,6 +6,38 @@ The following telemetry is emitted by this component. +### otelcol_exporter_dropped_log_records + +Number of log records intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {record} | Sum | Int | true | Alpha | + +### otelcol_exporter_dropped_metric_points + +Number of metric points intentionally dropped by the exporter (e.g. unsupported temporality). At detailed telemetry level, includes attribute: exporter.dropped.reason. + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {datapoint} | Sum | Int | true | Alpha | + +### otelcol_exporter_dropped_profile_samples + +Number of profile samples intentionally dropped by the exporter. At detailed telemetry level, includes attribute: exporter.dropped.reason. + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {sample} | Sum | Int | true | Development | + +### otelcol_exporter_dropped_spans + +Number of spans intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| {span} | Sum | Int | true | Alpha | + ### otelcol_exporter_enqueue_failed_log_records Number of log records failed to be added to the sending queue. 
diff --git a/exporter/exporterhelper/errors.go b/exporter/exporterhelper/errors.go new file mode 100644 index 00000000000..993f52a6c30 --- /dev/null +++ b/exporter/exporterhelper/errors.go @@ -0,0 +1,26 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" + +import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr" + +// DroppedItemsErr is a non-failure sentinel that an exporter's push function +// can return to signal that it intentionally dropped a number of items rather +// than failing to send them. The error is not propagated to the rest of the +// pipeline. exporterhelper subtracts the dropped count from +// exporter_sent_ and records it on exporter_dropped_ instead. +// +// Typical use is an exporter that receives data it cannot translate to the +// destination format (e.g. the Prometheus exporter dropping non-monotonic +// DELTA sums) and wants the operator's internal telemetry to reflect a drop +// rather than a successful send. +type DroppedItemsErr = experr.DroppedItemsErr + +// NewDroppedItemsErr creates a [DroppedItemsErr] for the given count and an +// optional human-readable reason. The reason is recorded as the +// exporter.dropped.reason metric attribute at detailed telemetry level only, +// to keep cardinality bounded at lower levels. 
+func NewDroppedItemsErr(dropped int, reason string) error { + return experr.NewDroppedItemsErr(dropped, reason) +} diff --git a/exporter/exporterhelper/internal/experr/err.go b/exporter/exporterhelper/internal/experr/err.go index 4080e0369d6..4a37b118428 100644 --- a/exporter/exporterhelper/internal/experr/err.go +++ b/exporter/exporterhelper/internal/experr/err.go @@ -5,6 +5,7 @@ package experr // import "go.opentelemetry.io/collector/exporter/exporterhelper/ import ( "errors" + "fmt" ) type shutdownErr struct { @@ -27,3 +28,37 @@ func IsShutdownErr(err error) bool { var sdErr shutdownErr return errors.As(err, &sdErr) } + +// DroppedItemsErr is a non-failure sentinel that an exporter can return to +// signal that it intentionally dropped a number of items rather than failing +// to send them. The error is not propagated to the pipeline as a failure — +// it is used only to update the dropped-items telemetry counters. +type DroppedItemsErr struct { + // Dropped is the number of items that were intentionally dropped. + Dropped int + // Reason is an optional human-readable description of why the items were + // dropped (e.g. "incompatible temporality"). + Reason string +} + +// NewDroppedItemsErr creates a DroppedItemsErr for the given count and reason. +func NewDroppedItemsErr(dropped int, reason string) error { + return &DroppedItemsErr{Dropped: dropped, Reason: reason} +} + +func (d *DroppedItemsErr) Error() string { + if d.Reason != "" { + return fmt.Sprintf("dropped %d items: %s", d.Dropped, d.Reason) + } + return fmt.Sprintf("dropped %d items", d.Dropped) +} + +// DroppedItemsFromErr extracts the DroppedItemsErr from err, if present, and +// returns it together with a boolean indicating success. 
+func DroppedItemsFromErr(err error) (*DroppedItemsErr, bool) { + var d *DroppedItemsErr + if errors.As(err, &d) { + return d, true + } + return nil, false +} diff --git a/exporter/exporterhelper/internal/experr/err_test.go b/exporter/exporterhelper/internal/experr/err_test.go index ac0580025e5..e3fa0897b3d 100644 --- a/exporter/exporterhelper/internal/experr/err_test.go +++ b/exporter/exporterhelper/internal/experr/err_test.go @@ -22,3 +22,27 @@ func TestIsShutdownErr(t *testing.T) { err = NewShutdownErr(err) require.True(t, IsShutdownErr(err)) } + +func TestNewDroppedItemsErr_WithReason(t *testing.T) { + err := NewDroppedItemsErr(5, "incompatible temporality") + assert.Equal(t, "dropped 5 items: incompatible temporality", err.Error()) +} + +func TestNewDroppedItemsErr_NoReason(t *testing.T) { + err := NewDroppedItemsErr(3, "") + assert.Equal(t, "dropped 3 items", err.Error()) +} + +func TestDroppedItemsFromErr(t *testing.T) { + err := NewDroppedItemsErr(7, "some reason") + d, ok := DroppedItemsFromErr(err) + require.True(t, ok) + assert.Equal(t, 7, d.Dropped) + assert.Equal(t, "some reason", d.Reason) + + _, ok = DroppedItemsFromErr(errors.New("not a dropped items error")) + assert.False(t, ok) + + _, ok = DroppedItemsFromErr(nil) + assert.False(t, ok) +} diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index c94d67c9f5c..0cf1b55de63 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -28,6 +28,10 @@ type TelemetryBuilder struct { meter metric.Meter mu sync.Mutex registrations []metric.Registration + ExporterDroppedLogRecords metric.Int64Counter + ExporterDroppedMetricPoints metric.Int64Counter + ExporterDroppedProfileSamples metric.Int64Counter + ExporterDroppedSpans metric.Int64Counter ExporterEnqueueFailedLogRecords metric.Int64Counter 
ExporterEnqueueFailedMetricPoints metric.Int64Counter ExporterEnqueueFailedProfileSamples metric.Int64Counter @@ -116,6 +120,30 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error + builder.ExporterDroppedLogRecords, err = builder.meter.Int64Counter( + "otelcol_exporter_dropped_log_records", + metric.WithDescription("Number of log records intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]"), + metric.WithUnit("{record}"), + ) + errs = errors.Join(errs, err) + builder.ExporterDroppedMetricPoints, err = builder.meter.Int64Counter( + "otelcol_exporter_dropped_metric_points", + metric.WithDescription("Number of metric points intentionally dropped by the exporter (e.g. unsupported temporality). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]"), + metric.WithUnit("{datapoint}"), + ) + errs = errors.Join(errs, err) + builder.ExporterDroppedProfileSamples, err = builder.meter.Int64Counter( + "otelcol_exporter_dropped_profile_samples", + metric.WithDescription("Number of profile samples intentionally dropped by the exporter. At detailed telemetry level, includes attribute: exporter.dropped.reason. [Development]"), + metric.WithUnit("{sample}"), + ) + errs = errors.Join(errs, err) + builder.ExporterDroppedSpans, err = builder.meter.Int64Counter( + "otelcol_exporter_dropped_spans", + metric.WithDescription("Number of spans intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]"), + metric.WithUnit("{span}"), + ) + errs = errors.Join(errs, err) builder.ExporterEnqueueFailedLogRecords, err = builder.meter.Int64Counter( "otelcol_exporter_enqueue_failed_log_records", metric.WithDescription("Number of log records failed to be added to the sending queue. 
[Alpha]"), diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go index 9402feb9329..e493bd56742 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go @@ -12,6 +12,70 @@ import ( "go.opentelemetry.io/collector/component/componenttest" ) +func AssertEqualExporterDroppedLogRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_dropped_log_records", + Description: "Number of log records intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]", + Unit: "{record}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_dropped_log_records") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterDroppedMetricPoints(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_dropped_metric_points", + Description: "Number of metric points intentionally dropped by the exporter (e.g. unsupported temporality). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]", + Unit: "{datapoint}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_dropped_metric_points") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + +func AssertEqualExporterDroppedProfileSamples(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_dropped_profile_samples", + Description: "Number of profile samples intentionally dropped by the exporter. At detailed telemetry level, includes attribute: exporter.dropped.reason. [Development]", + Unit: "{sample}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_dropped_profile_samples") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterDroppedSpans(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_dropped_spans", + Description: "Number of spans intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason. [Alpha]", + Unit: "{span}", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_dropped_spans") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) 
+} + func AssertEqualExporterEnqueueFailedLogRecords(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_exporter_enqueue_failed_log_records", diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go index 6aa21baf129..7d2a3e4615c 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -28,6 +28,10 @@ func TestSetupTelemetry(t *testing.T) { observer.Observe(1) return nil })) + tb.ExporterDroppedLogRecords.Add(context.Background(), 1) + tb.ExporterDroppedMetricPoints.Add(context.Background(), 1) + tb.ExporterDroppedProfileSamples.Add(context.Background(), 1) + tb.ExporterDroppedSpans.Add(context.Background(), 1) tb.ExporterEnqueueFailedLogRecords.Add(context.Background(), 1) tb.ExporterEnqueueFailedMetricPoints.Add(context.Background(), 1) tb.ExporterEnqueueFailedProfileSamples.Add(context.Background(), 1) @@ -43,6 +47,18 @@ func TestSetupTelemetry(t *testing.T) { tb.ExporterSentMetricPoints.Add(context.Background(), 1) tb.ExporterSentProfileSamples.Add(context.Background(), 1) tb.ExporterSentSpans.Add(context.Background(), 1) + AssertEqualExporterDroppedLogRecords(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterDroppedMetricPoints(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterDroppedProfileSamples(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterDroppedSpans(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) AssertEqualExporterEnqueueFailedLogRecords(t, testTel, 
[]metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/exporter/exporterhelper/internal/obs_report_sender.go b/exporter/exporterhelper/internal/obs_report_sender.go index cbff9ac00d9..83e89141995 100644 --- a/exporter/exporterhelper/internal/obs_report_sender.go +++ b/exporter/exporterhelper/internal/obs_report_sender.go @@ -12,6 +12,7 @@ import ( "go.opentelemetry.io/otel/metric" semconv "go.opentelemetry.io/otel/semconv/v1.40.0" "go.opentelemetry.io/otel/trace" + "go.uber.org/zap" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -41,15 +42,24 @@ const ( ItemsSent = "items.sent" // ItemsFailed used to track number of items that failed to be sent by exporters. ItemsFailed = "items.failed" + // ItemsDropped used to track number of items intentionally dropped by exporters. + ItemsDropped = "items.dropped" // ErrorPermanentKey indicates whether the error is permanent (non-retryable). ErrorPermanentKey = "error.permanent" + // DroppedReasonKey carries the optional reason from a DroppedItemsErr on the + // exporter_dropped_* metrics. The attribute is only retained at the detailed + // telemetry level (filtered out by a metric view at lower levels) because + // the reason string is exporter-defined and may have unbounded cardinality. 
+ DroppedReasonKey = "exporter.dropped.reason" ) type obsReportSender[K request.Request] struct { component.StartFunc component.ShutdownFunc + logger *zap.Logger + exporterIDStr string spanName string tracer trace.Tracer spanAttrs trace.SpanStartEventOption @@ -57,6 +67,7 @@ type obsReportSender[K request.Request] struct { inFlightMetricAttr metric.MeasurementOption itemsSentInst metric.Int64Counter itemsFailedInst metric.Int64Counter + itemsDroppedInst metric.Int64Counter inFlightInst metric.Int64UpDownCounter next sender.Sender[K] } @@ -71,6 +82,8 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin expAttr := attribute.String(ExporterKey, idStr) or := &obsReportSender[K]{ + logger: set.Logger, + exporterIDStr: idStr, spanName: ExporterKey + spanNameSep + idStr + spanNameSep + signal.String(), tracer: metadata.Tracer(set.TelemetrySettings), spanAttrs: trace.WithAttributes(expAttr, attribute.String(DataTypeKey, signal.String())), @@ -85,18 +98,22 @@ func newObsReportSender[K request.Request](set exporter.Settings, signal pipelin case pipeline.SignalTraces: or.itemsSentInst = telemetryBuilder.ExporterSentSpans or.itemsFailedInst = telemetryBuilder.ExporterSendFailedSpans + or.itemsDroppedInst = telemetryBuilder.ExporterDroppedSpans case pipeline.SignalMetrics: or.itemsSentInst = telemetryBuilder.ExporterSentMetricPoints or.itemsFailedInst = telemetryBuilder.ExporterSendFailedMetricPoints + or.itemsDroppedInst = telemetryBuilder.ExporterDroppedMetricPoints case pipeline.SignalLogs: or.itemsSentInst = telemetryBuilder.ExporterSentLogRecords or.itemsFailedInst = telemetryBuilder.ExporterSendFailedLogRecords + or.itemsDroppedInst = telemetryBuilder.ExporterDroppedLogRecords case xpipeline.SignalProfiles: or.itemsSentInst = telemetryBuilder.ExporterSentProfileSamples or.itemsFailedInst = telemetryBuilder.ExporterSendFailedProfileSamples + or.itemsDroppedInst = telemetryBuilder.ExporterDroppedProfileSamples } return or, nil @@ -110,6 
+127,11 @@ func (ors *obsReportSender[K]) Send(ctx context.Context, req K) error { // Forward the data to the next consumer (this pusher is the next). err := ors.next.Send(c, req) ors.endOp(c, items, err) + // DroppedItemsErr is a non-failure sentinel: don't propagate it as an + // error to the rest of the pipeline. + if _, ok := experr.DroppedItemsFromErr(err); ok { + return nil + } return err } @@ -133,8 +155,42 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numRecords int, err er ors.inFlightInst.Add(ctx, -1, ors.inFlightMetricAttr) } + // Check if the exporter intentionally dropped some items. A + // DroppedItemsErr is a non-failure sentinel: the exporter processed the + // request successfully but chose to discard certain items (e.g. a + // Prometheus exporter dropping non-monotonic DELTA sums). + var ( + numDropped int64 + droppedReason string + ) + if d, ok := experr.DroppedItemsFromErr(err); ok { + numDropped = int64(d.Dropped) + droppedReason = d.Reason + // Clear the error so that it is not counted as a send failure and + // is not propagated to the rest of the pipeline. + err = nil + } + numSent, numFailedToSend := toNumItems(numRecords, err) + // Items that were intentionally dropped should not be counted as + // successfully sent. Clamp to zero in case the exporter reported more + // dropped items than the total item count, and warn — that condition + // indicates an exporter bug. 
+ if numDropped > 0 && numSent < numDropped { + if ors.logger != nil { + ors.logger.Warn("exporter reported more dropped items than the request contained", + zap.String("exporter", ors.exporterIDStr), + zap.Int64("dropped", numDropped), + zap.Int64("items", numSent), + zap.String("reason", droppedReason), + ) + } + numSent = 0 + } else if numDropped > 0 { + numSent -= numDropped + } + if ors.itemsSentInst != nil { ors.itemsSentInst.Add(ctx, numSent, ors.metricAttr) } @@ -144,6 +200,13 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numRecords int, err er ors.itemsFailedInst.Add(ctx, numFailedToSend, ors.metricAttr, withFailedAttrs) } + if ors.itemsDroppedInst != nil && numDropped > 0 { + withReason := metric.WithAttributeSet( + attribute.NewSet(attribute.String(DroppedReasonKey, droppedReason)), + ) + ors.itemsDroppedInst.Add(ctx, numDropped, ors.metricAttr, withReason) + } + span := trace.SpanFromContext(ctx) defer span.End() // End the span according to errors. @@ -151,6 +214,7 @@ func (ors *obsReportSender[K]) endOp(ctx context.Context, numRecords int, err er span.SetAttributes( attribute.Int64(ItemsSent, numSent), attribute.Int64(ItemsFailed, numFailedToSend), + attribute.Int64(ItemsDropped, numDropped), ) if err != nil { span.SetStatus(otelcodes.Error, err.Error()) diff --git a/exporter/exporterhelper/internal/obs_report_sender_test.go b/exporter/exporterhelper/internal/obs_report_sender_test.go index 5ef9ef7e52d..8d1bb23b2da 100644 --- a/exporter/exporterhelper/internal/obs_report_sender_test.go +++ b/exporter/exporterhelper/internal/obs_report_sender_test.go @@ -16,6 +16,8 @@ import ( "go.opentelemetry.io/otel/sdk/metric/metricdata" "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" semconv "go.opentelemetry.io/otel/semconv/v1.40.0" + "go.uber.org/zap" + "go.uber.org/zap/zaptest/observer" grpccodes "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -628,3 +630,168 @@ type testParams struct { items int err error } + 
+func TestExportMetricsDroppedOp(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + const totalItems = 10 + const droppedItems = 3 + const reason = "incompatible temporality" + + parentCtx, parentSpan := tt.NewTelemetrySettings().TracerProvider.Tracer("test").Start(context.Background(), t.Name()) + defer parentSpan.End() + + obsrep, err := newObsReportSender( + exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + pipeline.SignalMetrics, + nil, + sender.NewSender(func(context.Context, request.Request) error { + return experr.NewDroppedItemsErr(droppedItems, reason) + }), + ) + require.NoError(t, err) + + // A DroppedItemsErr must not be returned to the caller. + sendErr := obsrep.Send(parentCtx, &requesttest.FakeRequest{Items: totalItems}) + require.NoError(t, sendErr) + + metadatatest.AssertEqualExporterSentMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("exporter", exporterID.String())), + Value: int64(totalItems - droppedItems), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + metadatatest.AssertEqualExporterDroppedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("exporter", exporterID.String()), + attribute.String(DroppedReasonKey, reason), + ), + Value: int64(droppedItems), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // No items should be counted as send failures. + _, getErr := tt.GetMetric("otelcol_exporter_send_failed_metric_points") + require.Error(t, getErr, "send_failed_metric_points should not be recorded when items are dropped") + + // Span attribute items.dropped must reflect the dropped count, and + // items.sent must reflect the post-clamp value. 
+ spans := tt.SpanRecorder.Ended() + require.Len(t, spans, 1) + require.Contains(t, spans[0].Attributes(), attribute.KeyValue{Key: ItemsDropped, Value: attribute.Int64Value(int64(droppedItems))}) + require.Contains(t, spans[0].Attributes(), attribute.KeyValue{Key: ItemsSent, Value: attribute.Int64Value(int64(totalItems - droppedItems))}) + require.Contains(t, spans[0].Attributes(), attribute.KeyValue{Key: ItemsFailed, Value: attribute.Int64Value(0)}) +} + +func TestExportAllItemsDroppedOp(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + const totalItems = 5 + const reason = "unsupported" + + obsrep, err := newObsReportSender( + exporter.Settings{ID: exporterID, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + pipeline.SignalMetrics, + nil, + sender.NewSender(func(context.Context, request.Request) error { + // Drop all items. + return experr.NewDroppedItemsErr(totalItems, reason) + }), + ) + require.NoError(t, err) + + sendErr := obsrep.Send(context.Background(), &requesttest.FakeRequest{Items: totalItems}) + require.NoError(t, sendErr) + + metadatatest.AssertEqualExporterSentMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("exporter", exporterID.String())), + Value: 0, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + metadatatest.AssertEqualExporterDroppedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("exporter", exporterID.String()), + attribute.String(DroppedReasonKey, reason), + ), + Value: int64(totalItems), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) +} + +// TestExportDroppedMoreThanSentLogsWarning exercises the defensive branch where +// an exporter author reports more dropped items than the request contained. 
The +// behavior must be: clamp numSent to 0, do not count as a send failure, and +// emit a single warning identifying the misbehaving exporter. +func TestExportDroppedMoreThanSentLogsWarning(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + core, observed := observer.New(zap.WarnLevel) + logger := zap.New(core) + + telSettings := tt.NewTelemetrySettings() + settings := exporter.Settings{ + ID: exporterID, + TelemetrySettings: telSettings, + BuildInfo: component.NewDefaultBuildInfo(), + } + settings.Logger = logger + + const totalItems = 3 + const droppedItems = 5 // intentionally more than totalItems + const reason = "exporter bug" + + obsrep, err := newObsReportSender( + settings, + pipeline.SignalMetrics, + nil, + sender.NewSender(func(context.Context, request.Request) error { + return experr.NewDroppedItemsErr(droppedItems, reason) + }), + ) + require.NoError(t, err) + + sendErr := obsrep.Send(context.Background(), &requesttest.FakeRequest{Items: totalItems}) + require.NoError(t, sendErr) + + // sent clamps to zero. + metadatatest.AssertEqualExporterSentMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet(attribute.String("exporter", exporterID.String())), + Value: 0, + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // dropped counter still records the (possibly bogus) value the exporter reported. + metadatatest.AssertEqualExporterDroppedMetricPoints(t, tt, + []metricdata.DataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String("exporter", exporterID.String()), + attribute.String(DroppedReasonKey, reason), + ), + Value: int64(droppedItems), + }, + }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) + + // Exactly one warning was emitted, naming the exporter and the counts. 
+ entries := observed.FilterMessage("exporter reported more dropped items than the request contained").All() + require.Len(t, entries, 1) + got := entries[0].ContextMap() + assert.Equal(t, exporterID.String(), got["exporter"]) + assert.Equal(t, int64(droppedItems), got["dropped"]) + assert.Equal(t, int64(totalItems), got["items"]) + assert.Equal(t, reason, got["reason"]) +} diff --git a/exporter/exporterhelper/metadata.yaml b/exporter/exporterhelper/metadata.yaml index 5b8c3199e36..32ccd45a69f 100644 --- a/exporter/exporterhelper/metadata.yaml +++ b/exporter/exporterhelper/metadata.yaml @@ -13,6 +13,42 @@ status: telemetry: metrics: + exporter_dropped_log_records: + enabled: true + stability: alpha + description: "Number of log records intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason." + unit: "{record}" + sum: + value_type: int + monotonic: true + + exporter_dropped_metric_points: + enabled: true + stability: alpha + description: "Number of metric points intentionally dropped by the exporter (e.g. unsupported temporality). At detailed telemetry level, includes attribute: exporter.dropped.reason." + unit: "{datapoint}" + sum: + value_type: int + monotonic: true + + exporter_dropped_profile_samples: + enabled: true + stability: development + description: "Number of profile samples intentionally dropped by the exporter. At detailed telemetry level, includes attribute: exporter.dropped.reason." + unit: "{sample}" + sum: + value_type: int + monotonic: true + + exporter_dropped_spans: + enabled: true + stability: alpha + description: "Number of spans intentionally dropped by the exporter (e.g. unsupported data). At detailed telemetry level, includes attribute: exporter.dropped.reason." 
+ unit: "{span}" + sum: + value_type: int + monotonic: true + exporter_enqueue_failed_log_records: enabled: true stability: alpha diff --git a/exporter/exporterhelper/metrics_dropped_e2e_test.go b/exporter/exporterhelper/metrics_dropped_e2e_test.go new file mode 100644 index 00000000000..1db6139036f --- /dev/null +++ b/exporter/exporterhelper/metrics_dropped_e2e_test.go @@ -0,0 +1,99 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package exporterhelper + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/metadatatest" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/testdata" +) + +// TestMetrics_DroppedItemsErr_EndToEnd exercises the full public exporterhelper +// metrics path with a push function that returns NewDroppedItemsErr. It is the +// public-API contract test for the feature: the caller of ConsumeMetrics must +// see no error, exporter_sent_metric_points must reflect (total - dropped), +// exporter_dropped_metric_points must carry the dropped count plus the reason +// attribute, and exporter_send_failed_metric_points must remain unrecorded. 
+func TestMetrics_DroppedItemsErr_EndToEnd(t *testing.T) {
+	const (
+		droppedItems = 2
+		reason       = "incompatible temporality"
+	)
+	ctx := context.Background()
+
+	tel := componenttest.NewTelemetry()
+	t.Cleanup(func() { require.NoError(t, tel.Shutdown(ctx)) })
+
+	// The push function always reports droppedItems as intentionally dropped.
+	dropPush := func(context.Context, pmetric.Metrics) error {
+		return NewDroppedItemsErr(droppedItems, reason)
+	}
+
+	exp, err := NewMetrics(ctx, exporter.Settings{
+		ID:                fakeMetricsName,
+		TelemetrySettings: tel.NewTelemetrySettings(),
+		BuildInfo:         component.NewDefaultBuildInfo(),
+	}, &fakeMetricsConfig, consumer.ConsumeMetricsFunc(dropPush))
+	require.NoError(t, err)
+	require.NotNil(t, exp)
+	require.NoError(t, exp.Start(ctx, componenttest.NewNopHost()))
+	t.Cleanup(func() { require.NoError(t, exp.Shutdown(ctx)) })
+
+	// The fixture has to hold more data points than the pusher reports as dropped.
+	metrics := testdata.GenerateMetrics(2)
+	total := metrics.DataPointCount()
+	require.Greater(t, total, droppedItems, "test fixture must produce more items than we drop")
+
+	// The dropped-items error must not be returned to the ConsumeMetrics caller.
+	require.NoError(t, exp.ConsumeMetrics(ctx, metrics))
+
+	exporterAttr := attribute.String(internal.ExporterKey, fakeMetricsName.String())
+	// Sent points: request total minus the dropped items, exporter attribute only.
+	wantSent := []metricdata.DataPoint[int64]{{
+		Attributes: attribute.NewSet(exporterAttr),
+		Value:      int64(total - droppedItems),
+	}}
+	metadatatest.AssertEqualExporterSentMetricPoints(t, tel, wantSent,
+		metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+
+	// Dropped points: the dropped count, tagged with the exporter and the reason.
+	wantDropped := []metricdata.DataPoint[int64]{{
+		Attributes: attribute.NewSet(
+			exporterAttr,
+			attribute.String(internal.DroppedReasonKey, reason),
+		),
+		Value: int64(droppedItems),
+	}}
+	metadatatest.AssertEqualExporterDroppedMetricPoints(t, tel, wantDropped,
+		metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
+
+	// Looking up the send-failed metric has to fail, i.e. it was never recorded.
+	_, lookupErr := tel.GetMetric("otelcol_exporter_send_failed_metric_points")
+	require.Error(t, lookupErr, "send_failed_metric_points must not be recorded when items are dropped")
+}
+
+// TestMetrics_DroppedItemsErr_TypeAlias verifies the public DroppedItemsErr is
+// the same type as the internal sentinel so errors.As round-trips work for
+// exporter authors who type-assert on the public symbol.
+func TestMetrics_DroppedItemsErr_TypeAlias(t *testing.T) {
+	sentinel := NewDroppedItemsErr(7, "reason")
+	var asPublic *DroppedItemsErr
+	require.ErrorAs(t, sentinel, &asPublic)
+	assert.Equal(t, 7, asPublic.Dropped)
+	assert.Equal(t, "reason", asPublic.Reason)
+}
diff --git a/service/internal/metricviews/views.go b/service/internal/metricviews/views.go
index 137bffd6e06..578091af464 100644
--- a/service/internal/metricviews/views.go
+++ b/service/internal/metricviews/views.go
@@ -113,6 +113,17 @@ func DefaultViews(level configtelemetry.Level) []config.View {
 					},
 				},
 			},
+			config.View{
+				Selector: &config.ViewSelector{
+					MeterName:      scope,
+					InstrumentName: ptr("otelcol_exporter_dropped_*"),
+				},
+				Stream: &config.ViewStream{
+					AttributeKeys: &config.IncludeExclude{
+						Excluded: []string{"exporter.dropped.reason"},
+					},
+				},
+			},
 		)
 	}
 
diff --git a/service/internal/metricviews/views_test.go b/service/internal/metricviews/views_test.go
index 5625a510e4d..e74119828ee 100644
--- a/service/internal/metricviews/views_test.go
+++ b/service/internal/metricviews/views_test.go
@@ -22,17 +22,17 @@ func TestDefaultViews(t *testing.T) {
 		{
 			name:           "None",
 			level:          configtelemetry.LevelNone,
-			wantViewsCount: 18,
+			wantViewsCount: 19,
 		},
 		{
 			name:           "Basic",
 			level:          configtelemetry.LevelBasic,
-			wantViewsCount: 18,
+			wantViewsCount: 19,
 		},
 		{
 			name:           "Normal",
 			level:          configtelemetry.LevelNormal,
-			wantViewsCount: 15,
+			wantViewsCount: 16,
 		},
 		{
 			name:           "Detailed",
@@ -100,6 +100,59 @@ func TestDefaultViewsFiltersSendFailedAttributes(t *testing.T) {
 	}
 }
 
+func TestDefaultViewsFiltersDroppedAttributes(t *testing.T) {
+	const droppedInstruments = "otelcol_exporter_dropped_*"
+
+	cases := []struct {
+		name     string
+		level    configtelemetry.Level
+		wantView bool
+	}{
+		{
+			name:     "basic level filters dropped reason attribute",
+			level:    configtelemetry.LevelBasic,
+			wantView: true,
+		},
+		{
+			name:     "normal level filters dropped reason attribute",
+			level:    configtelemetry.LevelNormal,
+			wantView: true,
+		},
+		{
+			name:     "detailed level does not filter dropped reason attribute",
+			level:    configtelemetry.LevelDetailed,
+			wantView: false,
+		},
+	}
+
+	for _, tc := range cases {
+		t.Run(tc.name, func(t *testing.T) {
+			views := DefaultViews(tc.level)
+			// Only the first matching view is inspected; the scan stops once it is found.
+			found := false
+			for i := 0; i < len(views) && !found; i++ {
+				sel := views[i].Selector
+				found = sel != nil && sel.InstrumentName != nil && *sel.InstrumentName == droppedInstruments
+				if !found {
+					continue
+				}
+				stream := views[i].Stream
+				require.NotNil(t, stream, "dropped view should have a stream")
+				require.NotNil(t, stream.AttributeKeys, "dropped view should have attribute keys")
+				require.Equal(t, []string{"exporter.dropped.reason"}, stream.AttributeKeys.Excluded,
+					"dropped view should exclude 'exporter.dropped.reason' attribute")
+			}
+			if !tc.wantView {
+				assert.False(t, found,
+					"Did not expect to find dropped attribute filtering view at level %s", tc.level)
+				return
+			}
+			assert.True(t, found,
+				"Expected to find dropped attribute filtering view at level %s", tc.level)
+		})
+	}
+}
+
 func TestDefaultViews_BatchExporterMetrics(t *testing.T) {
 	tests := []struct {
 		name string