Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .chloggen/awss3exporter-parquet-compat.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
change_type: enhancement
component: exporter/awss3
note: Add Sawmills awss3 legacy config compatibility and parquet log encoding support to the contrib fork.
issues: [37]
subtext: |-
This accepts legacy `s3_partition`, static credential, SSE, and key-template config used by generated Sawmills configs, and restores the parquet flush bridge needed for the migrated exporter path.
42 changes: 42 additions & 0 deletions exporter/awss3exporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ package awss3exporter // import "github.com/open-telemetry/opentelemetry-collect

import (
"errors"
"fmt"
"time"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configcompression"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.uber.org/multierr"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/upload"
)

const (
Expand All @@ -30,6 +33,8 @@ type S3UploaderConfig struct {
S3BasePrefix string `mapstructure:"s3_base_prefix"`
// S3Prefix is the key (directory) prefix to write to inside the bucket. Appended to S3BasePrefix if provided.
S3Prefix string `mapstructure:"s3_prefix"`
// S3Partition is the legacy partition selector used by generated configs.
S3Partition string `mapstructure:"s3_partition"`
// S3PartitionFormat is used to provide the rollup on how data is written. Uses [strftime](https://www.man7.org/linux/man-pages/man3/strftime.3.html) formatting.
S3PartitionFormat string `mapstructure:"s3_partition_format"`
// S3PartitionTimezone is used to provide timezone for partition time. Defaults to Local timezone.
Expand All @@ -40,6 +45,14 @@ type S3UploaderConfig struct {
Endpoint string `mapstructure:"endpoint"`
// RoleArn is the role policy to use when interacting with S3
RoleArn string `mapstructure:"role_arn"`
// AccessKeyID is the legacy static AWS access key used by generated configs.
AccessKeyID string `mapstructure:"access_key_id"`
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
// SecretAccessKey is the legacy static AWS secret used by generated configs.
SecretAccessKey string `mapstructure:"secret_access_key"`
// ServerSideEncryption is the legacy SSE setting used by generated configs.
ServerSideEncryption string `mapstructure:"server_side_encryption"`
// S3KeyTemplate is the legacy key template used by datadog JSON configs.
S3KeyTemplate string `mapstructure:"s3_key_template"`
// S3ForcePathStyle sets the value for force path style.
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
// DisableSLL forces communication to happen via HTTP instead of HTTPS.
Expand Down Expand Up @@ -102,6 +115,21 @@ type Config struct {
ResourceAttrsToS3 ResourceAttrsToS3 `mapstructure:"resource_attrs_to_s3"`
}

func (c *Config) normalizedS3PartitionFormat() string {
if c.S3Uploader.S3PartitionFormat != "" {
return c.S3Uploader.S3PartitionFormat
}

switch c.S3Uploader.S3Partition {
case "hour":
return "year=%Y/month=%m/day=%d/hour=%H"
case "minute":
return "year=%Y/month=%m/day=%d/hour=%H/minute=%M"
default:
return c.S3Uploader.S3PartitionFormat
}
}
Comment thread
cursor[bot] marked this conversation as resolved.

func (c *Config) Validate() error {
var errs error
validStorageClasses := map[string]bool{
Expand Down Expand Up @@ -133,6 +161,20 @@ func (c *Config) Validate() error {
if c.S3Uploader.S3Bucket == "" && c.S3Uploader.Endpoint == "" {
errs = multierr.Append(errs, errors.New("bucket or endpoint is required"))
}
if c.S3Uploader.S3Partition != "" && c.S3Uploader.S3Partition != "hour" && c.S3Uploader.S3Partition != "minute" {
errs = multierr.Append(errs, errors.New("invalid s3_partition"))
}
if (c.S3Uploader.AccessKeyID == "") != (c.S3Uploader.SecretAccessKey == "") {
errs = multierr.Append(errs, errors.New("access_key_id and secret_access_key must be set together"))
}
if c.S3Uploader.S3KeyTemplate != "" {
tmpl, err := upload.ParseLegacyTemplateForValidation(c.S3Uploader.S3KeyTemplate)
if err != nil {
errs = multierr.Append(errs, fmt.Errorf("invalid s3_key_template: %w", err))
} else if err := upload.ValidateLegacyTemplateForValidation(tmpl); err != nil {
errs = multierr.Append(errs, fmt.Errorf("invalid s3_key_template: %w", err))
}
Comment thread
cursor[bot] marked this conversation as resolved.
}

if !validStorageClasses[c.S3Uploader.StorageClass] {
errs = multierr.Append(errs, errors.New("invalid StorageClass"))
Expand Down
227 changes: 227 additions & 0 deletions exporter/awss3exporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package awss3exporter

import (
"errors"
"os"
"path/filepath"
"testing"
"time"
Expand All @@ -20,6 +21,232 @@ import (
"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/awss3exporter/internal/metadata"
)

func loadConfig(t *testing.T, fixture string) *Config {
t.Helper()
t.Setenv("S3_PREFIX", "123456")
t.Setenv("S3_ACCESS_KEY_ID", "123456")
t.Setenv("S3_SECRET_ACCESS_KEY", "123456")

factories, err := otelcoltest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[metadata.Type] = factory

cfg, err := otelcoltest.LoadConfigAndValidate(filepath.Join("testdata", fixture), factories)
require.NoError(t, err)
require.NotNil(t, cfg)

return cfg.Exporters[component.MustNewID("awss3")].(*Config)
}

func loadConfigFromYAML(t *testing.T, yaml string) (*Config, error) {
t.Helper()

dir := t.TempDir()
fixture := filepath.Join(dir, "config.yaml")
require.NoError(t, os.WriteFile(fixture, []byte(yaml), 0o600))

factories, err := otelcoltest.NopFactories()
require.NoError(t, err)

factory := NewFactory()
factories.Exporters[metadata.Type] = factory

cfg, err := otelcoltest.LoadConfigAndValidate(fixture, factories)
if err != nil {
return nil, err
}

return cfg.Exporters[component.MustNewID("awss3")].(*Config), nil
}

func TestLoadConfig_LegacyFields(t *testing.T) {
cfg := loadConfig(t, "legacy-config.yaml")

assert.Equal(t, "minute", cfg.S3Uploader.S3Partition)
assert.Equal(t, "123456", cfg.S3Uploader.S3Prefix)
assert.Equal(t, "123456", cfg.S3Uploader.AccessKeyID)
assert.Equal(t, "123456", cfg.S3Uploader.SecretAccessKey)
assert.Equal(t, "AES256", cfg.S3Uploader.ServerSideEncryption)
assert.Equal(t, "year=%Y/month=%m/day=%d/hour=%H/minute=%M", cfg.normalizedS3PartitionFormat())
}

func TestLoadConfig_LegacyDatadogTemplate(t *testing.T) {
cfg := loadConfig(t, "legacy-datadog-json.yaml")

assert.Equal(t, "{{.Prefix}}/{{.Date}}/{{.UUID}}.json.gz", cfg.S3Uploader.S3KeyTemplate)
require.NotNil(t, cfg.Encoding)
assert.Equal(t, "datadog_log_encoding/awss3/destination-json", cfg.Encoding.String())
}

func TestLoadConfig_LegacyParquetEncoding(t *testing.T) {
cfg := loadConfig(t, "legacy-parquet.yaml")

assert.Equal(t, "parquet", cfg.EncodingFileExtension)
require.NotNil(t, cfg.Encoding)
assert.Equal(t, "parquet_log_encoding/awss3/destination-parquet", cfg.Encoding.String())
}

func TestLoadConfig_LegacyPartitionValidation(t *testing.T) {
_, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
s3_partition: fortnight

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.Error(t, err)
assert.Contains(t, err.Error(), "invalid s3_partition")
}

func TestLoadConfig_LegacyMalformedTypes(t *testing.T) {
_, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
s3_partition: minute
s3_prefix: 123456

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.Error(t, err)
assert.Contains(t, err.Error(), "s3uploader.s3_prefix")
}

func TestLoadConfig_LegacyPartitionFormatPrecedence(t *testing.T) {
cfg, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
s3_partition: minute
s3_partition_format: "%Y/%m/%d/%H"

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.NoError(t, err)
require.NotNil(t, cfg)
assert.Equal(t, "%Y/%m/%d/%H", cfg.normalizedS3PartitionFormat())
}

func TestLoadConfig_LegacyTemplateValidation(t *testing.T) {
_, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
s3_key_template: "{{.Prefix"

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.Error(t, err)
assert.Contains(t, err.Error(), "invalid s3_key_template")
}

func TestLoadConfig_LegacyTemplateValidationRejectsUnknownFields(t *testing.T) {
_, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
s3_key_template: "{{.Prefix}}/{{.Bogus}}/{{.UUID}}"

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.Error(t, err)
assert.Contains(t, err.Error(), "invalid s3_key_template")
}

func TestLoadConfig_StaticCredentialsMustBePaired(t *testing.T) {
_, err := loadConfigFromYAML(t, `receivers:
nop:

exporters:
awss3:
s3uploader:
region: us-east-1
s3_bucket: sawmills-test
access_key_id: only-key

processors:
nop:

service:
pipelines:
traces:
receivers: [nop]
processors: [nop]
exporters: [awss3]
`)

require.Error(t, err)
assert.Contains(t, err.Error(), "access_key_id and secret_access_key must be set together")
}

func TestLoadConfig(t *testing.T) {
factories, err := otelcoltest.NopFactories()
assert.NoError(t, err)
Expand Down
Loading
Loading