From 821ebfb18e2513aea185169785a24f9ba1f48843 Mon Sep 17 00:00:00 2001 From: Mengnan Gong Date: Sun, 20 Apr 2025 21:50:34 +0800 Subject: [PATCH] [exporter/awss3exporter] Add the retry mode, max attempts and max backoff to the settings The AWS S3 client comes with a "standard" retryer implementation enabled by default. Explicitly mention it in the documentation to avoid confusion. Also expose the retry settings so the user can have control. Signed-off-by: Mengnan Gong --- .chloggen/main.yaml | 27 ++++++++ exporter/awss3exporter/README.md | 60 ++++++++++++------ exporter/awss3exporter/config.go | 22 +++++++ exporter/awss3exporter/config_test.go | 71 ++++++++++++++++++++++ exporter/awss3exporter/factory.go | 3 + exporter/awss3exporter/s3_writer.go | 14 ++++- exporter/awss3exporter/testdata/retry.yaml | 25 ++++++++ 7 files changed, 201 insertions(+), 21 deletions(-) create mode 100644 .chloggen/main.yaml create mode 100644 exporter/awss3exporter/testdata/retry.yaml diff --git a/.chloggen/main.yaml b/.chloggen/main.yaml new file mode 100644 index 0000000000000..3dd40a62abce1 --- /dev/null +++ b/.chloggen/main.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awss3exporter + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add the retry mode, max attempts and max backoff to the settings + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [36264] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/exporter/awss3exporter/README.md b/exporter/awss3exporter/README.md index f26e26dd85ac0..de89a786c6c80 100644 --- a/exporter/awss3exporter/README.md +++ b/exporter/awss3exporter/README.md @@ -20,26 +20,29 @@ This exporter targets to support proto/json format. The following exporter configuration parameters are supported. -| Name | Description | Default | -|:--------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------| -| `region` | AWS region. | "us-east-1" | -| `s3_bucket` | S3 bucket | | -| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | -| `s3_partition_format` | filepath formatting for the partition; See [strftime](https://www.man7.org/linux/man-pages/man3/strftime.3.html) for format specification. | "year=%Y/month=%m/day=%d/hour=%H/minute=%M" | -| `role_arn` | the Role ARN to be assumed | | -| `file_prefix` | file prefix defined by user | | -| `marshaler` | marshaler used to produce output data | `otlp_json` | -| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | | -| `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | | -| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | -| `storage_class` | [S3 storageclass](https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html) | STANDARD | -| `acl` | [S3 Object Canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl) | none (does not set by default) | -| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | -| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | -| `compression` | should the file be compressed | none | -| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled | -| `timeout` | [exporters common timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | 5s | -| `resource_attrs_to_s3` | determines the mapping of S3 configuration values to resource attribute values for uploading operations. | | +| Name | Description | Default | +|:--------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------------------------------------------| +| `region` | AWS region. | "us-east-1" | +| `s3_bucket` | S3 bucket | | +| `s3_prefix` | prefix for the S3 key (root directory inside bucket). | | +| `s3_partition_format` | filepath formatting for the partition; See [strftime](https://www.man7.org/linux/man-pages/man3/strftime.3.html) for format specification. | "year=%Y/month=%m/day=%d/hour=%H/minute=%M" | +| `role_arn` | the Role ARN to be assumed | | +| `file_prefix` | file prefix defined by user | | +| `marshaler` | marshaler used to produce output data | `otlp_json` | +| `encoding` | Encoding extension to use to marshal data. Overrides the `marshaler` configuration option if set. | | +| `encoding_file_extension` | file format extension suffix when using the `encoding` configuration option. May be left empty for no suffix to be appended. | | +| `endpoint` | (REST API endpoint) overrides the endpoint used by the exporter instead of constructing it from `region` and `s3_bucket` | | +| `storage_class` | [S3 storageclass](https://docs.aws.amazon.com/AmazonS3/latest/userguide/storage-class-intro.html) | STANDARD | +| `acl` | [S3 Object Canned ACL](https://docs.aws.amazon.com/AmazonS3/latest/userguide/acl-overview.html#canned-acl) | none (does not set by default) | +| `s3_force_path_style` | [set this to `true` to force the request to use path-style addressing](http://docs.aws.amazon.com/AmazonS3/latest/dev/VirtualHosting.html) | false | +| `disable_ssl` | set this to `true` to disable SSL when sending requests | false | +| `compression` | should the file be compressed | none | +| `sending_queue` | [exporters common queuing](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | disabled | +| `timeout` | [exporters common timeout](https://github.com/open-telemetry/opentelemetry-collector/blob/main/exporter/exporterhelper/README.md) | 5s | +| `resource_attrs_to_s3` | determines the mapping of S3 configuration values to resource attribute values for uploading operations. | | +| `retry_mode` | The retryer implementation, the supported values are "standard", "adaptive" and "nop". "nop" will set the retryer as `aws.NopRetryer`, which effectively disable the retry. | standard | +| `retry_max_attempts` | The max number of attempts for retrying a request if the `retry_mode` is set. Setting max attempts to 0 will allow the SDK to retry all retryable errors until the request succeeds, or a non-retryable error is returned. | 3 | +| `retry_max_backoff` | the max backoff delay that can occur before retrying a request if `retry_mode` is set | 20s | ### Marshaler @@ -141,6 +144,23 @@ metric/YYYY/MM/DD/HH/mm ... ``` +## Retry + +Standard is the default retryer implementation used by service clients. See the [retry](https://pkg.go.dev/github.com/aws/aws-sdk-go-v2/aws/retry) package documentation for details on what errors are considered as retryable by the standard retryer implementation. + +See also the [aws-sdk-go reference](https://docs.aws.amazon.com/sdk-for-go/v2/developer-guide/configure-retries-timeouts.html) for more information. + +```yaml +exporters: + awss3: + s3uploader: + region: 'eu-central-1' + s3_bucket: 'databucket' + s3_prefix: 'metric' + retry_mode: "standard" + retry_max_attempts: 5 + retry_max_backoff: "30s" +``` ## AWS Credential Configuration diff --git a/exporter/awss3exporter/config.go b/exporter/awss3exporter/config.go index 9a5f88ee6d3c5..333468ae69950 100644 --- a/exporter/awss3exporter/config.go +++ b/exporter/awss3exporter/config.go @@ -5,6 +5,7 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect import ( "errors" + "time" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/config/configcompression" @@ -12,6 +13,12 @@ import ( "go.uber.org/multierr" ) +const ( + DefaultRetryMode = "standard" + DefaultRetryMaxAttempts = 3 + DefaultRetryMaxBackoff = 20 * time.Second +) + // S3UploaderConfig contains aws s3 uploader related config to controls things // like bucket, prefix, batching, connections, retries, etc. type S3UploaderConfig struct { @@ -40,6 +47,17 @@ type S3UploaderConfig struct { // before uploading to S3. // Valid values are: `gzip` or no value set. Compression configcompression.Type `mapstructure:"compression"` + + // RetryMode specifies the retry mode for S3 client, default is "standard". + // Valid values are: "standard", "adaptive", or "nop". + // "nop" will disable retry by setting the retryer to aws.NopRetryer. + RetryMode string `mapstructure:"retry_mode"` + // RetryMaxAttempts specifies the maximum number of attempts for S3 client. + // Default is 3 (SDK default). + RetryMaxAttempts int `mapstructure:"retry_max_attempts"` + // RetryMaxBackoff specifies the maximum backoff delay for S3 client. + // Default is 20 seconds (SDK default). + RetryMaxBackoff time.Duration `mapstructure:"retry_max_backoff"` } type MarshalerType string @@ -116,5 +134,9 @@ func (c *Config) Validate() error { errs = multierr.Append(errs, errors.New("marshaler does not support compression")) } } + + if c.S3Uploader.RetryMode != "nop" && c.S3Uploader.RetryMode != "standard" && c.S3Uploader.RetryMode != "adaptive" { + errs = multierr.Append(errs, errors.New("invalid retry mode, must be either 'standard', 'adaptive' or 'nop'")) + } return errs } diff --git a/exporter/awss3exporter/config_test.go b/exporter/awss3exporter/config_test.go index 0d5989e3abc42..04d64feb91aa1 100644 --- a/exporter/awss3exporter/config_test.go +++ b/exporter/awss3exporter/config_test.go @@ -7,6 +7,7 @@ import ( "errors" "path/filepath" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -46,6 +47,9 @@ func TestLoadConfig(t *testing.T) { S3Bucket: "foo", S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", }, e, @@ -87,6 +91,9 @@ func TestConfig(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", Endpoint: "http://endpoint.com", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", }, e, @@ -119,6 +126,9 @@ func TestConfigS3StorageClass(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", Endpoint: "http://endpoint.com", StorageClass: "STANDARD_IA", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, QueueSettings: queueCfg, TimeoutSettings: timeoutCfg, @@ -154,6 +164,9 @@ func TestConfigS3ACL(t *testing.T) { Endpoint: "http://endpoint.com", StorageClass: "STANDARD", ACL: "bucket-owner-read", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, QueueSettings: queueCfg, TimeoutSettings: timeoutCfg, @@ -189,6 +202,9 @@ func TestConfigS3ACLDefined(t *testing.T) { Endpoint: "http://endpoint.com", StorageClass: "STANDARD", ACL: "bucket-owner-full-control", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, QueueSettings: queueCfg, TimeoutSettings: timeoutCfg, @@ -227,6 +243,9 @@ func TestConfigForS3CompatibleSystems(t *testing.T) { S3ForcePathStyle: true, DisableSSL: true, StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", }, e, @@ -340,6 +359,9 @@ func TestMarshallerName(t *testing.T) { S3Bucket: "foo", S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "sumo_ic", }, e, @@ -355,6 +377,9 @@ func TestMarshallerName(t *testing.T) { S3Bucket: "bar", S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_proto", }, e, @@ -388,6 +413,9 @@ func TestCompressionName(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", Compression: "gzip", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", }, e, @@ -404,6 +432,9 @@ func TestCompressionName(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", Compression: "none", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_proto", }, e, @@ -438,6 +469,9 @@ func TestResourceAttrsToS3(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", Endpoint: "http://endpoint.com", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", ResourceAttrsToS3: ResourceAttrsToS3{ @@ -446,3 +480,40 @@ func TestResourceAttrsToS3(t *testing.T) { }, e, ) } + +func TestRetry(t *testing.T) { + factories, err := otelcoltest.NopFactories() + assert.NoError(t, err) + + factory := NewFactory() + factories.Exporters[factory.Type()] = factory + cfg, err := otelcoltest.LoadConfigAndValidate( + filepath.Join("testdata", "retry.yaml"), factories) + + require.NoError(t, err) + require.NotNil(t, cfg) + + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false + timeoutCfg := exporterhelper.NewDefaultTimeoutConfig() + + e := cfg.Exporters[component.MustNewID("awss3")].(*Config) + + assert.Equal(t, &Config{ + QueueSettings: queueCfg, + TimeoutSettings: timeoutCfg, + S3Uploader: S3UploaderConfig{ + Region: "us-east-1", + S3Bucket: "foo", + S3Prefix: "bar", + S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", + Endpoint: "http://endpoint.com", + StorageClass: "STANDARD_IA", + RetryMode: "standard", + RetryMaxAttempts: 5, + RetryMaxBackoff: 30 * time.Second, + }, + MarshalerName: "otlp_json", + }, e, + ) +} diff --git a/exporter/awss3exporter/factory.go b/exporter/awss3exporter/factory.go index 15fbe179c1cc8..fb1b73708d2e6 100644 --- a/exporter/awss3exporter/factory.go +++ b/exporter/awss3exporter/factory.go @@ -57,6 +57,9 @@ func createDefaultConfig() component.Config { Region: "us-east-1", S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", StorageClass: "STANDARD", + RetryMode: DefaultRetryMode, + RetryMaxAttempts: DefaultRetryMaxAttempts, + RetryMaxBackoff: DefaultRetryMaxBackoff, }, MarshalerName: "otlp_json", } diff --git a/exporter/awss3exporter/s3_writer.go b/exporter/awss3exporter/s3_writer.go index e69bc6c6c8219..aa3a89d983f7e 100644 --- a/exporter/awss3exporter/s3_writer.go +++ b/exporter/awss3exporter/s3_writer.go @@ -7,6 +7,7 @@ import ( "context" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/aws/retry" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/credentials/stscreds" "github.com/aws/aws-sdk-go-v2/service/s3" @@ -28,6 +29,15 @@ func newUploadManager( configOpts = append(configOpts, config.WithRegion(region)) } + switch conf.S3Uploader.RetryMode { + case "nop": + configOpts = append(configOpts, config.WithRetryer(func() aws.Retryer { + return aws.NopRetryer{} + })) + default: + configOpts = append(configOpts, config.WithRetryMode(aws.RetryMode(conf.S3Uploader.RetryMode))) + } + cfg, err := config.LoadDefaultConfig(ctx, configOpts...) if err != nil { return nil, err @@ -39,12 +49,14 @@ func newUploadManager( DisableHTTPS: conf.S3Uploader.DisableSSL, } o.UsePathStyle = conf.S3Uploader.S3ForcePathStyle + o.Retryer = retry.AddWithMaxAttempts(o.Retryer, conf.S3Uploader.RetryMaxAttempts) + o.Retryer = retry.AddWithMaxBackoffDelay(o.Retryer, conf.S3Uploader.RetryMaxBackoff) }, } if conf.S3Uploader.Endpoint != "" { s3Opts = append(s3Opts, func(o *s3.Options) { - o.BaseEndpoint = aws.String((conf.S3Uploader.Endpoint)) + o.BaseEndpoint = aws.String(conf.S3Uploader.Endpoint) }) } diff --git a/exporter/awss3exporter/testdata/retry.yaml b/exporter/awss3exporter/testdata/retry.yaml new file mode 100644 index 0000000000000..560cd6018f234 --- /dev/null +++ b/exporter/awss3exporter/testdata/retry.yaml @@ -0,0 +1,25 @@ +receivers: + nop: + +exporters: + awss3: + s3uploader: + region: 'us-east-1' + s3_bucket: 'foo' + s3_prefix: 'bar' + s3_partition_format: 'year=%Y/month=%m/day=%d/hour=%H/minute=%M' + endpoint: "http://endpoint.com" + storage_class: "STANDARD_IA" + retry_mode: "standard" + retry_max_attempts: 5 + retry_max_backoff: "30s" + +processors: + nop: + +service: + pipelines: + traces: + receivers: [nop] + processors: [nop] + exporters: [awss3]