Skip to content
Merged
25 changes: 25 additions & 0 deletions .chloggen/12894.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add new `exporter_queue_batch_send_size` and `exporter_queue_batch_send_size_bytes` metrics, showing the size of telemetry batches from the exporter.

# One or more tracking issues or pull requests related to the change
issues: [12894]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
20 changes: 18 additions & 2 deletions exporter/exporterhelper/documentation.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,17 +30,33 @@ Number of spans failed to be added to the sending queue. [alpha]
| ---- | ----------- | ---------- | --------- |
| {spans} | Sum | Int | true |

### otelcol_exporter_queue_batch_send_size

Number of units in the batch

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {units} | Histogram | Int |

### otelcol_exporter_queue_batch_send_size_bytes

Number of bytes in batch that was sent. Only available on detailed level.

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| By | Histogram | Int |

### otelcol_exporter_queue_capacity

Fixed capacity of the retry queue (in batches) [alpha]
Fixed capacity of the retry queue (in batches). [alpha]

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
| {batches} | Gauge | Int |

### otelcol_exporter_queue_size

Current size of the retry queue (in batches) [alpha]
Current size of the retry queue (in batches). [alpha]

| Unit | Metric Type | Value Type |
| ---- | ----------- | ---------- |
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 12 additions & 4 deletions exporter/exporterhelper/internal/queue/obs_queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,12 @@ const (
// obsQueue is a helper to add observability to a queue.
type obsQueue[T request.Request] struct {
Queue[T]
tb *metadata.TelemetryBuilder
metricAttr metric.MeasurementOption
enqueueFailedInst metric.Int64Counter
tracer trace.Tracer
tb *metadata.TelemetryBuilder
metricAttr metric.MeasurementOption
enqueueFailedInst metric.Int64Counter
queueBatchSizeInst metric.Int64Histogram
queueBatchSizeBytesInst metric.Int64Histogram
tracer trace.Tracer
}

func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T], error) {
Expand Down Expand Up @@ -74,6 +76,9 @@ func newObsQueue[T request.Request](set Settings[T], delegate Queue[T]) (Queue[T
or.enqueueFailedInst = tb.ExporterEnqueueFailedLogRecords
}

or.queueBatchSizeInst = tb.ExporterQueueBatchSendSize
or.queueBatchSizeBytesInst = tb.ExporterQueueBatchSendSizeBytes

return or, nil
}

Expand All @@ -87,6 +92,9 @@ func (or *obsQueue[T]) Offer(ctx context.Context, req T) error {
// be modified by the downstream components like the batcher.
numItems := req.ItemsCount()

or.queueBatchSizeInst.Record(ctx, int64(numItems), or.metricAttr)
or.queueBatchSizeBytesInst.Record(ctx, int64(req.BytesSize()), or.metricAttr)

ctx, span := or.tracer.Start(ctx, "exporter/enqueue")
err := or.Queue.Offer(ctx, req)
span.End()
Expand Down
78 changes: 78 additions & 0 deletions exporter/exporterhelper/internal/queue/obs_queue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,3 +202,81 @@ func TestObsQueueMetricsFailure(t *testing.T) {
},
}, metricdatatest.IgnoreTimestamp(), metricdatatest.IgnoreExemplars())
}

func TestObsQueueLogsBatchSize(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](Settings[request.Request]{
Signal: pipeline.SignalLogs,
ID: exporterID,
Telemetry: tt.NewTelemetrySettings(),
}, newFakeQueue[request.Request](nil, 7, 9))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 2, Bytes: 100}))
metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt,
[]metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String(exporterKey, exporterID.String())),
Count: 1,
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Min: metricdata.NewExtrema[int64](2),
Max: metricdata.NewExtrema[int64](2),
Sum: 2,
},
}, metricdatatest.IgnoreTimestamp())
}

func TestObsQueueTracesBatchSize(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](Settings[request.Request]{
Signal: pipeline.SignalTraces,
ID: exporterID,
Telemetry: tt.NewTelemetrySettings(),
}, newFakeQueue[request.Request](nil, 17, 19))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 12, Bytes: 200}))
metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt,
[]metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String(exporterKey, exporterID.String())),
Count: 1,
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Min: metricdata.NewExtrema[int64](12),
Max: metricdata.NewExtrema[int64](12),
Sum: 12,
},
}, metricdatatest.IgnoreTimestamp())
}

