diff --git a/.chloggen/avoid-extra-copy.yaml b/.chloggen/avoid-extra-copy.yaml index 4ed25275ab1b2..eb2da58385f8b 100644 --- a/.chloggen/avoid-extra-copy.yaml +++ b/.chloggen/avoid-extra-copy.yaml @@ -2,7 +2,7 @@ change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: coralogixexporter +component: exporter/coralogix # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Add Automatic AWS PrivateLink set up via new `private_link` configuration option diff --git a/.chloggen/config.yaml b/.chloggen/config.yaml index e5fd1bda66af8..a82011d2fb1e8 100644 --- a/.chloggen/config.yaml +++ b/.chloggen/config.yaml @@ -55,6 +55,7 @@ components: - exporter/honeycombmarker - exporter/influxdb - exporter/kafka + - exporter/kedascaler - exporter/loadbalancing - exporter/logicmonitor - exporter/logzio @@ -190,10 +191,12 @@ components: - processor/groupbyattrs - processor/groupbytrace - processor/hetznerdetector + - processor/hotreloadprocessor - processor/interval - processor/isolationforest - processor/k8sattributes - processor/logdedup + - processor/logstometricsprocessor - processor/logstransform - processor/metricsgeneration - processor/metricstarttime @@ -241,6 +244,7 @@ components: - receiver/collectd - receiver/couchdb - receiver/datadog + - receiver/datadoglog - receiver/docker_stats - receiver/elasticsearch - receiver/envoyals diff --git a/.chloggen/oracle_query_traceid_fix.yaml b/.chloggen/oracle_query_traceid_fix.yaml index febd9d678d7cc..e1d85b3f4b8f4 100644 --- a/.chloggen/oracle_query_traceid_fix.yaml +++ b/.chloggen/oracle_query_traceid_fix.yaml @@ -4,7 +4,7 @@ change_type: bug_fix # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: oracledbreceiver +component: receiver/oracledb # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Fix for wrong trace id in oracle top query records diff --git a/.chloggen/saw-6556-loadbalancing-compressed-queue.yaml b/.chloggen/saw-6556-loadbalancing-compressed-queue.yaml new file mode 100644 index 0000000000000..5cae9ad7eb014 --- /dev/null +++ b/.chloggen/saw-6556-loadbalancing-compressed-queue.yaml @@ -0,0 +1,30 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: exporter/loadbalancing + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add optional sending queue payload compression (snappy or zstd) for the loadbalancing exporter, with an opt-in mode to store in-memory queue entries as encoded payloads. + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [6556] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + Persistent queue payloads are encoded with a queue header and decoded on dequeue. + In-memory queue compression is opt-in via `sending_queue.compress_in_memory`. + Zstd encoder/decoder resources are cleaned up on exporter shutdown. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: ['user'] diff --git a/.chloggen/upgrade-contrib-version.yaml b/.chloggen/upgrade-contrib-version.yaml index b125deb19b448..e3234b66c2182 100644 --- a/.chloggen/upgrade-contrib-version.yaml +++ b/.chloggen/upgrade-contrib-version.yaml @@ -4,7 +4,7 @@ change_type: enhancement # The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) -component: core +component: all # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). note: Update dependencies to v0.136.0 and v0.136.1, including improvements to kedascalerexporter error handling and datadog log processing optimization diff --git a/exporter/loadbalancingexporter/README.md b/exporter/loadbalancingexporter/README.md index 9a8b9bab21548..fd0c7500a87f2 100644 --- a/exporter/loadbalancingexporter/README.md +++ b/exporter/loadbalancingexporter/README.md @@ -236,6 +236,17 @@ service: - loadbalancing ``` +To compress payloads in an in-memory sending queue, set: + +```yaml +exporters: + loadbalancing: + sending_queue: + enabled: true + payload_compression: zstd + compress_in_memory: true +``` + Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md)) > [!IMPORTANT] > The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information. diff --git a/exporter/loadbalancingexporter/config.go b/exporter/loadbalancingexporter/config.go index 9ebb76b9adeb6..c623db7c58555 100644 --- a/exporter/loadbalancingexporter/config.go +++ b/exporter/loadbalancingexporter/config.go @@ -4,6 +4,7 @@ package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" import ( + "fmt" "time" "github.com/aws/aws-sdk-go-v2/service/servicediscovery/types" @@ -37,7 +38,7 @@ const ( type Config struct { TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"` configretry.BackOffConfig `mapstructure:"retry_on_failure"` - QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"` + QueueSettings QueueSettings `mapstructure:"sending_queue"` Protocol Protocol `mapstructure:"protocol"` Resolver ResolverSettings `mapstructure:"resolver"` @@ -52,6 +53,45 @@ type Config struct { RoutingAttributes []string `mapstructure:"routing_attributes"` } +type QueueSettings struct { + exporterhelper.QueueBatchConfig `mapstructure:",squash"` + PayloadCompression QueuePayloadCompression `mapstructure:"payload_compression"` + CompressInMemory bool `mapstructure:"compress_in_memory"` +} + +type QueuePayloadCompression string + +const ( + QueuePayloadCompressionNone QueuePayloadCompression = "none" + QueuePayloadCompressionSnappy QueuePayloadCompression = "snappy" + QueuePayloadCompressionZstd QueuePayloadCompression = "zstd" +) + +func (q QueueSettings) Validate() error { + if err := q.QueueBatchConfig.Validate(); err != nil { + return err + } + switch q.PayloadCompression { + case "", QueuePayloadCompressionNone, QueuePayloadCompressionSnappy, QueuePayloadCompressionZstd: + // Valid payload compression value. + default: + return fmt.Errorf("sending_queue.payload_compression must be one of [none, snappy, zstd], found %q", q.PayloadCompression) + } + + if q.CompressInMemory && !q.Enabled { + return fmt.Errorf("sending_queue.compress_in_memory requires sending_queue.enabled=true") + } + if q.CompressInMemory && (q.PayloadCompression == "" || q.PayloadCompression == QueuePayloadCompressionNone) { + return fmt.Errorf("sending_queue.compress_in_memory requires sending_queue.payload_compression to be set to snappy or zstd") + } + + return nil +} + +func (cfg *Config) Validate() error { + return cfg.QueueSettings.Validate() +} + // Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment. type Protocol struct { OTLP otlpexporter.Config `mapstructure:"otlp"` diff --git a/exporter/loadbalancingexporter/config_test.go b/exporter/loadbalancingexporter/config_test.go index c34644174d085..0c1ca287c3f10 100644 --- a/exporter/loadbalancingexporter/config_test.go +++ b/exporter/loadbalancingexporter/config_test.go @@ -25,3 +25,30 @@ func TestLoadConfig(t *testing.T) { require.NoError(t, sub.Unmarshal(cfg)) require.NotNil(t, cfg) } + +func TestConfigValidatePayloadCompression(t *testing.T) { + cfg := createDefaultConfig().(*Config) + require.NoError(t, cfg.Validate()) + + cfg.QueueSettings.Enabled = true + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy + require.NoError(t, cfg.Validate()) + + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionZstd + require.NoError(t, cfg.Validate()) + + cfg.QueueSettings.PayloadCompression = QueuePayloadCompression("invalid") + require.Error(t, cfg.Validate()) +} + +func TestConfigValidateCompressInMemory(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.QueueSettings.CompressInMemory = true + require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.enabled=true") + + cfg.QueueSettings.Enabled = true + require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.payload_compression") + + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy + require.NoError(t, cfg.Validate()) +} diff --git a/exporter/loadbalancingexporter/factory.go b/exporter/loadbalancingexporter/factory.go index 8b7ef81acba44..a6720d7309975 100644 --- a/exporter/loadbalancingexporter/factory.go +++ b/exporter/loadbalancingexporter/factory.go @@ -15,6 +15,7 @@ import ( "go.opentelemetry.io/collector/exporter/otlpexporter" metricnoop "go.opentelemetry.io/otel/metric/noop" tracenoop "go.opentelemetry.io/otel/trace/noop" + "go.uber.org/multierr" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata" @@ -39,6 +40,8 @@ func createDefaultConfig() component.Config { otlpFactory := otlpexporter.NewFactory() otlpDefaultCfg := otlpFactory.CreateDefaultConfig().(*otlpexporter.Config) otlpDefaultCfg.ClientConfig.Endpoint = "placeholder:4317" + queueCfg := exporterhelper.NewDefaultQueueConfig() + queueCfg.Enabled = false return &Config{ // By default we disable resilience options on loadbalancing exporter level @@ -46,6 +49,10 @@ func createDefaultConfig() component.Config { Protocol: Protocol{ OTLP: *otlpDefaultCfg, }, + QueueSettings: QueueSettings{ + QueueBatchConfig: queueCfg, + PayloadCompression: QueuePayloadCompressionNone, + }, } } @@ -71,12 +78,18 @@ func buildExporterSettings(typ component.Type, params exporter.Settings, endpoin return params } -func buildExporterResilienceOptions(options []exporterhelper.Option, cfg *Config) []exporterhelper.Option { +func buildExporterResilienceOptions(options []exporterhelper.Option, cfg *Config, payloadCodec *queuePayloadCodec) []exporterhelper.Option { if cfg.TimeoutSettings.Timeout > 0 { options = append(options, exporterhelper.WithTimeout(cfg.TimeoutSettings)) } if cfg.QueueSettings.Enabled { - options = append(options, exporterhelper.WithQueue(cfg.QueueSettings)) + options = append(options, exporterhelper.WithQueue(cfg.QueueSettings.QueueBatchConfig)) + if payloadCodec != nil { + options = append(options, exporterhelper.WithQueueBatchPayloadCodec(payloadCodec)) + if cfg.QueueSettings.CompressInMemory { + options = append(options, exporterhelper.WithQueueBatchInMemoryEncoding(true)) + } + } } if cfg.Enabled { options = append(options, exporterhelper.WithRetry(cfg.BackOffConfig)) @@ -91,10 +104,11 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, cfg com if err != nil { return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err) } + payloadCodec := newQueuePayloadCodecIfEnabled(c) options := []exporterhelper.Option{ exporterhelper.WithStart(exp.Start), - exporterhelper.WithShutdown(exp.Shutdown), + exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exp.Shutdown), payloadCodec)), exporterhelper.WithCapabilities(exp.Capabilities()), } @@ -103,7 +117,7 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, cfg com params, cfg, exp.ConsumeTraces, - buildExporterResilienceOptions(options, c)..., + buildExporterResilienceOptions(options, c, payloadCodec)..., ) } @@ -113,10 +127,11 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, cfg compo if err != nil { return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err) } + payloadCodec := newQueuePayloadCodecIfEnabled(c) options := []exporterhelper.Option{ exporterhelper.WithStart(exporter.Start), - exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exporter.Shutdown), payloadCodec)), exporterhelper.WithCapabilities(exporter.Capabilities()), } @@ -125,7 +140,7 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, cfg compo params, cfg, exporter.ConsumeLogs, - buildExporterResilienceOptions(options, c)..., + buildExporterResilienceOptions(options, c, payloadCodec)..., ) } @@ -135,10 +150,11 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg co if err != nil { return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err) } + payloadCodec := newQueuePayloadCodecIfEnabled(c) options := []exporterhelper.Option{ exporterhelper.WithStart(exporter.Start), - exporterhelper.WithShutdown(exporter.Shutdown), + exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exporter.Shutdown), payloadCodec)), exporterhelper.WithCapabilities(exporter.Capabilities()), } @@ -147,6 +163,26 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg co params, cfg, exporter.ConsumeMetrics, - buildExporterResilienceOptions(options, c)..., + buildExporterResilienceOptions(options, c, payloadCodec)..., ) } + +func newQueuePayloadCodecIfEnabled(cfg *Config) *queuePayloadCodec { + if !cfg.QueueSettings.Enabled { + return nil + } + if cfg.QueueSettings.PayloadCompression == "" { + return nil + } + + return newQueuePayloadCodec(cfg.QueueSettings.PayloadCompression) +} + +func shutdownWithCodec(shutdown component.ShutdownFunc, codec *queuePayloadCodec) component.ShutdownFunc { + if codec == nil { + return shutdown + } + return func(ctx context.Context) error { + return multierr.Append(shutdown.Shutdown(ctx), codec.Close()) + } +} diff --git a/exporter/loadbalancingexporter/factory_test.go b/exporter/loadbalancingexporter/factory_test.go index 84e9f2ed7f36f..fca75c5d0730a 100644 --- a/exporter/loadbalancingexporter/factory_test.go +++ b/exporter/loadbalancingexporter/factory_test.go @@ -205,30 +205,65 @@ func TestBuildExporterResilienceOptions(t *testing.T) { t.Run("Shouldn't have resilience options by default", func(t *testing.T) { o := []exporterhelper.Option{} cfg := createDefaultConfig().(*Config) - assert.Empty(t, buildExporterResilienceOptions(o, cfg)) + assert.Empty(t, buildExporterResilienceOptions(o, cfg, nil)) }) t.Run("Should have timeout option if defined", func(t *testing.T) { o := []exporterhelper.Option{} cfg := createDefaultConfig().(*Config) cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() - assert.Len(t, buildExporterResilienceOptions(o, cfg), 1) + assert.Len(t, buildExporterResilienceOptions(o, cfg, nil), 1) }) t.Run("Should have timeout and queue options if defined", func(t *testing.T) { o := []exporterhelper.Option{} cfg := createDefaultConfig().(*Config) cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() - cfg.QueueSettings = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() - assert.Len(t, buildExporterResilienceOptions(o, cfg), 2) + assert.Len(t, buildExporterResilienceOptions(o, cfg, newQueuePayloadCodecIfEnabled(cfg)), 3) + }) + t.Run("Should have timeout, queue and compression options when compression is enabled", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() + cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy + + assert.Len(t, buildExporterResilienceOptions(o, cfg, newQueuePayloadCodec(cfg.QueueSettings.PayloadCompression)), 3) + }) + t.Run("Should include in-memory queue compression option when enabled", func(t *testing.T) { + o := []exporterhelper.Option{} + cfg := createDefaultConfig().(*Config) + cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() + cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy + cfg.QueueSettings.CompressInMemory = true + + assert.Len(t, buildExporterResilienceOptions(o, cfg, newQueuePayloadCodec(cfg.QueueSettings.PayloadCompression)), 4) }) t.Run("Should have all resilience options if defined", func(t *testing.T) { o := []exporterhelper.Option{} cfg := createDefaultConfig().(*Config) cfg.TimeoutSettings = exporterhelper.NewDefaultTimeoutConfig() - cfg.QueueSettings = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.QueueBatchConfig = exporterhelper.NewDefaultQueueConfig() + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionNone cfg.BackOffConfig = configretry.NewDefaultBackOffConfig() - assert.Len(t, buildExporterResilienceOptions(o, cfg), 3) + assert.Len(t, buildExporterResilienceOptions(o, cfg, newQueuePayloadCodecIfEnabled(cfg)), 4) + }) +} + +func TestNewQueuePayloadCodecIfEnabled(t *testing.T) { + cfg := createDefaultConfig().(*Config) + cfg.QueueSettings.Enabled = true + + t.Run("legacy empty string disables codec", func(t *testing.T) { + cfg.QueueSettings.PayloadCompression = "" + assert.Nil(t, newQueuePayloadCodecIfEnabled(cfg)) + }) + + t.Run("explicit none enables compatibility codec", func(t *testing.T) { + cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionNone + assert.NotNil(t, newQueuePayloadCodecIfEnabled(cfg)) }) } diff --git a/exporter/loadbalancingexporter/go.mod b/exporter/loadbalancingexporter/go.mod index 0004e3ecdc3a9..0bc4a36b13045 100644 --- a/exporter/loadbalancingexporter/go.mod +++ b/exporter/loadbalancingexporter/go.mod @@ -3,12 +3,14 @@ module github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadba go 1.25.1 require ( - github.com/aws/aws-sdk-go-v2/config v1.31.20 - github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.16 + github.com/aws/aws-sdk-go-v2/config v1.31.19 + github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.15 github.com/aws/smithy-go v1.23.2 github.com/goccy/go-json v0.10.5 - github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.140.1 - github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.140.1 + github.com/golang/snappy v1.0.0 + github.com/klauspost/compress v1.18.1 + github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics v0.140.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/batchpersignal v0.140.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.140.1 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.140.1 github.com/stretchr/testify v1.11.1 @@ -32,27 +34,27 @@ require ( go.opentelemetry.io/otel/trace v1.38.0 go.uber.org/goleak v1.3.0 go.uber.org/multierr v1.11.0 - go.uber.org/zap v1.27.0 + go.uber.org/zap v1.27.1 gopkg.in/yaml.v3 v3.0.1 - k8s.io/api v0.34.2 - k8s.io/apimachinery v0.34.2 - k8s.io/client-go v0.34.2 + k8s.io/api v0.34.1 + k8s.io/apimachinery v0.34.1 + k8s.io/client-go v0.34.1 k8s.io/utils v0.0.0-20250604170112-4c0f3b243397 sigs.k8s.io/controller-runtime v0.22.4 ) require ( github.com/aws/aws-sdk-go-v2 v1.39.6 // indirect - github.com/aws/aws-sdk-go-v2/credentials v1.18.24 // indirect + github.com/aws/aws-sdk-go-v2/credentials v1.18.23 // indirect github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13 // indirect github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 // indirect github.com/aws/aws-sdk-go-v2/internal/endpoints/v2 v2.7.13 // indirect github.com/aws/aws-sdk-go-v2/internal/ini v1.8.4 // indirect github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3 // indirect github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 // indirect - github.com/aws/aws-sdk-go-v2/service/sso v1.30.3 // indirect - github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.7 // indirect - github.com/aws/aws-sdk-go-v2/service/sts v1.40.2 // indirect + github.com/aws/aws-sdk-go-v2/service/sso v1.30.2 // indirect + github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.6 // indirect + github.com/aws/aws-sdk-go-v2/service/sts v1.40.1 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v5 v5.0.3 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect @@ -72,7 +74,6 @@ require ( github.com/go-viper/mapstructure/v2 v2.4.0 // indirect github.com/gobwas/glob v0.2.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect - github.com/golang/snappy v1.0.0 // indirect github.com/google/gnostic-models v0.7.0 // indirect github.com/google/go-cmp v0.7.0 // indirect github.com/google/go-tpm v0.9.7 // indirect @@ -83,7 +84,6 @@ require ( github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect - github.com/klauspost/compress v1.18.1 // indirect github.com/knadh/koanf/maps v0.1.2 // indirect github.com/knadh/koanf/providers/confmap v1.0.0 // indirect github.com/knadh/koanf/v2 v2.3.0 // indirect @@ -111,7 +111,7 @@ require ( github.com/tklauser/numcpus v0.10.0 // indirect github.com/x448/float16 v0.8.4 // indirect github.com/yusufpapurcu/wmi v1.2.4 // indirect - go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/auto/sdk v1.2.1 // indirect go.opentelemetry.io/collector v0.140.0 // indirect go.opentelemetry.io/collector/client v1.46.0 // indirect go.opentelemetry.io/collector/component/componentstatus v0.140.0 // indirect @@ -181,16 +181,16 @@ require ( go.yaml.in/yaml/v3 v3.0.4 // indirect golang.org/x/crypto v0.43.0 // indirect golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect - golang.org/x/net v0.45.0 // indirect - golang.org/x/oauth2 v0.31.0 // indirect + golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 // indirect + golang.org/x/oauth2 v0.32.0 // indirect golang.org/x/sys v0.37.0 // indirect golang.org/x/term v0.36.0 // indirect golang.org/x/text v0.30.0 // indirect golang.org/x/time v0.9.0 // indirect gonum.org/v1/gonum v0.16.0 // indirect - google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 // indirect - google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 // indirect - google.golang.org/grpc v1.76.0 // indirect + google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 // indirect + google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 // indirect + google.golang.org/grpc v1.77.0 // indirect google.golang.org/protobuf v1.36.10 // indirect gopkg.in/evanphx/json-patch.v4 v4.12.0 // indirect gopkg.in/inf.v0 v0.9.1 // indirect @@ -217,3 +217,5 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden replace github.com/open-telemetry/opentelemetry-collector-contrib/internal/exp/metrics => ../../internal/exp/metrics + +replace go.opentelemetry.io/collector/exporter/exporterhelper => github.com/Sawmills/opentelemetry-collector/exporter/exporterhelper v0.140.1-0.20260303212412-92b47b11df4d diff --git a/exporter/loadbalancingexporter/go.sum b/exporter/loadbalancingexporter/go.sum index 293dba27c567a..4a4c5fe09545a 100644 --- a/exporter/loadbalancingexporter/go.sum +++ b/exporter/loadbalancingexporter/go.sum @@ -1,9 +1,11 @@ +github.com/Sawmills/opentelemetry-collector/exporter/exporterhelper v0.140.1-0.20260303212412-92b47b11df4d h1:no8qSZ8DgrYUZfD1Se0K++Ih1CFRA/tZFAOaR9X3YOE= +github.com/Sawmills/opentelemetry-collector/exporter/exporterhelper v0.140.1-0.20260303212412-92b47b11df4d/go.mod h1:OMSquxnWfU6bY9AVL61MP5cdxheYvBD4T/TQXe4/Dtw= github.com/aws/aws-sdk-go-v2 v1.39.6 h1:2JrPCVgWJm7bm83BDwY5z8ietmeJUbh3O2ACnn+Xsqk= github.com/aws/aws-sdk-go-v2 v1.39.6/go.mod h1:c9pm7VwuW0UPxAEYGyTmyurVcNrbF6Rt/wixFqDhcjE= -github.com/aws/aws-sdk-go-v2/config v1.31.20 h1:/jWF4Wu90EhKCgjTdy1DGxcbcbNrjfBHvksEL79tfQc= -github.com/aws/aws-sdk-go-v2/config v1.31.20/go.mod h1:95Hh1Tc5VYKL9NJ7tAkDcqeKt+MCXQB1hQZaRdJIZE0= -github.com/aws/aws-sdk-go-v2/credentials v1.18.24 h1:iJ2FmPT35EaIB0+kMa6TnQ+PwG5A1prEdAw+PsMzfHg= -github.com/aws/aws-sdk-go-v2/credentials v1.18.24/go.mod h1:U91+DrfjAiXPDEGYhh/x29o4p0qHX5HDqG7y5VViv64= +github.com/aws/aws-sdk-go-v2/config v1.31.19 h1:qdUtOw4JhZr2YcKO3g0ho/IcFXfXrrb8xlX05Y6EvSw= +github.com/aws/aws-sdk-go-v2/config v1.31.19/go.mod h1:tMJ8bur01t8eEm0atLadkIIFA154OJ4JCKZeQ+o+R7k= +github.com/aws/aws-sdk-go-v2/credentials v1.18.23 h1:IQILcxVgMO2BVLaJ2aAv21dKWvE1MduNrbvuK43XL2Q= +github.com/aws/aws-sdk-go-v2/credentials v1.18.23/go.mod h1:JRodHszhVdh5TPUknxDzJzrMiznG+M+FfR3WSWKgCI8= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13 h1:T1brd5dR3/fzNFAQch/iBKeX07/ffu/cLu+q+RuzEWk= github.com/aws/aws-sdk-go-v2/feature/ec2/imds v1.18.13/go.mod h1:Peg/GBAQ6JDt+RoBf4meB1wylmAipb7Kg2ZFakZTlwk= github.com/aws/aws-sdk-go-v2/internal/configsources v1.4.13 h1:a+8/MLcWlIxo1lF9xaGt3J/u3yOZx+CdSveSNwjhD40= @@ -16,14 +18,14 @@ github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3 h1:x2Ibm/A github.com/aws/aws-sdk-go-v2/service/internal/accept-encoding v1.13.3/go.mod h1:IW1jwyrQgMdhisceG8fQLmQIydcT/jWY21rFhzgaKwo= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13 h1:kDqdFvMY4AtKoACfzIGD8A0+hbT41KTKF//gq7jITfM= github.com/aws/aws-sdk-go-v2/service/internal/presigned-url v1.13.13/go.mod h1:lmKuogqSU3HzQCwZ9ZtcqOc5XGMqtDK7OIc2+DxiUEg= -github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.16 h1:lzAqM9zMFwAy3ghxjeJfROdwnzO/KCPY8RYEAYpGbCM= -github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.16/go.mod h1:lYyuDbeQ6vtjRP4gb9h2MReluEb0US5u+07X84akGKg= -github.com/aws/aws-sdk-go-v2/service/sso v1.30.3 h1:NjShtS1t8r5LUfFVtFeI8xLAHQNTa7UI0VawXlrBMFQ= -github.com/aws/aws-sdk-go-v2/service/sso v1.30.3/go.mod h1:fKvyjJcz63iL/ftA6RaM8sRCtN4r4zl4tjL3qw5ec7k= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.7 h1:gTsnx0xXNQ6SBbymoDvcoRHL+q4l/dAFsQuKfDWSaGc= -github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.7/go.mod h1:klO+ejMvYsB4QATfEOIXk8WAEwN4N0aBfJpvC+5SZBo= -github.com/aws/aws-sdk-go-v2/service/sts v1.40.2 h1:HK5ON3KmQV2HcAunnx4sKLB9aPf3gKGwVAf7xnx0QT0= -github.com/aws/aws-sdk-go-v2/service/sts v1.40.2/go.mod h1:E19xDjpzPZC7LS2knI9E6BaRFDK43Eul7vd6rSq2HWk= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.15 h1:f4rrodaG1r72h7qF2b1245nzcLUfN4OdgyQF2nw5+Jw= +github.com/aws/aws-sdk-go-v2/service/servicediscovery v1.39.15/go.mod h1:lYyuDbeQ6vtjRP4gb9h2MReluEb0US5u+07X84akGKg= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.2 h1:/p6MxkbQoCzaGQT3WO0JwG0FlQyG9RD8VmdmoKc5xqU= +github.com/aws/aws-sdk-go-v2/service/sso v1.30.2/go.mod h1:fKvyjJcz63iL/ftA6RaM8sRCtN4r4zl4tjL3qw5ec7k= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.6 h1:0dES42T2dhICCbVB3JSTTn7+Bz93wfJEK1b7jksZIyQ= +github.com/aws/aws-sdk-go-v2/service/ssooidc v1.35.6/go.mod h1:klO+ejMvYsB4QATfEOIXk8WAEwN4N0aBfJpvC+5SZBo= +github.com/aws/aws-sdk-go-v2/service/sts v1.40.1 h1:5sbIM57lHLaEaNWdIx23JH30LNBsSDkjN/QXGcRLAFc= +github.com/aws/aws-sdk-go-v2/service/sts v1.40.1/go.mod h1:E19xDjpzPZC7LS2knI9E6BaRFDK43Eul7vd6rSq2HWk= github.com/aws/smithy-go v1.23.2 h1:Crv0eatJUQhaManss33hS5r40CG3ZFH+21XSkqMrIUM= github.com/aws/smithy-go v1.23.2/go.mod h1:LEj2LM3rBRQJxPZTB4KuzZkaZYnZPnvgIhb4pu07mx0= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= @@ -203,8 +205,8 @@ github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9de github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yusufpapurcu/wmi v1.2.4 h1:zFUKzehAFReQwLys1b/iSMl+JQGSCSjtVqQn9bBrPo0= github.com/yusufpapurcu/wmi v1.2.4/go.mod h1:SBZ9tNy3G9/m5Oi98Zks0QjeHVDvuK0qfxQmPyzfmi0= -go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= -go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/auto/sdk v1.2.1 h1:jXsnJ4Lmnqd11kwkBV2LgLoFMZKizbCi5fNZ/ipaZ64= +go.opentelemetry.io/auto/sdk v1.2.1/go.mod h1:KRTj+aOaElaLi+wW1kO/DZRXwkF4C5xPbEe3ZiIhN7Y= go.opentelemetry.io/collector v0.140.0 h1:6RI7/l3TtQj+93xk+gpNh6TpvevOsz9E5KB2s3h00j8= go.opentelemetry.io/collector v0.140.0/go.mod h1:myrJeCdIuFeUGCUEgs4lWflywff9VANpuJEtdY1pKEk= go.opentelemetry.io/collector/client v1.46.0 h1:nAEVyKIECez8P92RXa78mjRvaynkivYdukT07lzF7Gs= @@ -267,8 +269,6 @@ go.opentelemetry.io/collector/consumer/xconsumer v0.140.0 h1:VTTybtJLbGN6aGw1bB7 go.opentelemetry.io/collector/consumer/xconsumer v0.140.0/go.mod h1:CtwSgAXVisCEJ+ElKeDa0yDo/Oie7l1vWAx1elFyWZc= go.opentelemetry.io/collector/exporter v1.46.0 h1:wCNH6dyG/PFtN40Q4ZCPWXgPuoX44cT9U4TuNVcLUvw= go.opentelemetry.io/collector/exporter v1.46.0/go.mod h1:EiNU4i+iG0n1FQBkWkwS7Nzd+vjlKsefy1bLHj913EU= -go.opentelemetry.io/collector/exporter/exporterhelper v0.140.0 h1:Euh2mfLhZoPgccNY++PfX0H3aFwthVFjR38x4RllXcM= -go.opentelemetry.io/collector/exporter/exporterhelper v0.140.0/go.mod h1:0WQCcouhn/efm75++yuzhNj51Q+8kR3HrGDLGjoUrso= go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.140.0 h1:jyw54m867IaPktvM5tU7T2vA3TY8/9M1de81mvJYa2A= go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper v0.140.0/go.mod h1:La5T7cyiinV4qxjD/l2MI2FDL30ArKaBp6Lji+RBzm8= go.opentelemetry.io/collector/exporter/exportertest v0.140.0 h1:WdRm8xXdjMcNnsVQHHTbGxmsp+4MuNMKhS0dR++bKOY= @@ -401,8 +401,8 @@ go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto= go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE= go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= -go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8= -go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= +go.uber.org/zap v1.27.1 h1:08RqriUEv8+ArZRYSTXy1LeBScaMpVSTBhCeaZYfMYc= +go.uber.org/zap v1.27.1/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E= go.yaml.in/yaml/v2 v2.4.3 h1:6gvOSjQoTB3vt1l+CU+tSyi/HOjfOjRLJ4YwYZGwRO0= go.yaml.in/yaml/v2 v2.4.3/go.mod h1:zSxWcmIDjOzPXpjlTTbAsKokqkDNAVtZO0WOMiT90s8= go.yaml.in/yaml/v3 v3.0.4 h1:tfq32ie2Jv2UxXFdLJdh3jXuOzWiL1fo0bu/FbuKpbc= @@ -420,10 +420,10 @@ golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU= -golang.org/x/net v0.45.0 h1:RLBg5JKixCy82FtLJpeNlVM0nrSqpCRYzVU1n8kj0tM= -golang.org/x/net v0.45.0/go.mod h1:ECOoLqd5U3Lhyeyo/QDCEVQ4sNgYsqvCZ722XogGieY= -golang.org/x/oauth2 v0.31.0 h1:8Fq0yVZLh4j4YA47vHKFTa9Ew5XIrCP8LC6UeNZnLxo= -golang.org/x/oauth2 v0.31.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= +golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82 h1:6/3JGEh1C88g7m+qzzTbl3A0FtsLguXieqofVLU/JAo= +golang.org/x/net v0.46.1-0.20251013234738-63d1a5100f82/go.mod h1:Q9BGdFy1y4nkUwiLvT5qtyhAnEHgnQ/zd8PfU6nc210= +golang.org/x/oauth2 v0.32.0 h1:jsCblLleRMDrxMN29H3z/k1KliIvpLgCkE6R8FXXNgY= +golang.org/x/oauth2 v0.32.0/go.mod h1:lzm5WQJQwKZ3nwavOZ3IS5Aulzxi68dUSgRHujetwEA= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= @@ -454,12 +454,12 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gonum.org/v1/gonum v0.16.0 h1:5+ul4Swaf3ESvrOnidPp4GZbzf0mxVQpDCYUQE7OJfk= gonum.org/v1/gonum v0.16.0/go.mod h1:fef3am4MQ93R2HHpKnLk4/Tbh/s0+wqD5nfa6Pnwy4E= -google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5 h1:BIRfGDEjiHRrk0QKZe3Xv2ieMhtgRGeLcZQ0mIVn4EY= -google.golang.org/genproto/googleapis/api v0.0.0-20250825161204-c5933d9347a5/go.mod h1:j3QtIyytwqGr1JUDtYXwtMXWPKsEa5LtzIFN1Wn5WvE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5 h1:eaY8u2EuxbRv7c3NiGK0/NedzVsCcV6hDuU5qPX5EGE= -google.golang.org/genproto/googleapis/rpc v0.0.0-20250825161204-c5933d9347a5/go.mod h1:M4/wBTSeyLxupu3W3tJtOgB14jILAS/XWPSSa3TAlJc= -google.golang.org/grpc v1.76.0 h1:UnVkv1+uMLYXoIz6o7chp59WfQUYA2ex/BXQ9rHZu7A= -google.golang.org/grpc v1.76.0/go.mod h1:Ju12QI8M6iQJtbcsV+awF5a4hfJMLi4X0JLo94ULZ6c= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8 h1:mepRgnBZa07I4TRuomDE4sTIYieg/osKmzIf4USdWS4= +google.golang.org/genproto/googleapis/api v0.0.0-20251022142026-3a174f9686a8/go.mod h1:fDMmzKV90WSg1NbozdqrE64fkuTv6mlq2zxo9ad+3yo= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8 h1:M1rk8KBnUsBDg1oPGHNCxG4vc1f49epmTO7xscSajMk= +google.golang.org/genproto/googleapis/rpc v0.0.0-20251022142026-3a174f9686a8/go.mod h1:7i2o+ce6H/6BluujYR+kqX3GKH+dChPTQU19wjRPiGk= +google.golang.org/grpc v1.77.0 h1:wVVY6/8cGA6vvffn+wWK5ToddbgdU3d8MNENr4evgXM= +google.golang.org/grpc v1.77.0/go.mod h1:z0BY1iVj0q8E1uSQCjL9cppRj+gnZjzDnzV0dHhrNig= google.golang.org/protobuf v1.36.10 h1:AYd7cD/uASjIL6Q9LiTjz8JLcrh/88q5UObnmY3aOOE= google.golang.org/protobuf v1.36.10/go.mod h1:HTf+CrKn2C3g5S8VImy6tdcUvCska2kB7j23XfzDpco= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= @@ -472,14 +472,14 @@ gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -k8s.io/api v0.34.2 h1:fsSUNZhV+bnL6Aqrp6O7lMTy6o5x2C4XLjnh//8SLYY= -k8s.io/api v0.34.2/go.mod h1:MMBPaWlED2a8w4RSeanD76f7opUoypY8TFYkSM+3XHw= +k8s.io/api v0.34.1 h1:jC+153630BMdlFukegoEL8E/yT7aLyQkIVuwhmwDgJM= +k8s.io/api v0.34.1/go.mod h1:SB80FxFtXn5/gwzCoN6QCtPD7Vbu5w2n1S0J5gFfTYk= k8s.io/apiextensions-apiserver v0.34.1 h1:NNPBva8FNAPt1iSVwIE0FsdrVriRXMsaWFMqJbII2CI= k8s.io/apiextensions-apiserver v0.34.1/go.mod h1:hP9Rld3zF5Ay2Of3BeEpLAToP+l4s5UlxiHfqRaRcMc= -k8s.io/apimachinery v0.34.2 h1:zQ12Uk3eMHPxrsbUJgNF8bTauTVR2WgqJsTmwTE/NW4= -k8s.io/apimachinery v0.34.2/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= -k8s.io/client-go v0.34.2 h1:Co6XiknN+uUZqiddlfAjT68184/37PS4QAzYvQvDR8M= -k8s.io/client-go v0.34.2/go.mod h1:2VYDl1XXJsdcAxw7BenFslRQX28Dxz91U9MWKjX97fE= +k8s.io/apimachinery v0.34.1 h1:dTlxFls/eikpJxmAC7MVE8oOeP1zryV7iRyIjB0gky4= +k8s.io/apimachinery v0.34.1/go.mod h1:/GwIlEcWuTX9zKIg2mbw0LRFIsXwrfoVxn+ef0X13lw= +k8s.io/client-go v0.34.1 h1:ZUPJKgXsnKwVwmKKdPfw4tB58+7/Ik3CrjOEhsiZ7mY= +k8s.io/client-go v0.34.1/go.mod h1:kA8v0FP+tk6sZA0yKLRG67LWjqufAoSHA2xVGKw9Of8= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kube-openapi v0.0.0-20250710124328-f3f2b991d03b h1:MloQ9/bdJyIu9lb1PzujOPolHyvO06MXG5TUIj2mNAA= diff --git a/exporter/loadbalancingexporter/payload_codec.go b/exporter/loadbalancingexporter/payload_codec.go new file mode 100644 index 0000000000000..cd1f7cbe74cad --- /dev/null +++ b/exporter/loadbalancingexporter/payload_codec.go @@ -0,0 +1,151 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter" + +import ( + "errors" + "fmt" + "sync" + + "github.com/golang/snappy" + "github.com/klauspost/compress/zstd" +) + +var ( + errInvalidCompressedPayload = errors.New("invalid compressed queue payload") + queuePayloadMagic = [3]byte{'s', 'q', 'c'} + queuePayloadVersion byte = 1 +) + +const ( + queuePayloadCodecNone byte = 0 + queuePayloadCodecSnappy byte = 1 + queuePayloadCodecZstd byte = 2 +) + +type queuePayloadCodec struct { + compression QueuePayloadCompression + zstdOnce sync.Once + closeOnce sync.Once + zstdEnc *zstd.Encoder + zstdDec *zstd.Decoder + zstdErr error +} + +func newQueuePayloadCodec(compression QueuePayloadCompression) *queuePayloadCodec { + return &queuePayloadCodec{compression: compression} +} + +func (c *queuePayloadCodec) Encode(payload []byte) ([]byte, error) { + codecID, err := codecIDForCompression(c.compression) + if err != nil { + return nil, err + } + + compressed, err := c.compress(codecID, payload) + if err != nil { + return nil, err + } + + out := make([]byte, 0, len(compressed)+5) + out = append(out, queuePayloadMagic[:]...) + out = append(out, queuePayloadVersion, codecID) + out = append(out, compressed...) + return out, nil +} + +func (c *queuePayloadCodec) Decode(payload []byte) ([]byte, error) { + if len(payload) < 5 { + if c.compression == QueuePayloadCompressionNone { + return payload, nil + } + return nil, errInvalidCompressedPayload + } + if payload[0] != queuePayloadMagic[0] || payload[1] != queuePayloadMagic[1] || payload[2] != queuePayloadMagic[2] { + if c.compression == QueuePayloadCompressionNone { + return payload, nil + } + return nil, errInvalidCompressedPayload + } + if payload[3] != queuePayloadVersion { + return nil, fmt.Errorf("%w: unsupported version %d", errInvalidCompressedPayload, payload[3]) + } + + return c.decompress(payload[4], payload[5:]) +} + +func (c *queuePayloadCodec) compress(codecID byte, payload []byte) ([]byte, error) { + switch codecID { + case queuePayloadCodecNone: + return payload, nil + case queuePayloadCodecSnappy: + return snappy.Encode(nil, payload), nil + case queuePayloadCodecZstd: + if err := c.initZstd(); err != nil { + return nil, err + } + return c.zstdEnc.EncodeAll(payload, nil), nil + default: + return nil, fmt.Errorf("unsupported queue payload codec %d", codecID) + } +} + +func (c *queuePayloadCodec) decompress(codecID byte, payload []byte) ([]byte, error) { + switch codecID { + case queuePayloadCodecNone: + return payload, nil + case queuePayloadCodecSnappy: + return snappy.Decode(nil, payload) + case queuePayloadCodecZstd: + if err := c.initZstd(); err != nil { + return nil, err + } + return c.zstdDec.DecodeAll(payload, nil) + default: + return nil, fmt.Errorf("%w: unsupported codec %d", errInvalidCompressedPayload, codecID) + } +} + +func (c *queuePayloadCodec) initZstd() error { + c.zstdOnce.Do(func() { + c.zstdEnc, c.zstdErr = zstd.NewWriter(nil) + if c.zstdErr != nil { + return + } + c.zstdDec, c.zstdErr = zstd.NewReader(nil) + if c.zstdErr != nil { + if closeErr := c.zstdEnc.Close(); closeErr != nil { + c.zstdErr = errors.Join(c.zstdErr, closeErr) + } + c.zstdEnc = nil + } + }) + return c.zstdErr +} + +func (c *queuePayloadCodec) Close() error { + var closeErr error + c.closeOnce.Do(func() { + if c.zstdDec != nil { + c.zstdDec.Close() + } + if c.zstdEnc != nil { + closeErr = c.zstdEnc.Close() + } + }) + return closeErr +} + +func codecIDForCompression(compression QueuePayloadCompression) (byte, error) { + switch compression { + case QueuePayloadCompressionNone: + return queuePayloadCodecNone, nil + case QueuePayloadCompressionSnappy: + return queuePayloadCodecSnappy, nil + case QueuePayloadCompressionZstd: + return queuePayloadCodecZstd, nil + default: + return 0, fmt.Errorf("unsupported queue payload compression %q", compression) + } +} diff --git a/exporter/loadbalancingexporter/payload_codec_test.go b/exporter/loadbalancingexporter/payload_codec_test.go new file mode 100644 index 0000000000000..5ce6c65c89d00 --- /dev/null +++ b/exporter/loadbalancingexporter/payload_codec_test.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package loadbalancingexporter + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +func TestQueuePayloadCodecRoundTrip(t *testing.T) { + tests := []struct { + name string + compression QueuePayloadCompression + }{ + {name: "none", compression: QueuePayloadCompressionNone}, + {name: "snappy", compression: QueuePayloadCompressionSnappy}, + {name: "zstd", compression: QueuePayloadCompressionZstd}, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + codec := newQueuePayloadCodec(tt.compression) + original := []byte("hello compressed queue payload") + + encoded, err := codec.Encode(original) + require.NoError(t, err) + + decoded, err := codec.Decode(encoded) + require.NoError(t, err) + require.Equal(t, original, decoded) + }) + } +} + +func TestQueuePayloadCodecDecodeRejectsInvalidPayload(t *testing.T) { + codec := newQueuePayloadCodec(QueuePayloadCompressionSnappy) + + _, err := codec.Decode([]byte("bad")) + require.Error(t, err) + + encoded, err := codec.Encode([]byte("hello")) + require.NoError(t, err) + + encoded[0] = 'x' + _, err = codec.Decode(encoded) + require.Error(t, err) + + encoded, err = codec.Encode([]byte("hello")) + require.NoError(t, err) + + encoded[3] = 0xFF + _, err = codec.Decode(encoded) + require.ErrorContains(t, err, "unsupported version") +} + +func TestQueuePayloadCodecNoneDecodeLegacyRawPayload(t *testing.T) { + codec := newQueuePayloadCodec(QueuePayloadCompressionNone) + raw := []byte("legacy-raw-payload") + + decoded, err := codec.Decode(raw) + require.NoError(t, err) + require.Equal(t, raw, decoded) +} + +func TestQueuePayloadCodecNoneDecodeCompressedPayload(t *testing.T) { + snappyCodec := newQueuePayloadCodec(QueuePayloadCompressionSnappy) + noneCodec := newQueuePayloadCodec(QueuePayloadCompressionNone) + original := []byte("payload written before compression mode switched to none") + + encoded, err := snappyCodec.Encode(original) + require.NoError(t, err) + + decoded, err := noneCodec.Decode(encoded) + require.NoError(t, err) + require.Equal(t, original, decoded) +}