diff --git a/.chloggen/12894.yaml b/.chloggen/12894.yaml new file mode 100644 index 000000000000..d4df7a5db313 --- /dev/null +++ b/.chloggen/12894.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add new `exporter_queue_batch_send_size` and `exporter_queue_batch_send_size_bytes` metrics, showing the size of telemetry batches from the exporter, in items and in bytes respectively. + +# One or more tracking issues or pull requests related to the change +issues: [12894] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: The `exporter_queue_batch_send_size_bytes` metric is only emitted when the service telemetry metrics level is `detailed`. + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/exporter/exporterhelper/documentation.md b/exporter/exporterhelper/documentation.md index 6a94947aeba6..1ff67867155a 100644 --- a/exporter/exporterhelper/documentation.md +++ b/exporter/exporterhelper/documentation.md @@ -30,9 +30,25 @@ Number of spans failed to be added to the sending queue. 
[alpha] | ---- | ----------- | ---------- | --------- | | {spans} | Sum | Int | true | +### otelcol_exporter_queue_batch_send_size + +Number of units in the batch + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| {units} | Histogram | Int | + +### otelcol_exporter_queue_batch_send_size_bytes + +Number of bytes in batch that was sent. Only available on detailed level. + +| Unit | Metric Type | Value Type | +| ---- | ----------- | ---------- | +| By | Histogram | Int | + ### otelcol_exporter_queue_capacity -Fixed capacity of the retry queue (in batches) [alpha] +Fixed capacity of the retry queue (in batches). [alpha] | Unit | Metric Type | Value Type | | ---- | ----------- | ---------- | @@ -40,7 +56,7 @@ Fixed capacity of the retry queue (in batches) [alpha] ### otelcol_exporter_queue_size -Current size of the retry queue (in batches) [alpha] +Current size of the retry queue (in batches). [alpha] | Unit | Metric Type | Value Type | | ---- | ----------- | ---------- | diff --git a/exporter/exporterhelper/internal/metadata/generated_telemetry.go b/exporter/exporterhelper/internal/metadata/generated_telemetry.go index 4bab921ce885..66114dd2d3cb 100644 --- a/exporter/exporterhelper/internal/metadata/generated_telemetry.go +++ b/exporter/exporterhelper/internal/metadata/generated_telemetry.go @@ -31,6 +31,8 @@ type TelemetryBuilder struct { ExporterEnqueueFailedLogRecords metric.Int64Counter ExporterEnqueueFailedMetricPoints metric.Int64Counter ExporterEnqueueFailedSpans metric.Int64Counter + ExporterQueueBatchSendSize metric.Int64Histogram + ExporterQueueBatchSendSizeBytes metric.Int64Histogram ExporterQueueCapacity metric.Int64ObservableGauge ExporterQueueSize metric.Int64ObservableGauge ExporterSendFailedLogRecords metric.Int64Counter @@ -128,15 +130,29 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme metric.WithUnit("{spans}"), ) errs = errors.Join(errs, err) + builder.ExporterQueueBatchSendSize, err = 
builder.meter.Int64Histogram( + "otelcol_exporter_queue_batch_send_size", + metric.WithDescription("Number of units in the batch"), + metric.WithUnit("{units}"), + metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}...), + ) + errs = errors.Join(errs, err) + builder.ExporterQueueBatchSendSizeBytes, err = builder.meter.Int64Histogram( + "otelcol_exporter_queue_batch_send_size_bytes", + metric.WithDescription("Number of bytes in batch that was sent. Only available on detailed level."), + metric.WithUnit("By"), + metric.WithExplicitBucketBoundaries([]float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000}...), + ) + errs = errors.Join(errs, err) builder.ExporterQueueCapacity, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_capacity", - metric.WithDescription("Fixed capacity of the retry queue (in batches) [alpha]"), + metric.WithDescription("Fixed capacity of the retry queue (in batches). [alpha]"), metric.WithUnit("{batches}"), ) errs = errors.Join(errs, err) builder.ExporterQueueSize, err = builder.meter.Int64ObservableGauge( "otelcol_exporter_queue_size", - metric.WithDescription("Current size of the retry queue (in batches) [alpha]"), + metric.WithDescription("Current size of the retry queue (in batches). [alpha]"), metric.WithUnit("{batches}"), ) errs = errors.Join(errs, err) diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go index 13a192501f91..71d1763ce5d6 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest.go @@ -60,10 +60,40 @@ func AssertEqualExporterEnqueueFailedSpans(t *testing.T, tt *componenttest.Telem metricdatatest.AssertEqual(t, want, got, opts...) 
} +func AssertEqualExporterQueueBatchSendSize(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_queue_batch_send_size", + Description: "Number of units in the batch", + Unit: "{units}", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_queue_batch_send_size") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualExporterQueueBatchSendSizeBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_exporter_queue_batch_send_size_bytes", + Description: "Number of bytes in batch that was sent. Only available on detailed level.", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_exporter_queue_batch_send_size_bytes") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + func AssertEqualExporterQueueCapacity(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_exporter_queue_capacity", - Description: "Fixed capacity of the retry queue (in batches) [alpha]", + Description: "Fixed capacity of the retry queue (in batches). 
[alpha]", Unit: "{batches}", Data: metricdata.Gauge[int64]{ DataPoints: dps, @@ -77,7 +107,7 @@ func AssertEqualExporterQueueCapacity(t *testing.T, tt *componenttest.Telemetry, func AssertEqualExporterQueueSize(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ Name: "otelcol_exporter_queue_size", - Description: "Current size of the retry queue (in batches) [alpha]", + Description: "Current size of the retry queue (in batches). [alpha]", Unit: "{batches}", Data: metricdata.Gauge[int64]{ DataPoints: dps, diff --git a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go index d8faacd39c10..838a03f504ac 100644 --- a/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go +++ b/exporter/exporterhelper/internal/metadatatest/generated_telemetrytest_test.go @@ -31,6 +31,8 @@ func TestSetupTelemetry(t *testing.T) { tb.ExporterEnqueueFailedLogRecords.Add(context.Background(), 1) tb.ExporterEnqueueFailedMetricPoints.Add(context.Background(), 1) tb.ExporterEnqueueFailedSpans.Add(context.Background(), 1) + tb.ExporterQueueBatchSendSize.Record(context.Background(), 1) + tb.ExporterQueueBatchSendSizeBytes.Record(context.Background(), 1) tb.ExporterSendFailedLogRecords.Add(context.Background(), 1) tb.ExporterSendFailedMetricPoints.Add(context.Background(), 1) tb.ExporterSendFailedSpans.Add(context.Background(), 1) @@ -46,6 +48,12 @@ func TestSetupTelemetry(t *testing.T) { AssertEqualExporterEnqueueFailedSpans(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) + AssertEqualExporterQueueBatchSendSize(t, testTel, + []metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) + AssertEqualExporterQueueBatchSendSizeBytes(t, testTel, + []metricdata.HistogramDataPoint[int64]{{}}, 
metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) AssertEqualExporterQueueCapacity(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) diff --git a/exporter/exporterhelper/internal/queue/obs_queue.go b/exporter/exporterhelper/internal/queue/obs_queue.go index 1a077651133d..8cad69620676 100644 --- a/exporter/exporterhelper/internal/queue/obs_queue.go +++ b/exporter/exporterhelper/internal/queue/obs_queue.go @@ -26,10 +26,12 @@ const ( // obsQueue is a helper to add observability to a queue. type obsQueue[T request.Request] struct { Queue[T] - tb *metadata.TelemetryBuilder - metricAttr metric.MeasurementOption - enqueueFailedInst metric.Int64Counter - tracer trace.Tracer + tb *metadata.TelemetryBuilder + metricAttr metric.MeasurementOption + enqueueFailedInst metric.Int64Counter + queueBatchSizeInst metric.Int64Histogram + queueBatchSizeBytesInst metric.Int64Histogram + tracer trace.Tracer } func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) { @@ -74,6 +76,9 @@ func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T or.enqueueFailedInst = tb.ExporterEnqueueFailedLogRecords } + or.queueBatchSizeInst = tb.ExporterQueueBatchSendSize + or.queueBatchSizeBytesInst = tb.ExporterQueueBatchSendSizeBytes + return or, nil } @@ -87,6 +92,9 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error { // be modified by the downstream components like the batcher. 
numItems := req.ItemsCount() + or.queueBatchSizeInst.Record(ctx, int64(numItems), or.metricAttr) + or.queueBatchSizeBytesInst.Record(ctx, int64(req.BytesSize()), or.metricAttr) + ctx, span := or.tracer.Start(ctx, "exporter/enqueue") err := or.Queue.Offer(ctx, req) span.End() diff --git a/exporter/exporterhelper/internal/queue/obs_queue_test.go b/exporter/exporterhelper/internal/queue/obs_queue_test.go index 5ace81c3080b..f797b26a823a 100644 --- a/exporter/exporterhelper/internal/queue/obs_queue_test.go +++ b/exporter/exporterhelper/internal/queue/obs_queue_test.go @@ -202,3 +202,123 @@ func TestObsQueueMetricsFailure(t *testing.T) { }, }, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars()) } + +// TestObsQueueLogsBatchSize checks that offering a logs request records both the item-count and the byte-size batch histograms. +func TestObsQueueLogsBatchSize(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := newObsQueue[request.Request](Settings[request.Request]{ + Signal: pipeline.SignalLogs, + ID: exporterID, + Telemetry: tt.NewTelemetrySettings(), + }, newFakeQueue[request.Request](nil, 7, 9)) + require.NoError(t, err) + require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2, Bytes: 100})) + metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](2), + Max: metricdata.NewExtrema[int64](2), + Sum: 2, + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualExporterQueueBatchSendSizeBytes(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000}, + BucketCounts: []uint64{0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](100), + Max: metricdata.NewExtrema[int64](100), + Sum: 100, + }, + }, metricdatatest.IgnoreTimestamp()) +} + +// TestObsQueueTracesBatchSize checks that offering a traces request records both the item-count and the byte-size batch histograms. +func TestObsQueueTracesBatchSize(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := 
newObsQueue[request.Request](Settings[request.Request]{ + Signal: pipeline.SignalTraces, + ID: exporterID, + Telemetry: tt.NewTelemetrySettings(), + }, newFakeQueue[request.Request](nil, 17, 19)) + require.NoError(t, err) + require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12, Bytes: 200})) + metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](12), + Max: metricdata.NewExtrema[int64](12), + Sum: 12, + }, + }, metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualExporterQueueBatchSendSizeBytes(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](200), + Max: metricdata.NewExtrema[int64](200), + Sum: 200, + }, + }, metricdatatest.IgnoreTimestamp()) +} + +// TestObsQueueMetricsBatchSize checks that offering a metrics request records both the item-count and the byte-size batch histograms. +func TestObsQueueMetricsBatchSize(t *testing.T) { + tt := componenttest.NewTelemetry() + t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) + + te, err := newObsQueue[request.Request](Settings[request.Request]{ + Signal: pipeline.SignalMetrics, + ID: exporterID, + Telemetry: tt.NewTelemetrySettings(), + }, newFakeQueue[request.Request](nil, 27, 29)) + require.NoError(t, err) + require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22, Bytes: 300})) + metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000}, + BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](22), + Max: metricdata.NewExtrema[int64](22), + Sum: 22, + }, + }, 
metricdatatest.IgnoreTimestamp()) + metadatatest.AssertEqualExporterQueueBatchSendSizeBytes(t, tt, + []metricdata.HistogramDataPoint[int64]{ + { + Attributes: attribute.NewSet( + attribute.String(exporterKey, exporterID.String())), + Count: 1, + Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000}, + BucketCounts: []uint64{0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0}, + Min: metricdata.NewExtrema[int64](300), + Max: metricdata.NewExtrema[int64](300), + Sum: 300, + }, + }, metricdatatest.IgnoreTimestamp()) +} diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go index 425bea84d882..c0a44082d598 100644 --- a/exporter/exporterhelper/internal/request/request.go +++ b/exporter/exporterhelper/internal/request/request.go @@ -25,6 +25,8 @@ type Request interface { // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. MergeSplit(context.Context, int, SizerType, Request) ([]Request, error) + // BytesSize returns the size of the request in bytes. + BytesSize() int } // ErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial diff --git a/exporter/exporterhelper/internal/requesttest/request.go b/exporter/exporterhelper/internal/requesttest/request.go index 135f1cb77ab8..9c5c03d05509 100644 --- a/exporter/exporterhelper/internal/requesttest/request.go +++ b/exporter/exporterhelper/internal/requesttest/request.go @@ -44,6 +44,10 @@ func (r *FakeRequest) ItemsCount() int { return r.Items } +func (r *FakeRequest) BytesSize() int { + return r.Bytes +} + func (r *FakeRequest) MergeSplit(_ context.Context, maxSize int, szt request.SizerType, r2 request.Request) ([]request.Request, error) { if r.MergeErr != nil { return r.MergeErrResult, r.MergeErr diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index cfa2f1424f6a..cdac6a5e1f87 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -127,6 +127,10 @@ func (req *logsRequest) setCachedSize(size int) { req.cachedSize = size } +func (req *logsRequest) BytesSize() int { + return logsMarshaler.LogsSize(req.ld) +} + type logsExporter struct { *internal.BaseExporter consumer.Logs diff --git a/exporter/exporterhelper/metadata.yaml b/exporter/exporterhelper/metadata.yaml index 
5e26f00f9ccb..0af14cce32ef 100644 --- a/exporter/exporterhelper/metadata.yaml +++ b/exporter/exporterhelper/metadata.yaml @@ -107,17 +107,33 @@ telemetry: enabled: true stability: level: alpha - description: Current size of the retry queue (in batches) + description: Current size of the retry queue (in batches). unit: "{batches}" gauge: value_type: int async: true + exporter_queue_batch_send_size: + enabled: true + description: Number of units in the batch + unit: "{units}" + histogram: + value_type: int + bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000 ] + + exporter_queue_batch_send_size_bytes: + enabled: true + description: Number of bytes in batch that was sent. Only available on detailed level. + unit: By + histogram: + value_type: int + bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000 ] + exporter_queue_capacity: enabled: true stability: level: alpha - description: Fixed capacity of the retry queue (in batches) + description: Fixed capacity of the retry queue (in batches). 
unit: "{batches}" gauge: value_type: int diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index aa4ca2a086a3..280eae108d57 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -126,6 +126,10 @@ func (req *metricsRequest) setCachedSize(count int) { req.cachedSize = count } +func (req *metricsRequest) BytesSize() int { + return metricsMarshaler.MetricsSize(req.md) +} + type metricsExporter struct { *internal.BaseExporter consumer.Metrics diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index a9af178122f7..96e87d6ff321 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -126,6 +126,10 @@ func (req *tracesRequest) setCachedSize(size int) { req.cachedSize = size } +func (req *tracesRequest) BytesSize() int { + return tracesMarshaler.TracesSize(req.td) +} + type tracesExporter struct { *internal.BaseExporter consumer.Traces diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index b53821cfa2f1..31a891e8491f 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -129,6 +129,10 @@ func (req *profilesRequest) setCachedSize(size int) { req.cachedSize = size } +func (req *profilesRequest) BytesSize() int { + return profilesMarshaler.ProfilesSize(req.pd) +} + type profileExporter struct { *internal.BaseExporter xconsumer.Profiles diff --git a/service/service.go b/service/service.go index 38c43caf8dd4..aa89d26c33ef 100644 --- a/service/service.go +++ b/service/service.go @@ -397,6 +397,15 @@ func configureViews(level configtelemetry.Level) []config.View { ) } + // Exporter helper queue batch metrics: drop the batch bytes histogram below the detailed level. + if level < configtelemetry.LevelDetailed { + scope := ptr("go.opentelemetry.io/collector/exporter/exporterhelper") + views = append(views, dropViewOption(&config.ViewSelector{ + MeterName: scope, + InstrumentName: 
ptr("otelcol_exporter_queue_batch_send_size_bytes"), + })) + } + // Batch processor metrics scope := ptr("go.opentelemetry.io/collector/processor/batchprocessor") if level < configtelemetry.LevelNormal {