func TestObsQueueMetricsBatchSize(t *testing.T) {
tt := componenttest.NewTelemetry()
t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) })

te, err := newObsQueue[request.Request](Settings[request.Request]{
Signal: pipeline.SignalMetrics,
ID: exporterID,
Telemetry: tt.NewTelemetrySettings(),
}, newFakeQueue[request.Request](nil, 27, 29))
require.NoError(t, err)
require.NoError(t, te.Offer(context.Background(), &requesttest.FakeRequest{Items: 22, Bytes: 300}))
metadatatest.AssertEqualExporterQueueBatchSendSize(t, tt,
[]metricdata.HistogramDataPoint[int64]{
{
Attributes: attribute.NewSet(
attribute.String(exporterKey, exporterID.String())),
Count: 1,
Bounds: []float64{10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000},
BucketCounts: []uint64{0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0},
Min: metricdata.NewExtrema[int64](22),
Max: metricdata.NewExtrema[int64](22),
Sum: 22,
},
}, metricdatatest.IgnoreTimestamp())
}
2 changes: 2 additions & 0 deletions exporter/exporterhelper/internal/request/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ type Request interface {
// Experimental: This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
MergeSplit(context.Context, int, SizerType, Request) ([]Request, error)
// BytesSize returns the size of the request in bytes.
BytesSize() int
}

// ErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/internal/requesttest/request.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,10 @@ func (r *FakeRequest) ItemsCount() int {
return r.Items
}

func (r *FakeRequest) BytesSize() int {
return r.Bytes
}

func (r *FakeRequest) MergeSplit(_ context.Context, maxSize int, szt request.SizerType, r2 request.Request) ([]request.Request, error) {
if r.MergeErr != nil {
return r.MergeErrResult, r.MergeErr
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ func (req *logsRequest) setCachedSize(size int) {
req.cachedSize = size
}

func (req *logsRequest) BytesSize() int {
return logsMarshaler.LogsSize(req.ld)
}

type logsExporter struct {
*internal.BaseExporter
consumer.Logs
Expand Down
20 changes: 18 additions & 2 deletions exporter/exporterhelper/metadata.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -107,17 +107,33 @@ telemetry:
enabled: true
stability:
level: alpha
description: Current size of the retry queue (in batches)
description: Current size of the retry queue (in batches).
unit: "{batches}"
gauge:
value_type: int
async: true

exporter_queue_batch_send_size:
enabled: true
description: Number of units in the batch
unit: "{units}"
histogram:
value_type: int
bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000, 7000, 8000, 9000, 10000, 20000, 30000, 50000, 100000 ]

exporter_queue_batch_send_size_bytes:
enabled: true
description: Number of bytes in batch that was sent. Only available on detailed level.
unit: By
histogram:
value_type: int
bucket_boundaries: [ 10, 25, 50, 75, 100, 250, 500, 750, 1000, 2000, 3000, 4000, 5000, 6000 ]

exporter_queue_capacity:
enabled: true
stability:
level: alpha
description: Fixed capacity of the retry queue (in batches)
description: Fixed capacity of the retry queue (in batches).
unit: "{batches}"
gauge:
value_type: int
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (req *metricsRequest) setCachedSize(count int) {
req.cachedSize = count
}

func (req *metricsRequest) BytesSize() int {
return metricsMarshaler.MetricsSize(req.md)
}

type metricsExporter struct {
*internal.BaseExporter
consumer.Metrics
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/traces.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,10 @@ func (req *tracesRequest) setCachedSize(size int) {
req.cachedSize = size
}

func (req *tracesRequest) BytesSize() int {
return tracesMarshaler.TracesSize(req.td)
}

type tracesExporter struct {
*internal.BaseExporter
consumer.Traces
Expand Down
4 changes: 4 additions & 0 deletions exporter/exporterhelper/xexporterhelper/profiles.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,10 @@ func (req *profilesRequest) setCachedSize(size int) {
req.cachedSize = size
}

func (req *profilesRequest) BytesSize() int {
return profilesMarshaler.ProfilesSize(req.pd)
}

type profileExporter struct {
*internal.BaseExporter
xconsumer.Profiles
Expand Down
Loading
Loading