From 02fc6490d86606dda91c7ccac4ba7ddcba59c78e Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 11:39:02 -0700 Subject: [PATCH 01/10] feat(awss3exporter): add upload telemetry Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter.go | 86 +++++++++++++++++++++++-- exporter/awss3exporter/exporter_test.go | 73 +++++++++++++++++++++ exporter/awss3exporter/metadata.yaml | 41 ++++++++++++ 3 files changed, 194 insertions(+), 6 deletions(-) diff --git a/exporter/awss3exporter/exporter.go b/exporter/awss3exporter/exporter.go index 490a14ea273ff..d37d39c029f8f 100644 --- a/exporter/awss3exporter/exporter.go +++ b/exporter/awss3exporter/exporter.go @@ -36,21 +36,31 @@ type marshalerWithFlushMetadata interface { type exporterTelemetry struct { flushStart metric.Int64Counter flushComplete metric.Int64Counter + uploadAttempt metric.Int64Counter uploadStart metric.Int64Counter uploadComplete metric.Int64Counter + uploadFailed metric.Int64Counter + uploadBytes metric.Int64Counter flushDuration metric.Int64Histogram uploadDuration metric.Int64Histogram + uploadObjectSize metric.Int64Histogram flushToUploadDuration metric.Int64Histogram + lastSuccessfulUpload metric.Int64Gauge } const ( flushStartMetricName = "otelcol_exporter_awss3_flush_start_total" flushCompleteMetricName = "otelcol_exporter_awss3_flush_complete_total" + uploadAttemptMetricName = "otelcol_exporter_awss3_upload_attempt_total" uploadStartMetricName = "otelcol_exporter_awss3_upload_start_total" uploadCompleteMetricName = "otelcol_exporter_awss3_upload_complete_total" + uploadFailedMetricName = "otelcol_exporter_awss3_upload_failed_total" + uploadBytesMetricName = "otelcol_exporter_awss3_upload_bytes" flushDurationMetricName = "otelcol_exporter_awss3_flush_duration" uploadDurationMetricName = "otelcol_exporter_awss3_upload_duration" + uploadObjectSizeMetricName = "otelcol_exporter_awss3_upload_object_size" flushToUploadDurationMetricName = "otelcol_exporter_awss3_flush_to_upload_duration" + lastSuccessfulUploadMetricName = "otelcol_exporter_awss3_last_successful_upload_timestamp" ) type s3Exporter struct { @@ -185,6 +195,7 @@ func (e *s3Exporter) uploadBuffer( e.signalType, uploadStartedAt, time.Since(uploadStartedAt), + int64(len(buf)), flushMeta, err, ) @@ -201,30 +212,64 @@ func newExporterTelemetry(settings component.TelemetrySettings, logger *zap.Logg tel := &exporterTelemetry{} tel.flushStart = mustCounter(meter, flushStartMetricName, logger) tel.flushComplete = mustCounter(meter, flushCompleteMetricName, logger) + tel.uploadAttempt = mustCounter(meter, uploadAttemptMetricName, logger) tel.uploadStart = mustCounter(meter, uploadStartMetricName, logger) tel.uploadComplete = mustCounter(meter, uploadCompleteMetricName, logger) + tel.uploadFailed = mustCounter(meter, uploadFailedMetricName, logger) + tel.uploadBytes = mustCounter(meter, uploadBytesMetricName, logger, metric.WithUnit("By")) tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger) tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger) + tel.uploadObjectSize = mustHistogram(meter, uploadObjectSizeMetricName, logger, metric.WithUnit("By")) tel.flushToUploadDuration = mustHistogram(meter, flushToUploadDurationMetricName, logger) + tel.lastSuccessfulUpload = mustGauge( + meter, + lastSuccessfulUploadMetricName, + logger, + metric.WithUnit("s"), + ) return tel } -func mustCounter(meter metric.Meter, name string, logger *zap.Logger) metric.Int64Counter { - counter, err := meter.Int64Counter(name) +func mustCounter( + meter metric.Meter, + name string, + logger *zap.Logger, + opts ...metric.Int64CounterOption, +) metric.Int64Counter { + counter, err := meter.Int64Counter(name, opts...) if err != nil && logger != nil { logger.Warn("failed to create awss3 exporter counter", zap.String("name", name), zap.Error(err)) } return counter } -func mustHistogram(meter metric.Meter, name string, logger *zap.Logger) metric.Int64Histogram { - histogram, err := meter.Int64Histogram(name, metric.WithUnit("ms")) +func mustHistogram( + meter metric.Meter, + name string, + logger *zap.Logger, + opts ...metric.Int64HistogramOption, +) metric.Int64Histogram { + opts = append([]metric.Int64HistogramOption{metric.WithUnit("ms")}, opts...) + histogram, err := meter.Int64Histogram(name, opts...) if err != nil && logger != nil { logger.Warn("failed to create awss3 exporter histogram", zap.String("name", name), zap.Error(err)) } return histogram } +func mustGauge( + meter metric.Meter, + name string, + logger *zap.Logger, + opts ...metric.Int64GaugeOption, +) metric.Int64Gauge { + gauge, err := meter.Int64Gauge(name, opts...) + if err != nil && logger != nil { + logger.Warn("failed to create awss3 exporter gauge", zap.String("name", name), zap.Error(err)) + } + return gauge +} + func (t *exporterTelemetry) recordFlushStart(ctx context.Context, signalType string) { if t == nil || t.flushStart == nil { return @@ -253,10 +298,16 @@ func (t *exporterTelemetry) recordFlushComplete( } func (t *exporterTelemetry) recordUploadStart(ctx context.Context, signalType string) { - if t == nil || t.uploadStart == nil { + if t == nil { return } - t.uploadStart.Add(ctx, 1, metric.WithAttributes(attribute.String("signal", signalType))) + attrs := metric.WithAttributes(attribute.String("signal", signalType)) + if t.uploadAttempt != nil { + t.uploadAttempt.Add(ctx, 1, attrs) + } + if t.uploadStart != nil { + t.uploadStart.Add(ctx, 1, attrs) + } } func (t *exporterTelemetry) recordUploadComplete( @@ -264,6 +315,7 @@ func (t *exporterTelemetry) recordUploadComplete( signalType string, uploadStartedAt time.Time, duration time.Duration, + uploadedBytes int64, flushMeta flushMetadata, err error, ) { @@ -275,9 +327,31 @@ func (t *exporterTelemetry) recordUploadComplete( if t.uploadComplete != nil { t.uploadComplete.Add(ctx, 1, metric.WithAttributes(attrs...)) } + if err != nil && t.uploadFailed != nil { + t.uploadFailed.Add(ctx, 1, metric.WithAttributes(attrs...)) + } if t.uploadDuration != nil { t.uploadDuration.Record(ctx, durationMillis(duration), metric.WithAttributes(attrs...)) } + if err == nil { + if t.uploadBytes != nil { + t.uploadBytes.Add(ctx, uploadedBytes, metric.WithAttributes(attrs...)) + } + if t.uploadObjectSize != nil { + t.uploadObjectSize.Record( + ctx, + uploadedBytes, + metric.WithAttributes(attrs...), + ) + } + if t.lastSuccessfulUpload != nil { + t.lastSuccessfulUpload.Record( + ctx, + uploadStartedAt.Add(duration).Unix(), + metric.WithAttributes(attrs...), + ) + } + } if t.flushToUploadDuration != nil && !flushMeta.flushCompletedAt.IsZero() { t.flushToUploadDuration.Record( ctx, diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index 9cdb0d1770336..dd3184a1afb03 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -180,29 +180,84 @@ func TestExporterRecordsEquivalentFlushAndUploadTelemetry(t *testing.T) { assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_flush_start_total") assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_flush_complete_total") + assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_upload_attempt_total") assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_upload_start_total") assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_upload_complete_total") + assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_upload_bytes") assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_flush_duration") uploadDurationPoint := requireHistogramPoint(t, tel, "otelcol_exporter_awss3_upload_duration") + uploadObjectSizePoint := requireHistogramPoint(t, tel, "otelcol_exporter_awss3_upload_object_size") flushToUploadPoint := requireHistogramPoint(t, tel, "otelcol_exporter_awss3_flush_to_upload_duration") flushCompletePoint := requireSumPoint(t, tel, "otelcol_exporter_awss3_flush_complete_total") + uploadAttemptPoint := requireSumPoint(t, tel, "otelcol_exporter_awss3_upload_attempt_total") uploadCompletePoint := requireSumPoint(t, tel, "otelcol_exporter_awss3_upload_complete_total") + uploadBytesPoint := requireSumPoint(t, tel, "otelcol_exporter_awss3_upload_bytes") + lastSuccessfulUploadPoint := requireGaugePoint( + t, + tel, + "otelcol_exporter_awss3_last_successful_upload_timestamp", + ) assert.Equal(t, int64(1), flushCompletePoint.Value) + assert.Equal(t, int64(1), uploadAttemptPoint.Value) assert.Equal(t, int64(1), uploadCompletePoint.Value) + assert.Equal(t, int64(len([]byte("payload"))), uploadBytesPoint.Value) assertMetricAttribute(t, flushCompletePoint.Attributes, "reason", expectedReason) assertMetricAttribute(t, flushCompletePoint.Attributes, "signal", "logs") assertMetricAttribute(t, flushCompletePoint.Attributes, "outcome", "success") + assertMetricAttribute(t, uploadAttemptPoint.Attributes, "signal", "logs") assertMetricAttribute(t, uploadCompletePoint.Attributes, "reason", expectedReason) assertMetricAttribute(t, uploadCompletePoint.Attributes, "signal", "logs") assertMetricAttribute(t, uploadCompletePoint.Attributes, "outcome", "success") + assertMetricAttribute(t, uploadBytesPoint.Attributes, "reason", expectedReason) + assertMetricAttribute(t, uploadBytesPoint.Attributes, "signal", "logs") + assertMetricAttribute(t, uploadBytesPoint.Attributes, "outcome", "success") assertMetricAttribute(t, flushToUploadPoint.Attributes, "reason", expectedReason) assertMetricAttribute(t, flushToUploadPoint.Attributes, "signal", "logs") assertMetricAttribute(t, flushToUploadPoint.Attributes, "outcome", "success") + assertMetricAttribute(t, uploadObjectSizePoint.Attributes, "reason", expectedReason) + assertMetricAttribute(t, uploadObjectSizePoint.Attributes, "signal", "logs") + assertMetricAttribute(t, uploadObjectSizePoint.Attributes, "outcome", "success") + assertMetricAttribute(t, lastSuccessfulUploadPoint.Attributes, "reason", expectedReason) + assertMetricAttribute(t, lastSuccessfulUploadPoint.Attributes, "signal", "logs") + assertMetricAttribute(t, lastSuccessfulUploadPoint.Attributes, "outcome", "success") assert.GreaterOrEqual(t, uploadDurationPoint.Sum, uploadDelay.Milliseconds()) + assert.Equal(t, int64(len([]byte("payload"))), uploadObjectSizePoint.Sum) assert.GreaterOrEqual(t, flushToUploadPoint.Sum, handoffGap.Milliseconds()) assert.Less(t, flushToUploadPoint.Sum, uploadDurationPoint.Sum) + assert.Greater(t, lastSuccessfulUploadPoint.Value, int64(0)) +} + +func TestExporterRecordsUploadFailureMetric(t *testing.T) { + t.Setenv("AWS_EC2_METADATA_DISABLED", "true") + + tel := componenttest.NewTelemetry() + t.Cleanup(func() { + require.NoError(t, tel.Shutdown(context.WithoutCancel(t.Context()))) + }) + + settings := exportertest.NewNopSettings(component.MustNewType("awss3")) + settings.TelemetrySettings = tel.NewTelemetrySettings() + + exporter := newS3Exporter(createDefaultConfig().(*Config), "logs", settings) + exporter.marshaler = &flushMetadataMarshaler{ + buf: []byte("payload"), + flushMeta: flushMetadata{ + reason: "manual", + flushCompletedAt: time.Now().Add(-10 * time.Millisecond), + }, + } + exporter.uploader = &uploaderStub{err: assert.AnError} + + require.ErrorIs(t, exporter.ConsumeLogs(t.Context(), getTestLogs(t)), assert.AnError) + + assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_upload_failed_total") + uploadFailedPoint := requireSumPoint(t, tel, "otelcol_exporter_awss3_upload_failed_total") + assert.Equal(t, int64(1), uploadFailedPoint.Value) + assertMetricAttribute(t, uploadFailedPoint.Attributes, "reason", "manual") + assertMetricAttribute(t, uploadFailedPoint.Attributes, "signal", "logs") + assertMetricAttribute(t, uploadFailedPoint.Attributes, "outcome", "failure") } func assertMetricDataPointCount(t *testing.T, tel *componenttest.Telemetry, name string) { @@ -216,6 +271,8 @@ func assertMetricDataPointCount(t *testing.T, tel *componenttest.Telemetry, name require.Len(t, data.DataPoints, 1) case metricdata.Histogram[int64]: require.Len(t, data.DataPoints, 1) + case metricdata.Gauge[int64]: + require.Len(t, data.DataPoints, 1) default: t.Fatalf("unexpected metric type %T for %s", metric.Data, name) } @@ -245,6 +302,22 @@ func requireSumPoint(t *testing.T, tel *componenttest.Telemetry, name string) me return sum.DataPoints[0] } +func requireGaugePoint( + t *testing.T, + tel *componenttest.Telemetry, + name string, +) metricdata.DataPoint[int64] { + t.Helper() + + metric, err := tel.GetMetric(name) + require.NoError(t, err) + + gauge, ok := metric.Data.(metricdata.Gauge[int64]) + require.True(t, ok, "metric %s is not a gauge", name) + require.Len(t, gauge.DataPoints, 1) + return gauge.DataPoints[0] +} + func assertMetricAttribute(t *testing.T, attrs attribute.Set, key, want string) { t.Helper() diff --git a/exporter/awss3exporter/metadata.yaml b/exporter/awss3exporter/metadata.yaml index 8102debda0412..afd0855974bbd 100644 --- a/exporter/awss3exporter/metadata.yaml +++ b/exporter/awss3exporter/metadata.yaml @@ -9,6 +9,47 @@ status: codeowners: active: [atoulme, pdelewski, Erog38] +telemetry: + metrics: + awss3_upload_attempt_total: + enabled: true + description: Number of started S3 upload attempts that reached the uploader + stability: alpha + unit: "1" + sum: + value_type: int + monotonic: true + awss3_upload_failed_total: + enabled: true + description: Number of started S3 upload attempts that failed + stability: alpha + unit: "1" + sum: + value_type: int + monotonic: true + awss3_upload_bytes: + enabled: true + description: Final bytes uploaded to S3 after any exporter-level compression + stability: alpha + unit: By + sum: + value_type: int + monotonic: true + awss3_upload_object_size: + enabled: true + description: Final size in bytes of each successfully uploaded S3 object + stability: alpha + unit: By + histogram: + value_type: int + awss3_last_successful_upload_timestamp: + enabled: true + description: Unix timestamp in seconds of the last successful S3 upload + stability: alpha + unit: s + gauge: + value_type: int + tests: expect_consumer_error: true goleak: From 42fd4eedfcf2cafbc38b954609fae44c459e575c Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 11:43:36 -0700 Subject: [PATCH 02/10] chore: add awss3 changelog entry Assisted-by: ChatGPT 5.4 --- .chloggen/awss3exporter-upload-telemetry.yaml | 7 +++++++ 1 file changed, 7 insertions(+) create mode 100644 .chloggen/awss3exporter-upload-telemetry.yaml diff --git a/.chloggen/awss3exporter-upload-telemetry.yaml b/.chloggen/awss3exporter-upload-telemetry.yaml new file mode 100644 index 0000000000000..7b0a518a5fece --- /dev/null +++ b/.chloggen/awss3exporter-upload-telemetry.yaml @@ -0,0 +1,7 @@ +change_type: enhancement +component: exporter/awss3 +note: "Add upload telemetry for the AWS S3 exporter to expose upload attempts, failures, bytes, object sizes, and last successful upload time." +issues: [43] +subtext: |- + This improves observability for parquet-backed export flows by making upload progress and failure modes visible without relying only on logs. +change_logs: [user] From 757e1970c2b3d8f28069b10e9d0975ae96380641 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 11:44:32 -0700 Subject: [PATCH 03/10] test: cover awss3 upload metric failure path Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter_test.go | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index dd3184a1afb03..fc4b8e56a90ab 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -258,6 +258,10 @@ func TestExporterRecordsUploadFailureMetric(t *testing.T) { assertMetricAttribute(t, uploadFailedPoint.Attributes, "reason", "manual") assertMetricAttribute(t, uploadFailedPoint.Attributes, "signal", "logs") assertMetricAttribute(t, uploadFailedPoint.Attributes, "outcome", "failure") + + assertMetricAbsentOrNoData(t, tel, "otelcol_exporter_awss3_upload_bytes") + assertMetricAbsentOrNoData(t, tel, "otelcol_exporter_awss3_upload_object_size") + assertMetricAbsentOrNoData(t, tel, "otelcol_exporter_awss3_last_successful_upload_timestamp") } func assertMetricDataPointCount(t *testing.T, tel *componenttest.Telemetry, name string) { @@ -318,6 +322,26 @@ func requireGaugePoint( return gauge.DataPoints[0] } +func assertMetricAbsentOrNoData(t *testing.T, tel *componenttest.Telemetry, name string) { + t.Helper() + + metric, err := tel.GetMetric(name) + if err != nil { + return + } + + switch data := metric.Data.(type) { + case metricdata.Sum[int64]: + require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + case metricdata.Histogram[int64]: + require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + case metricdata.Gauge[int64]: + require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + default: + t.Fatalf("unexpected metric type %T for %s", metric.Data, name) + } +} + func assertMetricAttribute(t *testing.T, attrs attribute.Set, key, want string) { t.Helper() From d3f31c8a686b0fbe5e63d020292c54c37935d355 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 11:46:36 -0700 Subject: [PATCH 04/10] test: cover awss3 telemetry generation gaps Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/documentation.md | 47 +++++++++ .../awss3exporter/generated_package_test.go | 3 +- .../internal/metadata/generated_telemetry.go | 96 ++++++++++++++++++ .../metadata/generated_telemetry_test.go | 74 ++++++++++++++ .../metadatatest/generated_telemetrytest.go | 99 +++++++++++++++++++ .../generated_telemetrytest_test.go | 44 +++++++++ exporter/awss3exporter/metadata.yaml | 26 ++--- 7 files changed, 374 insertions(+), 15 deletions(-) create mode 100644 exporter/awss3exporter/documentation.md create mode 100644 exporter/awss3exporter/internal/metadata/generated_telemetry.go create mode 100644 exporter/awss3exporter/internal/metadata/generated_telemetry_test.go create mode 100644 exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go create mode 100644 exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go diff --git a/exporter/awss3exporter/documentation.md b/exporter/awss3exporter/documentation.md new file mode 100644 index 0000000000000..2bf615b4675d7 --- /dev/null +++ b/exporter/awss3exporter/documentation.md @@ -0,0 +1,47 @@ +[comment]: <> (Code generated by mdatagen. DO NOT EDIT.) + +# awss3 + +## Internal Telemetry + +The following telemetry is emitted by this component. + +### otelcol_awss3_last_successful_upload_timestamp + +Unix timestamp in seconds of the last successful S3 upload + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| s | Gauge | Int | Alpha | + +### otelcol_awss3_upload_attempt_total + +Number of started S3 upload attempts that reached the uploader + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| 1 | Sum | Int | true | Alpha | + +### otelcol_awss3_upload_bytes + +Final bytes uploaded to S3 after any exporter-level compression + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| By | Sum | Int | true | Alpha | + +### otelcol_awss3_upload_failed_total + +Number of started S3 upload attempts that failed + +| Unit | Metric Type | Value Type | Monotonic | Stability | +| ---- | ----------- | ---------- | --------- | --------- | +| 1 | Sum | Int | true | Alpha | + +### otelcol_awss3_upload_object_size + +Final size in bytes of each successfully uploaded S3 object + +| Unit | Metric Type | Value Type | Stability | +| ---- | ----------- | ---------- | --------- | +| By | Histogram | Int | Alpha | diff --git a/exporter/awss3exporter/generated_package_test.go b/exporter/awss3exporter/generated_package_test.go index ff834e53d2f5b..5c84f386e6d3e 100644 --- a/exporter/awss3exporter/generated_package_test.go +++ b/exporter/awss3exporter/generated_package_test.go @@ -3,9 +3,8 @@ package awss3exporter import ( - "testing" - "go.uber.org/goleak" + "testing" ) func TestMain(m *testing.M) { diff --git a/exporter/awss3exporter/internal/metadata/generated_telemetry.go b/exporter/awss3exporter/internal/metadata/generated_telemetry.go new file mode 100644 index 0000000000000..e8b937b1f090a --- /dev/null +++ b/exporter/awss3exporter/internal/metadata/generated_telemetry.go @@ -0,0 +1,96 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "errors" + "sync" + + "go.opentelemetry.io/otel/metric" + "go.opentelemetry.io/otel/trace" + + "go.opentelemetry.io/collector/component" +) + +func Meter(settings component.TelemetrySettings) metric.Meter { + return settings.MeterProvider.Meter("github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter") +} + +func Tracer(settings component.TelemetrySettings) trace.Tracer { + return settings.TracerProvider.Tracer("github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter") +} + +// TelemetryBuilder provides an interface for components to report telemetry +// as defined in metadata and user config. +type TelemetryBuilder struct { + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + Awss3LastSuccessfulUploadTimestamp metric.Int64Gauge + Awss3UploadAttemptTotal metric.Int64Counter + Awss3UploadBytes metric.Int64Counter + Awss3UploadFailedTotal metric.Int64Counter + Awss3UploadObjectSize metric.Int64Histogram +} + +// TelemetryBuilderOption applies changes to default builder. +type TelemetryBuilderOption interface { + apply(*TelemetryBuilder) +} + +type telemetryBuilderOptionFunc func(mb *TelemetryBuilder) + +func (tbof telemetryBuilderOptionFunc) apply(mb *TelemetryBuilder) { + tbof(mb) +} + +// Shutdown unregister all registered callbacks for async instruments. +func (builder *TelemetryBuilder) Shutdown() { + builder.mu.Lock() + defer builder.mu.Unlock() + for _, reg := range builder.registrations { + reg.Unregister() + } +} + +// NewTelemetryBuilder provides a struct with methods to update all internal telemetry +// for a component +func NewTelemetryBuilder(settings component.TelemetrySettings, options ...TelemetryBuilderOption) (*TelemetryBuilder, error) { + builder := TelemetryBuilder{} + for _, op := range options { + op.apply(&builder) + } + builder.meter = Meter(settings) + var err, errs error + builder.Awss3LastSuccessfulUploadTimestamp, err = builder.meter.Int64Gauge( + "otelcol_awss3_last_successful_upload_timestamp", + metric.WithDescription("Unix timestamp in seconds of the last successful S3 upload [Alpha]"), + metric.WithUnit("s"), + ) + errs = errors.Join(errs, err) + builder.Awss3UploadAttemptTotal, err = builder.meter.Int64Counter( + "otelcol_awss3_upload_attempt_total", + metric.WithDescription("Number of started S3 upload attempts that reached the uploader [Alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.Awss3UploadBytes, err = builder.meter.Int64Counter( + "otelcol_awss3_upload_bytes", + metric.WithDescription("Final bytes uploaded to S3 after any exporter-level compression [Alpha]"), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + builder.Awss3UploadFailedTotal, err = builder.meter.Int64Counter( + "otelcol_awss3_upload_failed_total", + metric.WithDescription("Number of started S3 upload attempts that failed [Alpha]"), + metric.WithUnit("1"), + ) + errs = errors.Join(errs, err) + builder.Awss3UploadObjectSize, err = builder.meter.Int64Histogram( + "otelcol_awss3_upload_object_size", + metric.WithDescription("Final size in bytes of each successfully uploaded S3 object [Alpha]"), + metric.WithUnit("By"), + ) + errs = errors.Join(errs, err) + return &builder, errs +} diff --git a/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go new file mode 100644 index 0000000000000..80ac6a7741f51 --- /dev/null +++ b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go @@ -0,0 +1,74 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadata + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/metric" + embeddedmetric "go.opentelemetry.io/otel/metric/embedded" + noopmetric "go.opentelemetry.io/otel/metric/noop" + "go.opentelemetry.io/otel/trace" + embeddedtrace "go.opentelemetry.io/otel/trace/embedded" + nooptrace "go.opentelemetry.io/otel/trace/noop" + + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" +) + +type mockMeter struct { + noopmetric.Meter + name string +} +type mockMeterProvider struct { + embeddedmetric.MeterProvider +} + +func (m mockMeterProvider) Meter(name string, opts ...metric.MeterOption) metric.Meter { + return mockMeter{name: name} +} + +type mockTracer struct { + nooptrace.Tracer + name string +} + +type mockTracerProvider struct { + embeddedtrace.TracerProvider +} + +func (m mockTracerProvider) Tracer(name string, opts ...trace.TracerOption) trace.Tracer { + return mockTracer{name: name} +} + +func TestProviders(t *testing.T) { + set := component.TelemetrySettings{ + MeterProvider: mockMeterProvider{}, + TracerProvider: mockTracerProvider{}, + } + + meter := Meter(set) + if m, ok := meter.(mockMeter); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter", m.name) + } else { + require.Fail(t, "returned Meter not mockMeter") + } + + tracer := Tracer(set) + if m, ok := tracer.(mockTracer); ok { + require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter", m.name) + } else { + require.Fail(t, "returned Meter not mockTracer") + } +} + +func TestNewTelemetryBuilder(t *testing.T) { + set := componenttest.NewNopTelemetrySettings() + applied := false + _, err := NewTelemetryBuilder(set, telemetryBuilderOptionFunc(func(b *TelemetryBuilder) { + applied = true + })) + require.NoError(t, err) + require.True(t, applied) +} diff --git a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go new file mode 100644 index 0000000000000..9c60b382c6794 --- /dev/null +++ b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go @@ -0,0 +1,99 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" +) + +func NewSettings(tt *componenttest.Telemetry) exporter.Settings { + set := exportertest.NewNopSettings(exportertest.NopType) + set.ID = component.NewID(component.MustNewType("awss3")) + set.TelemetrySettings = tt.NewTelemetrySettings() + return set +} + +func AssertEqualAwss3LastSuccessfulUploadTimestamp(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_awss3_last_successful_upload_timestamp", + Description: "Unix timestamp in seconds of the last successful S3 upload [Alpha]", + Unit: "s", + Data: metricdata.Gauge[int64]{ + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_awss3_last_successful_upload_timestamp") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualAwss3UploadAttemptTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_awss3_upload_attempt_total", + Description: "Number of started S3 upload attempts that reached the uploader [Alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_awss3_upload_attempt_total") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualAwss3UploadBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_awss3_upload_bytes", + Description: "Final bytes uploaded to S3 after any exporter-level compression [Alpha]", + Unit: "By", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_awss3_upload_bytes") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualAwss3UploadFailedTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_awss3_upload_failed_total", + Description: "Number of started S3 upload attempts that failed [Alpha]", + Unit: "1", + Data: metricdata.Sum[int64]{ + Temporality: metricdata.CumulativeTemporality, + IsMonotonic: true, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_awss3_upload_failed_total") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} + +func AssertEqualAwss3UploadObjectSize(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { + want := metricdata.Metrics{ + Name: "otelcol_awss3_upload_object_size", + Description: "Final size in bytes of each successfully uploaded S3 object [Alpha]", + Unit: "By", + Data: metricdata.Histogram[int64]{ + Temporality: metricdata.CumulativeTemporality, + DataPoints: dps, + }, + } + got, err := tt.GetMetric("otelcol_awss3_upload_object_size") + require.NoError(t, err) + metricdatatest.AssertEqual(t, want, got, opts...) +} diff --git a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go new file mode 100644 index 0000000000000..58141f8901d27 --- /dev/null +++ b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go @@ -0,0 +1,44 @@ +// Code generated by mdatagen. DO NOT EDIT. + +package metadatatest + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel/sdk/metric/metricdata" + "go.opentelemetry.io/otel/sdk/metric/metricdata/metricdatatest" + + "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/metadata" + "go.opentelemetry.io/collector/component/componenttest" +) + +func TestSetupTelemetry(t *testing.T) { + testTel := componenttest.NewTelemetry() + tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) + require.NoError(t, err) + defer tb.Shutdown() + tb.Awss3LastSuccessfulUploadTimestamp.Record(context.Background(), 1) + tb.Awss3UploadAttemptTotal.Add(context.Background(), 1) + tb.Awss3UploadBytes.Add(context.Background(), 1) + tb.Awss3UploadFailedTotal.Add(context.Background(), 1) + tb.Awss3UploadObjectSize.Record(context.Background(), 1) + AssertEqualAwss3LastSuccessfulUploadTimestamp(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualAwss3UploadAttemptTotal(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualAwss3UploadBytes(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualAwss3UploadFailedTotal(t, testTel, + []metricdata.DataPoint[int64]{{Value: 1}}, + metricdatatest.IgnoreTimestamp()) + AssertEqualAwss3UploadObjectSize(t, testTel, + []metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(), + metricdatatest.IgnoreTimestamp()) + + require.NoError(t, testTel.Shutdown(context.Background())) +} diff --git a/exporter/awss3exporter/metadata.yaml b/exporter/awss3exporter/metadata.yaml index afd0855974bbd..298cbab6835b8 100644 --- a/exporter/awss3exporter/metadata.yaml +++ b/exporter/awss3exporter/metadata.yaml @@ -11,6 +11,13 @@ status: telemetry: metrics: + awss3_last_successful_upload_timestamp: + enabled: true + description: Unix timestamp in seconds of the last successful S3 upload + stability: alpha + unit: s + gauge: + value_type: int awss3_upload_attempt_total: enabled: true description: Number of started S3 upload attempts that reached the uploader @@ -19,19 +26,19 @@ telemetry: sum: value_type: int monotonic: true - awss3_upload_failed_total: + awss3_upload_bytes: enabled: true - description: Number of started S3 upload attempts that failed + description: Final bytes uploaded to S3 after any exporter-level compression stability: alpha - unit: "1" + unit: By sum: value_type: int monotonic: true - awss3_upload_bytes: + awss3_upload_failed_total: enabled: true - description: Final bytes uploaded to S3 after any exporter-level compression + description: Number of started S3 upload attempts that failed stability: alpha - unit: By + unit: "1" sum: value_type: int monotonic: true @@ -42,13 +49,6 @@ telemetry: unit: By histogram: value_type: int - awss3_last_successful_upload_timestamp: - enabled: true - description: Unix timestamp in seconds of the last successful S3 upload - stability: alpha - unit: s - gauge: - value_type: int tests: expect_consumer_error: true From fb5866b5b3b688f83babfb8ce5d445f1f5b02433 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 12:06:25 -0700 Subject: [PATCH 05/10] test: fix awss3 tracer assertion message Assisted-by: ChatGPT 5.4 --- .../awss3exporter/internal/metadata/generated_telemetry_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go index 80ac6a7741f51..3c326ab353e15 100644 --- a/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go +++ b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go @@ -59,7 +59,7 @@ func TestProviders(t *testing.T) { if m, ok := tracer.(mockTracer); ok { require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter", m.name) } else { - require.Fail(t, "returned Meter not mockTracer") + require.Fail(t, "returned Tracer not mockTracer") } } From 39c6aa0587c7087f8d49506581b8ddabecfead45 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 12:22:46 -0700 Subject: [PATCH 06/10] fix: make awss3 histogram units explicit Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter.go | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/exporter/awss3exporter/exporter.go b/exporter/awss3exporter/exporter.go index d37d39c029f8f..7ffef52468379 100644 --- a/exporter/awss3exporter/exporter.go +++ b/exporter/awss3exporter/exporter.go @@ -217,10 +217,10 @@ func newExporterTelemetry(settings component.TelemetrySettings, logger *zap.Logg tel.uploadComplete = mustCounter(meter, uploadCompleteMetricName, logger) tel.uploadFailed = mustCounter(meter, uploadFailedMetricName, logger) tel.uploadBytes = mustCounter(meter, uploadBytesMetricName, logger, metric.WithUnit("By")) - tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger) - tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger) + tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger, metric.WithUnit("ms")) + tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger, metric.WithUnit("ms")) tel.uploadObjectSize = mustHistogram(meter, uploadObjectSizeMetricName, logger, metric.WithUnit("By")) - tel.flushToUploadDuration = mustHistogram(meter, flushToUploadDurationMetricName, logger) + tel.flushToUploadDuration = mustHistogram(meter, flushToUploadDurationMetricName, logger, metric.WithUnit("ms")) tel.lastSuccessfulUpload = mustGauge( meter, lastSuccessfulUploadMetricName, @@ -249,7 +249,6 @@ func mustHistogram( logger *zap.Logger, opts ...metric.Int64HistogramOption, ) metric.Int64Histogram { - opts = append([]metric.Int64HistogramOption{metric.WithUnit("ms")}, opts...) histogram, err := meter.Int64Histogram(name, opts...) if err != nil && logger != nil { logger.Warn("failed to create awss3 exporter histogram", zap.String("name", name), zap.Error(err)) From 3274ba5af4950c9648391440720af429313eee8c Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 12:29:27 -0700 Subject: [PATCH 07/10] test: tighten awss3 telemetry assertions Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter_test.go | 5 ++++- .../internal/metadata/generated_telemetry_test.go | 2 +- exporter/awss3exporter/metadata.yaml | 10 +++++----- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index fc4b8e56a90ab..c4844410ffff8 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -175,7 +175,9 @@ func TestExporterRecordsEquivalentFlushAndUploadTelemetry(t *testing.T) { uploader := &uploaderStub{delay: uploadDelay} exporter.uploader = uploader + beforeConsume := time.Now() require.NoError(t, exporter.ConsumeLogs(t.Context(), getTestLogs(t))) + afterConsume := time.Now() require.Equal(t, 1, uploader.uploadCalls) assertMetricDataPointCount(t, tel, "otelcol_exporter_awss3_flush_start_total") @@ -226,7 +228,8 @@ func TestExporterRecordsEquivalentFlushAndUploadTelemetry(t *testing.T) { assert.Equal(t, int64(len([]byte("payload"))), uploadObjectSizePoint.Sum) assert.GreaterOrEqual(t, flushToUploadPoint.Sum, handoffGap.Milliseconds()) assert.Less(t, flushToUploadPoint.Sum, uploadDurationPoint.Sum) - assert.Greater(t, lastSuccessfulUploadPoint.Value, int64(0)) + assert.GreaterOrEqual(t, lastSuccessfulUploadPoint.Value, beforeConsume.Unix()) + assert.LessOrEqual(t, lastSuccessfulUploadPoint.Value, afterConsume.Unix()) } func TestExporterRecordsUploadFailureMetric(t *testing.T) { diff --git a/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go index 3c326ab353e15..80ac6a7741f51 100644 --- a/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go +++ b/exporter/awss3exporter/internal/metadata/generated_telemetry_test.go @@ -59,7 +59,7 @@ func TestProviders(t *testing.T) { if m, ok := tracer.(mockTracer); ok { require.Equal(t, "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter", m.name) } else { - require.Fail(t, "returned Tracer not mockTracer") + require.Fail(t, "returned Meter not mockTracer") } } diff --git a/exporter/awss3exporter/metadata.yaml b/exporter/awss3exporter/metadata.yaml index 298cbab6835b8..4be48a6a632dc 100644 --- a/exporter/awss3exporter/metadata.yaml +++ b/exporter/awss3exporter/metadata.yaml @@ -11,14 +11,14 @@ status: telemetry: metrics: - awss3_last_successful_upload_timestamp: + exporter_awss3_last_successful_upload_timestamp: enabled: true description: Unix timestamp in seconds of the last successful S3 upload stability: alpha unit: s gauge: value_type: int - awss3_upload_attempt_total: + exporter_awss3_upload_attempt_total: enabled: true description: Number of started S3 upload attempts that reached the uploader stability: alpha @@ -26,7 +26,7 @@ telemetry: sum: value_type: int monotonic: true - awss3_upload_bytes: + exporter_awss3_upload_bytes: enabled: true description: Final bytes uploaded to S3 after any exporter-level compression stability: alpha @@ -34,7 +34,7 @@ telemetry: sum: value_type: int monotonic: true - awss3_upload_failed_total: + exporter_awss3_upload_failed_total: enabled: true description: Number of started S3 upload attempts that failed stability: alpha @@ -42,7 +42,7 @@ telemetry: sum: value_type: int monotonic: true - awss3_upload_object_size: + exporter_awss3_upload_object_size: enabled: true description: Final size in bytes of each successfully uploaded S3 object stability: alpha From f3ffb4f8bae356120b4a3a4666603a65b02352b8 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 12:59:46 -0700 Subject: [PATCH 08/10] fix: satisfy awss3 exporter lint checks Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/documentation.md | 10 +++--- exporter/awss3exporter/exporter_test.go | 6 ++-- .../internal/metadata/generated_telemetry.go | 36 +++++++++---------- .../metadatatest/generated_telemetrytest.go | 30 ++++++++-------- .../generated_telemetrytest_test.go | 20 +++++------ .../internal/upload/partition.go | 2 +- 6 files changed, 52 insertions(+), 52 deletions(-) diff --git a/exporter/awss3exporter/documentation.md b/exporter/awss3exporter/documentation.md index 2bf615b4675d7..5eba77bdea8da 100644 --- a/exporter/awss3exporter/documentation.md +++ b/exporter/awss3exporter/documentation.md @@ -6,7 +6,7 @@ The following telemetry is emitted by this component. -### otelcol_awss3_last_successful_upload_timestamp +### otelcol_exporter_awss3_last_successful_upload_timestamp Unix timestamp in seconds of the last successful S3 upload @@ -14,7 +14,7 @@ Unix timestamp in seconds of the last successful S3 upload | ---- | ----------- | ---------- | --------- | | s | Gauge | Int | Alpha | -### otelcol_awss3_upload_attempt_total +### otelcol_exporter_awss3_upload_attempt_total Number of started S3 upload attempts that reached the uploader @@ -22,7 +22,7 @@ Number of started S3 upload attempts that reached the uploader | ---- | ----------- | ---------- | --------- | --------- | | 1 | Sum | Int | true | Alpha | -### otelcol_awss3_upload_bytes +### otelcol_exporter_awss3_upload_bytes Final bytes uploaded to S3 after any exporter-level compression @@ -30,7 +30,7 @@ Final bytes uploaded to S3 after any exporter-level compression | ---- | ----------- | ---------- | --------- | --------- | | By | Sum | Int | true | Alpha | -### otelcol_awss3_upload_failed_total +### otelcol_exporter_awss3_upload_failed_total Number of started S3 upload attempts that failed @@ -38,7 +38,7 @@ Number of started S3 upload attempts that failed | ---- | ----------- | ---------- | --------- | --------- | | 1 | Sum | Int | true | Alpha | -### otelcol_awss3_upload_object_size +### otelcol_exporter_awss3_upload_object_size Final size in bytes of each successfully uploaded S3 object diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index c4844410ffff8..e2a37590faa0d 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -335,11 +335,11 @@ func assertMetricAbsentOrNoData(t *testing.T, tel *componenttest.Telemetry, name switch data := metric.Data.(type) { case metricdata.Sum[int64]: - require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + require.Empty(t, data.DataPoints, "metric %s should not emit points", name) case metricdata.Histogram[int64]: - require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + require.Empty(t, data.DataPoints, "metric %s should not emit points", name) case metricdata.Gauge[int64]: - require.Len(t, data.DataPoints, 0, "metric %s should not emit points", name) + require.Empty(t, data.DataPoints, "metric %s should not emit points", name) default: t.Fatalf("unexpected metric type %T for %s", metric.Data, name) } diff --git a/exporter/awss3exporter/internal/metadata/generated_telemetry.go b/exporter/awss3exporter/internal/metadata/generated_telemetry.go index e8b937b1f090a..852965827d63c 100644 --- a/exporter/awss3exporter/internal/metadata/generated_telemetry.go +++ b/exporter/awss3exporter/internal/metadata/generated_telemetry.go @@ -23,14 +23,14 @@ func Tracer(settings component.TelemetrySettings) trace.Tracer { // TelemetryBuilder provides an interface for components to report telemetry // as defined in metadata and user config. type TelemetryBuilder struct { - meter metric.Meter - mu sync.Mutex - registrations []metric.Registration - Awss3LastSuccessfulUploadTimestamp metric.Int64Gauge - Awss3UploadAttemptTotal metric.Int64Counter - Awss3UploadBytes metric.Int64Counter - Awss3UploadFailedTotal metric.Int64Counter - Awss3UploadObjectSize metric.Int64Histogram + meter metric.Meter + mu sync.Mutex + registrations []metric.Registration + ExporterAwss3LastSuccessfulUploadTimestamp metric.Int64Gauge + ExporterAwss3UploadAttemptTotal metric.Int64Counter + ExporterAwss3UploadBytes metric.Int64Counter + ExporterAwss3UploadFailedTotal metric.Int64Counter + ExporterAwss3UploadObjectSize metric.Int64Histogram } // TelemetryBuilderOption applies changes to default builder. @@ -62,32 +62,32 @@ func NewTelemetryBuilder(settings component.TelemetrySettings, options ...Teleme } builder.meter = Meter(settings) var err, errs error - builder.Awss3LastSuccessfulUploadTimestamp, err = builder.meter.Int64Gauge( - "otelcol_awss3_last_successful_upload_timestamp", + builder.ExporterAwss3LastSuccessfulUploadTimestamp, err = builder.meter.Int64Gauge( + "otelcol_exporter_awss3_last_successful_upload_timestamp", metric.WithDescription("Unix timestamp in seconds of the last successful S3 upload [Alpha]"), metric.WithUnit("s"), ) errs = errors.Join(errs, err) - builder.Awss3UploadAttemptTotal, err = builder.meter.Int64Counter( - "otelcol_awss3_upload_attempt_total", + builder.ExporterAwss3UploadAttemptTotal, err = builder.meter.Int64Counter( + "otelcol_exporter_awss3_upload_attempt_total", metric.WithDescription("Number of started S3 upload attempts that reached the uploader [Alpha]"), metric.WithUnit("1"), ) errs = errors.Join(errs, err) - builder.Awss3UploadBytes, err = builder.meter.Int64Counter( - "otelcol_awss3_upload_bytes", + builder.ExporterAwss3UploadBytes, err = builder.meter.Int64Counter( + "otelcol_exporter_awss3_upload_bytes", metric.WithDescription("Final bytes uploaded to S3 after any exporter-level compression [Alpha]"), metric.WithUnit("By"), ) errs = errors.Join(errs, err) - builder.Awss3UploadFailedTotal, err = builder.meter.Int64Counter( - "otelcol_awss3_upload_failed_total", + builder.ExporterAwss3UploadFailedTotal, err = builder.meter.Int64Counter( + "otelcol_exporter_awss3_upload_failed_total", metric.WithDescription("Number of started S3 upload attempts that failed [Alpha]"), metric.WithUnit("1"), ) errs = errors.Join(errs, err) - builder.Awss3UploadObjectSize, err = builder.meter.Int64Histogram( - "otelcol_awss3_upload_object_size", + builder.ExporterAwss3UploadObjectSize, err = builder.meter.Int64Histogram( + "otelcol_exporter_awss3_upload_object_size", metric.WithDescription("Final size in bytes of each successfully uploaded S3 object [Alpha]"), metric.WithUnit("By"), ) diff --git a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go index 9c60b382c6794..84d4e1cc6fe3a 100644 --- a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go +++ b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest.go @@ -21,23 +21,23 @@ func NewSettings(tt *componenttest.Telemetry) exporter.Settings { return set } -func AssertEqualAwss3LastSuccessfulUploadTimestamp(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { +func AssertEqualExporterAwss3LastSuccessfulUploadTimestamp(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_awss3_last_successful_upload_timestamp", + Name: "otelcol_exporter_awss3_last_successful_upload_timestamp", Description: "Unix timestamp in seconds of the last successful S3 upload [Alpha]", Unit: "s", Data: metricdata.Gauge[int64]{ DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_awss3_last_successful_upload_timestamp") + got, err := tt.GetMetric("otelcol_exporter_awss3_last_successful_upload_timestamp") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } -func AssertEqualAwss3UploadAttemptTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { +func AssertEqualExporterAwss3UploadAttemptTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_awss3_upload_attempt_total", + Name: "otelcol_exporter_awss3_upload_attempt_total", Description: "Number of started S3 upload attempts that reached the uploader [Alpha]", Unit: "1", Data: metricdata.Sum[int64]{ @@ -46,14 +46,14 @@ func AssertEqualAwss3UploadAttemptTotal(t *testing.T, tt *componenttest.Telemetr DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_awss3_upload_attempt_total") + got, err := tt.GetMetric("otelcol_exporter_awss3_upload_attempt_total") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } -func AssertEqualAwss3UploadBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { +func AssertEqualExporterAwss3UploadBytes(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_awss3_upload_bytes", + Name: "otelcol_exporter_awss3_upload_bytes", Description: "Final bytes uploaded to S3 after any exporter-level compression [Alpha]", Unit: "By", Data: metricdata.Sum[int64]{ @@ -62,14 +62,14 @@ func AssertEqualAwss3UploadBytes(t *testing.T, tt *componenttest.Telemetry, dps DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_awss3_upload_bytes") + got, err := tt.GetMetric("otelcol_exporter_awss3_upload_bytes") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } -func AssertEqualAwss3UploadFailedTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { +func AssertEqualExporterAwss3UploadFailedTotal(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.DataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_awss3_upload_failed_total", + Name: "otelcol_exporter_awss3_upload_failed_total", Description: "Number of started S3 upload attempts that failed [Alpha]", Unit: "1", Data: metricdata.Sum[int64]{ @@ -78,14 +78,14 @@ func AssertEqualAwss3UploadFailedTotal(t *testing.T, tt *componenttest.Telemetry DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_awss3_upload_failed_total") + got, err := tt.GetMetric("otelcol_exporter_awss3_upload_failed_total") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } -func AssertEqualAwss3UploadObjectSize(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { +func AssertEqualExporterAwss3UploadObjectSize(t *testing.T, tt *componenttest.Telemetry, dps []metricdata.HistogramDataPoint[int64], opts ...metricdatatest.Option) { want := metricdata.Metrics{ - Name: "otelcol_awss3_upload_object_size", + Name: "otelcol_exporter_awss3_upload_object_size", Description: "Final size in bytes of each successfully uploaded S3 object [Alpha]", Unit: "By", Data: metricdata.Histogram[int64]{ @@ -93,7 +93,7 @@ func AssertEqualAwss3UploadObjectSize(t *testing.T, tt *componenttest.Telemetry, DataPoints: dps, }, } - got, err := tt.GetMetric("otelcol_awss3_upload_object_size") + got, err := tt.GetMetric("otelcol_exporter_awss3_upload_object_size") require.NoError(t, err) metricdatatest.AssertEqual(t, want, got, opts...) } diff --git a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go index 58141f8901d27..04f01c990919e 100644 --- a/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go +++ b/exporter/awss3exporter/internal/metadatatest/generated_telemetrytest_test.go @@ -19,24 +19,24 @@ func TestSetupTelemetry(t *testing.T) { tb, err := metadata.NewTelemetryBuilder(testTel.NewTelemetrySettings()) require.NoError(t, err) defer tb.Shutdown() - tb.Awss3LastSuccessfulUploadTimestamp.Record(context.Background(), 1) - tb.Awss3UploadAttemptTotal.Add(context.Background(), 1) - tb.Awss3UploadBytes.Add(context.Background(), 1) - tb.Awss3UploadFailedTotal.Add(context.Background(), 1) - tb.Awss3UploadObjectSize.Record(context.Background(), 1) - AssertEqualAwss3LastSuccessfulUploadTimestamp(t, testTel, + tb.ExporterAwss3LastSuccessfulUploadTimestamp.Record(context.Background(), 1) + tb.ExporterAwss3UploadAttemptTotal.Add(context.Background(), 1) + tb.ExporterAwss3UploadBytes.Add(context.Background(), 1) + tb.ExporterAwss3UploadFailedTotal.Add(context.Background(), 1) + tb.ExporterAwss3UploadObjectSize.Record(context.Background(), 1) + AssertEqualExporterAwss3LastSuccessfulUploadTimestamp(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) - AssertEqualAwss3UploadAttemptTotal(t, testTel, + AssertEqualExporterAwss3UploadAttemptTotal(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) - AssertEqualAwss3UploadBytes(t, testTel, + AssertEqualExporterAwss3UploadBytes(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) - AssertEqualAwss3UploadFailedTotal(t, testTel, + AssertEqualExporterAwss3UploadFailedTotal(t, testTel, []metricdata.DataPoint[int64]{{Value: 1}}, metricdatatest.IgnoreTimestamp()) - AssertEqualAwss3UploadObjectSize(t, testTel, + AssertEqualExporterAwss3UploadObjectSize(t, testTel, []metricdata.HistogramDataPoint[int64]{{}}, metricdatatest.IgnoreValue(), metricdatatest.IgnoreTimestamp()) diff --git a/exporter/awss3exporter/internal/upload/partition.go b/exporter/awss3exporter/internal/upload/partition.go index 62ef6a7de9eb6..6756c544df9e7 100644 --- a/exporter/awss3exporter/internal/upload/partition.go +++ b/exporter/awss3exporter/internal/upload/partition.go @@ -22,7 +22,7 @@ import ( // legacyTemplateFuncs provides the Sprig-compatible template functions // used by generated configs (dateInZone, now, randAlpha). var legacyTemplateFuncs = template.FuncMap{ - "now": func() time.Time { return time.Now() }, + "now": time.Now, "dateInZone": func(layout string, t time.Time, zone string) string { loc, err := time.LoadLocation(zone) if err != nil { From b5ea95d27152f3725934e85fbabbf375a2472ef9 Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 13:33:41 -0700 Subject: [PATCH 09/10] fix: report compressed awss3 upload sizes Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter.go | 4 +- exporter/awss3exporter/exporter_test.go | 8 +- .../awss3exporter/internal/upload/writer.go | 11 +- .../internal/upload/writer_test.go | 102 +++++++++++++----- exporter/awss3exporter/s3_writer_test.go | 4 +- 5 files changed, 90 insertions(+), 39 deletions(-) diff --git a/exporter/awss3exporter/exporter.go b/exporter/awss3exporter/exporter.go index 7ffef52468379..3ef39cdc5ad39 100644 --- a/exporter/awss3exporter/exporter.go +++ b/exporter/awss3exporter/exporter.go @@ -189,13 +189,13 @@ func (e *s3Exporter) uploadBuffer( uploadStartedAt := time.Now() e.telemetry.recordUploadStart(ctx, e.signalType) - err := e.uploader.Upload(ctx, buf, uploadOpts) + uploadedBytes, err := e.uploader.Upload(ctx, buf, uploadOpts) e.telemetry.recordUploadComplete( ctx, e.signalType, uploadStartedAt, time.Since(uploadStartedAt), - int64(len(buf)), + uploadedBytes, flushMeta, err, ) diff --git a/exporter/awss3exporter/exporter_test.go b/exporter/awss3exporter/exporter_test.go index e2a37590faa0d..2b755f24fede1 100644 --- a/exporter/awss3exporter/exporter_test.go +++ b/exporter/awss3exporter/exporter_test.go @@ -38,10 +38,10 @@ type testWriter struct { expectedOpts *upload.UploadOptions } -func (testWriter *testWriter) Upload(_ context.Context, buf []byte, uploadOpts *upload.UploadOptions) error { +func (testWriter *testWriter) Upload(_ context.Context, buf []byte, uploadOpts *upload.UploadOptions) (int64, error) { assert.JSONEq(testWriter.t, testLogs, string(buf)) assert.Equal(testWriter.t, testWriter.expectedOpts, uploadOpts) - return nil + return int64(len(buf)), nil } func getTestLogs(tb testing.TB) plog.Logs { @@ -140,12 +140,12 @@ type uploaderStub struct { delay time.Duration } -func (u *uploaderStub) Upload(context.Context, []byte, *upload.UploadOptions) error { +func (u *uploaderStub) Upload(_ context.Context, buf []byte, _ *upload.UploadOptions) (int64, error) { u.uploadCalls++ if u.delay > 0 { time.Sleep(u.delay) } - return u.err + return int64(len(buf)), u.err } func TestExporterRecordsEquivalentFlushAndUploadTelemetry(t *testing.T) { diff --git a/exporter/awss3exporter/internal/upload/writer.go b/exporter/awss3exporter/internal/upload/writer.go index 4f85896e83ed0..dab6bb64ed5af 100644 --- a/exporter/awss3exporter/internal/upload/writer.go +++ b/exporter/awss3exporter/internal/upload/writer.go @@ -20,7 +20,7 @@ import ( ) type Manager interface { - Upload(ctx context.Context, data []byte, opts *UploadOptions) error + Upload(ctx context.Context, data []byte, opts *UploadOptions) (int64, error) } type ManagerOpt func(Manager) @@ -58,15 +58,16 @@ func NewS3Manager(logger *zap.Logger, bucket string, builder *PartitionKeyBuilde return manager } -func (sw *s3manager) Upload(ctx context.Context, data []byte, opts *UploadOptions) error { +func (sw *s3manager) Upload(ctx context.Context, data []byte, opts *UploadOptions) (int64, error) { if len(data) == 0 { - return nil + return 0, nil } content, err := sw.contentBuffer(data) if err != nil { - return err + return 0, err } + uploadedBytes := int64(content.Len()) encoding := "" // Only use ContentEncoding for non-archive formats @@ -103,7 +104,7 @@ func (sw *s3manager) Upload(ctx context.Context, data []byte, opts *UploadOption sw.logger.Debug("uploading object", zap.String("bucket", overrideBucket), zap.String("key", key)) _, err = sw.uploader.UploadObject(ctx, uploadInput) - return err + return uploadedBytes, err } func (sw *s3manager) contentBuffer(raw []byte) (*bytes.Buffer, error) { diff --git a/exporter/awss3exporter/internal/upload/writer_test.go b/exporter/awss3exporter/internal/upload/writer_test.go index 9c125d5eedf1f..a6a27624d9af3 100644 --- a/exporter/awss3exporter/internal/upload/writer_test.go +++ b/exporter/awss3exporter/internal/upload/writer_test.go @@ -4,6 +4,7 @@ package upload import ( + "bytes" "compress/gzip" "io" "net/http" @@ -41,7 +42,7 @@ func TestS3ManagerUpload(t *testing.T) { for _, tc := range []struct { name string - handler func(t *testing.T) http.Handler + handler func(t *testing.T, uploadedSize *int64) http.Handler compression configcompression.Type data []byte errVal string @@ -50,9 +51,13 @@ func TestS3ManagerUpload(t *testing.T) { }{ { name: "successful upload", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -70,8 +75,15 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "successful compression upload gzip", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) + _ = r.Body.Close() + assert.Equal( t, "/my-bucket/telemetry/year=2024/month=01/day=10/hour=10/minute=30/signal-data-noop_random.metrics.gz", @@ -79,7 +91,7 @@ func TestS3ManagerUpload(t *testing.T) { "Must match the expected path", ) - gr, err := gzip.NewReader(r.Body) + gr, err := gzip.NewReader(bytes.NewReader(body)) if !assert.NoError(t, err, "Must not error creating gzip reader") { return } @@ -89,7 +101,6 @@ func TestS3ManagerUpload(t *testing.T) { assert.NoError(t, err, "Must not error reading data from reader") _ = gr.Close() - _ = r.Body.Close() }) }, compression: configcompression.TypeGzip, @@ -99,8 +110,15 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "successful compression upload zstd", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) + _ = r.Body.Close() + assert.Equal( t, "/my-bucket/telemetry/year=2024/month=01/day=10/hour=10/minute=30/signal-data-noop_random.metrics.zst", @@ -108,7 +126,7 @@ func TestS3ManagerUpload(t *testing.T) { "Must match the expected path", ) - reader, err := zstd.NewReader(r.Body) + reader, err := zstd.NewReader(bytes.NewReader(body)) if !assert.NoError(t, err, "Must not error creating zstd reader") { return } @@ -118,7 +136,6 @@ func TestS3ManagerUpload(t *testing.T) { assert.NoError(t, err, "Must not error reading data from reader") reader.Close() - _ = r.Body.Close() }) }, compression: configcompression.TypeZstd, @@ -128,7 +145,7 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "no data upload", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, _ *int64) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = io.Copy(io.Discard, r.Body) _ = r.Body.Close() @@ -143,7 +160,7 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "failed upload", - handler: func(_ *testing.T) http.Handler { + handler: func(_ *testing.T, _ *int64) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { _, _ = io.Copy(io.Discard, r.Body) _ = r.Body.Close() @@ -157,8 +174,15 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "STANDARD_IA storage class", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) + _ = r.Body.Close() + // Example of validating that the S3 storage class header is set correctly assert.Equal(t, "STANDARD_IA", r.Header.Get("x-amz-storage-class")) }) @@ -170,9 +194,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 prefix from resource attrbuites", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -190,9 +218,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 prefix from resource attrbuites empty", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -210,9 +242,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 bucket from resource attributes", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -230,9 +266,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 bucket and prefix from resource attributes", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -250,9 +290,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 bucket override empty", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -270,9 +314,13 @@ func TestS3ManagerUpload(t *testing.T) { }, { name: "upload with s3 bucket override and custom storage class", - handler: func(t *testing.T) http.Handler { + handler: func(t *testing.T, uploadedSize *int64) http.Handler { return http.HandlerFunc(func(_ http.ResponseWriter, r *http.Request) { - _, _ = io.Copy(io.Discard, r.Body) + body, err := io.ReadAll(r.Body) + if !assert.NoError(t, err) { + return + } + *uploadedSize = int64(len(body)) _ = r.Body.Close() assert.Equal( @@ -294,7 +342,8 @@ func TestS3ManagerUpload(t *testing.T) { t.Run(tc.name, func(t *testing.T) { t.Parallel() - s := httptest.NewServer(tc.handler(t)) + var uploadedSize int64 + s := httptest.NewServer(tc.handler(t, &uploadedSize)) t.Cleanup(s.Close) sm := NewS3Manager( @@ -323,11 +372,12 @@ func TestS3ManagerUpload(t *testing.T) { // to reduce the potential of flaky tests mc := clock.NewMock(time.Date(2024, 0o1, 10, 10, 30, 40, 100, time.Local)) - err := sm.Upload(clock.Context(t.Context(), mc), tc.data, tc.uploadOpts) + reportedUploadSize, err := sm.Upload(clock.Context(t.Context(), mc), tc.data, tc.uploadOpts) if tc.errVal != "" { assert.EqualError(t, err, tc.errVal, "Must match the expected error") } else { assert.NoError(t, err, "Must not have return an error") + assert.Equal(t, uploadedSize, reportedUploadSize, "Must report the actual uploaded object size") } }) } diff --git a/exporter/awss3exporter/s3_writer_test.go b/exporter/awss3exporter/s3_writer_test.go index fc7c23347aa3b..4874379ef76b5 100644 --- a/exporter/awss3exporter/s3_writer_test.go +++ b/exporter/awss3exporter/s3_writer_test.go @@ -67,7 +67,7 @@ func newTestClientFromConfig(t *testing.T, conf Config) testUploaderClient { return captured } - err = manager.Upload(t.Context(), []byte("payload"), nil) + _, err = manager.Upload(t.Context(), []byte("payload"), nil) if !assert.NoError(t, err) { return captured } @@ -182,7 +182,7 @@ func TestUploaderOptions_StaticCredsAndRoleArnUsesAssumeRole(t *testing.T) { return } - err = manager.Upload(t.Context(), []byte("payload"), nil) + _, err = manager.Upload(t.Context(), []byte("payload"), nil) if !assert.NoError(t, err) { return } From a9e975e72ae5696b36bcc5f38d8fe7e99d67c7dd Mon Sep 17 00:00:00 2001 From: Amir Jakoby Date: Fri, 10 Apr 2026 13:45:31 -0700 Subject: [PATCH 10/10] fix: add awss3 upload counter units Assisted-by: ChatGPT 5.4 --- exporter/awss3exporter/exporter.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/awss3exporter/exporter.go b/exporter/awss3exporter/exporter.go index 3ef39cdc5ad39..b369b29bf883c 100644 --- a/exporter/awss3exporter/exporter.go +++ b/exporter/awss3exporter/exporter.go @@ -212,10 +212,10 @@ func newExporterTelemetry(settings component.TelemetrySettings, logger *zap.Logg tel := &exporterTelemetry{} tel.flushStart = mustCounter(meter, flushStartMetricName, logger) tel.flushComplete = mustCounter(meter, flushCompleteMetricName, logger) - tel.uploadAttempt = mustCounter(meter, uploadAttemptMetricName, logger) + tel.uploadAttempt = mustCounter(meter, uploadAttemptMetricName, logger, metric.WithUnit("1")) tel.uploadStart = mustCounter(meter, uploadStartMetricName, logger) tel.uploadComplete = mustCounter(meter, uploadCompleteMetricName, logger) - tel.uploadFailed = mustCounter(meter, uploadFailedMetricName, logger) + tel.uploadFailed = mustCounter(meter, uploadFailedMetricName, logger, metric.WithUnit("1")) tel.uploadBytes = mustCounter(meter, uploadBytesMetricName, logger, metric.WithUnit("By")) tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger, metric.WithUnit("ms")) tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger, metric.WithUnit("ms"))