Skip to content
7 changes: 7 additions & 0 deletions .chloggen/awss3exporter-upload-telemetry.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
change_type: enhancement
component: exporter/awss3
note: "Add upload telemetry for the AWS S3 exporter to expose upload attempts, failures, bytes, object sizes, and last successful upload time."
issues: [43]
subtext: |-
This improves observability for parquet-backed export flows by making upload progress and failure modes visible without relying only on logs.
change_logs: [user]
47 changes: 47 additions & 0 deletions exporter/awss3exporter/documentation.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
[comment]: <> (Code generated by mdatagen. DO NOT EDIT.)
Comment thread
amir-jakoby marked this conversation as resolved.

# awss3

## Internal Telemetry

The following telemetry is emitted by this component.

### otelcol_exporter_awss3_last_successful_upload_timestamp

Unix timestamp in seconds of the last successful S3 upload

| Unit | Metric Type | Value Type | Stability |
| ---- | ----------- | ---------- | --------- |
| s | Gauge | Int | Alpha |

### otelcol_exporter_awss3_upload_attempt_total

Number of started S3 upload attempts that reached the uploader

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| 1 | Sum | Int | true | Alpha |

### otelcol_exporter_awss3_upload_bytes

Final bytes uploaded to S3 after any exporter-level compression

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| By | Sum | Int | true | Alpha |

### otelcol_exporter_awss3_upload_failed_total

Number of started S3 upload attempts that failed

| Unit | Metric Type | Value Type | Monotonic | Stability |
| ---- | ----------- | ---------- | --------- | --------- |
| 1 | Sum | Int | true | Alpha |

### otelcol_exporter_awss3_upload_object_size

Final size in bytes of each successfully uploaded S3 object

| Unit | Metric Type | Value Type | Stability |
| ---- | ----------- | ---------- | --------- |
| By | Histogram | Int | Alpha |
93 changes: 83 additions & 10 deletions exporter/awss3exporter/exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,21 +36,31 @@ type marshalerWithFlushMetadata interface {
type exporterTelemetry struct {
flushStart metric.Int64Counter
flushComplete metric.Int64Counter
uploadAttempt metric.Int64Counter
uploadStart metric.Int64Counter
uploadComplete metric.Int64Counter
uploadFailed metric.Int64Counter
uploadBytes metric.Int64Counter
flushDuration metric.Int64Histogram
uploadDuration metric.Int64Histogram
uploadObjectSize metric.Int64Histogram
flushToUploadDuration metric.Int64Histogram
lastSuccessfulUpload metric.Int64Gauge
}

const (
flushStartMetricName = "otelcol_exporter_awss3_flush_start_total"
flushCompleteMetricName = "otelcol_exporter_awss3_flush_complete_total"
uploadAttemptMetricName = "otelcol_exporter_awss3_upload_attempt_total"
uploadStartMetricName = "otelcol_exporter_awss3_upload_start_total"
uploadCompleteMetricName = "otelcol_exporter_awss3_upload_complete_total"
uploadFailedMetricName = "otelcol_exporter_awss3_upload_failed_total"
uploadBytesMetricName = "otelcol_exporter_awss3_upload_bytes"
flushDurationMetricName = "otelcol_exporter_awss3_flush_duration"
uploadDurationMetricName = "otelcol_exporter_awss3_upload_duration"
uploadObjectSizeMetricName = "otelcol_exporter_awss3_upload_object_size"
flushToUploadDurationMetricName = "otelcol_exporter_awss3_flush_to_upload_duration"
lastSuccessfulUploadMetricName = "otelcol_exporter_awss3_last_successful_upload_timestamp"
Comment thread
amir-jakoby marked this conversation as resolved.
)

type s3Exporter struct {
Expand Down Expand Up @@ -179,12 +189,13 @@ func (e *s3Exporter) uploadBuffer(

uploadStartedAt := time.Now()
e.telemetry.recordUploadStart(ctx, e.signalType)
err := e.uploader.Upload(ctx, buf, uploadOpts)
uploadedBytes, err := e.uploader.Upload(ctx, buf, uploadOpts)
e.telemetry.recordUploadComplete(
ctx,
e.signalType,
uploadStartedAt,
time.Since(uploadStartedAt),
uploadedBytes,
flushMeta,
err,
)
Expand All @@ -201,30 +212,63 @@ func newExporterTelemetry(settings component.TelemetrySettings, logger *zap.Logg
tel := &exporterTelemetry{}
tel.flushStart = mustCounter(meter, flushStartMetricName, logger)
tel.flushComplete = mustCounter(meter, flushCompleteMetricName, logger)
tel.uploadAttempt = mustCounter(meter, uploadAttemptMetricName, logger, metric.WithUnit("1"))
tel.uploadStart = mustCounter(meter, uploadStartMetricName, logger)
tel.uploadComplete = mustCounter(meter, uploadCompleteMetricName, logger)
tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger)
tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger)
tel.flushToUploadDuration = mustHistogram(meter, flushToUploadDurationMetricName, logger)
tel.uploadFailed = mustCounter(meter, uploadFailedMetricName, logger, metric.WithUnit("1"))
tel.uploadBytes = mustCounter(meter, uploadBytesMetricName, logger, metric.WithUnit("By"))
tel.flushDuration = mustHistogram(meter, flushDurationMetricName, logger, metric.WithUnit("ms"))
tel.uploadDuration = mustHistogram(meter, uploadDurationMetricName, logger, metric.WithUnit("ms"))
tel.uploadObjectSize = mustHistogram(meter, uploadObjectSizeMetricName, logger, metric.WithUnit("By"))
tel.flushToUploadDuration = mustHistogram(meter, flushToUploadDurationMetricName, logger, metric.WithUnit("ms"))
tel.lastSuccessfulUpload = mustGauge(
meter,
lastSuccessfulUploadMetricName,
logger,
metric.WithUnit("s"),
)
return tel
}

func mustCounter(meter metric.Meter, name string, logger *zap.Logger) metric.Int64Counter {
counter, err := meter.Int64Counter(name)
func mustCounter(
meter metric.Meter,
name string,
logger *zap.Logger,
opts ...metric.Int64CounterOption,
) metric.Int64Counter {
counter, err := meter.Int64Counter(name, opts...)
if err != nil && logger != nil {
logger.Warn("failed to create awss3 exporter counter", zap.String("name", name), zap.Error(err))
}
return counter
}

func mustHistogram(meter metric.Meter, name string, logger *zap.Logger) metric.Int64Histogram {
histogram, err := meter.Int64Histogram(name, metric.WithUnit("ms"))
func mustHistogram(
meter metric.Meter,
name string,
logger *zap.Logger,
opts ...metric.Int64HistogramOption,
) metric.Int64Histogram {
histogram, err := meter.Int64Histogram(name, opts...)
if err != nil && logger != nil {
logger.Warn("failed to create awss3 exporter histogram", zap.String("name", name), zap.Error(err))
}
return histogram
}

func mustGauge(
meter metric.Meter,
name string,
logger *zap.Logger,
opts ...metric.Int64GaugeOption,
) metric.Int64Gauge {
gauge, err := meter.Int64Gauge(name, opts...)
if err != nil && logger != nil {
logger.Warn("failed to create awss3 exporter gauge", zap.String("name", name), zap.Error(err))
}
return gauge
}

func (t *exporterTelemetry) recordFlushStart(ctx context.Context, signalType string) {
if t == nil || t.flushStart == nil {
return
Expand Down Expand Up @@ -253,17 +297,24 @@ func (t *exporterTelemetry) recordFlushComplete(
}

func (t *exporterTelemetry) recordUploadStart(ctx context.Context, signalType string) {
if t == nil || t.uploadStart == nil {
if t == nil {
return
}
t.uploadStart.Add(ctx, 1, metric.WithAttributes(attribute.String("signal", signalType)))
attrs := metric.WithAttributes(attribute.String("signal", signalType))
if t.uploadAttempt != nil {
t.uploadAttempt.Add(ctx, 1, attrs)
}
if t.uploadStart != nil {
t.uploadStart.Add(ctx, 1, attrs)
}
}

func (t *exporterTelemetry) recordUploadComplete(
ctx context.Context,
signalType string,
uploadStartedAt time.Time,
duration time.Duration,
uploadedBytes int64,
flushMeta flushMetadata,
err error,
) {
Expand All @@ -275,9 +326,31 @@ func (t *exporterTelemetry) recordUploadComplete(
if t.uploadComplete != nil {
t.uploadComplete.Add(ctx, 1, metric.WithAttributes(attrs...))
}
if err != nil && t.uploadFailed != nil {
t.uploadFailed.Add(ctx, 1, metric.WithAttributes(attrs...))
}
if t.uploadDuration != nil {
t.uploadDuration.Record(ctx, durationMillis(duration), metric.WithAttributes(attrs...))
}
if err == nil {
if t.uploadBytes != nil {
t.uploadBytes.Add(ctx, uploadedBytes, metric.WithAttributes(attrs...))
}
if t.uploadObjectSize != nil {
t.uploadObjectSize.Record(
ctx,
uploadedBytes,
metric.WithAttributes(attrs...),
)
}
if t.lastSuccessfulUpload != nil {
t.lastSuccessfulUpload.Record(
ctx,
uploadStartedAt.Add(duration).Unix(),
metric.WithAttributes(attrs...),
)
}
}
if t.flushToUploadDuration != nil && !flushMeta.flushCompletedAt.IsZero() {
t.flushToUploadDuration.Record(
ctx,
Expand Down
Loading
Loading