Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .chloggen/avoid-extra-copy.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: coralogixexporter
component: exporter/coralogix
Comment thread
amir-jakoby marked this conversation as resolved.

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add Automatic AWS PrivateLink set up via new `private_link` configuration option
Expand Down
4 changes: 4 additions & 0 deletions .chloggen/config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ components:
- exporter/honeycombmarker
- exporter/influxdb
- exporter/kafka
- exporter/kedascaler
- exporter/loadbalancing
- exporter/logicmonitor
- exporter/logzio
Expand Down Expand Up @@ -190,10 +191,12 @@ components:
- processor/groupbyattrs
- processor/groupbytrace
- processor/hetznerdetector
- processor/hotreloadprocessor
- processor/interval
- processor/isolationforest
- processor/k8sattributes
- processor/logdedup
- processor/logstometricsprocessor
- processor/logstransform
- processor/metricsgeneration
- processor/metricstarttime
Expand Down Expand Up @@ -241,6 +244,7 @@ components:
- receiver/collectd
- receiver/couchdb
- receiver/datadog
- receiver/datadoglog
- receiver/docker_stats
- receiver/elasticsearch
- receiver/envoyals
Expand Down
2 changes: 1 addition & 1 deletion .chloggen/oracle_query_traceid_fix.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
change_type: bug_fix

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: oracledbreceiver
component: receiver/oracledb

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Fix for wrong trace id in oracle top query records
Expand Down
30 changes: 30 additions & 0 deletions .chloggen/saw-6556-loadbalancing-compressed-queue.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: exporter/loadbalancing

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Add optional sending queue payload compression (snappy or zstd) for the loadbalancing exporter, with an opt-in mode to store in-memory queue entries as encoded payloads.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [6556]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
Persistent queue payloads are encoded with a queue header and decoded on dequeue.
In-memory queue compression is opt-in via `sending_queue.compress_in_memory`.
Zstd encoder/decoder resources are cleaned up on exporter shutdown.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: ['user']
2 changes: 1 addition & 1 deletion .chloggen/upgrade-contrib-version.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: core
component: all

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Update dependencies to v0.136.0 and v0.136.1, including improvements to kedascalerexporter error handling and datadog log processing optimization
Comment thread
amir-jakoby marked this conversation as resolved.
Expand Down
11 changes: 11 additions & 0 deletions exporter/loadbalancingexporter/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,17 @@ service:
- loadbalancing
```

To compress payloads in an in-memory sending queue, set:

```yaml
exporters:
loadbalancing:
sending_queue:
enabled: true
payload_compression: zstd
compress_in_memory: true
```

Kubernetes resolver example (For a more specific example: [example/k8s-resolver](./example/k8s-resolver/README.md))
> [!IMPORTANT]
> The k8s resolver requires proper permissions. See [the full example](./example/k8s-resolver/README.md) for more information.
Expand Down
42 changes: 41 additions & 1 deletion exporter/loadbalancingexporter/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package loadbalancingexporter // import "github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter"

import (
"fmt"
"time"

"github.com/aws/aws-sdk-go-v2/service/servicediscovery/types"
Expand Down Expand Up @@ -37,7 +38,7 @@
type Config struct {
TimeoutSettings exporterhelper.TimeoutConfig `mapstructure:",squash"`
configretry.BackOffConfig `mapstructure:"retry_on_failure"`
QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`
QueueSettings QueueSettings `mapstructure:"sending_queue"`

Protocol Protocol `mapstructure:"protocol"`
Resolver ResolverSettings `mapstructure:"resolver"`
Expand All @@ -52,6 +53,45 @@
RoutingAttributes []string `mapstructure:"routing_attributes"`
}

type QueueSettings struct {
exporterhelper.QueueBatchConfig `mapstructure:",squash"`
PayloadCompression QueuePayloadCompression `mapstructure:"payload_compression"`
CompressInMemory bool `mapstructure:"compress_in_memory"`
}

type QueuePayloadCompression string

const (
QueuePayloadCompressionNone QueuePayloadCompression = "none"
QueuePayloadCompressionSnappy QueuePayloadCompression = "snappy"
QueuePayloadCompressionZstd QueuePayloadCompression = "zstd"
)

func (q QueueSettings) Validate() error {
if err := q.QueueBatchConfig.Validate(); err != nil {
return err
}
switch q.PayloadCompression {
case "", QueuePayloadCompressionNone, QueuePayloadCompressionSnappy, QueuePayloadCompressionZstd:
// Valid payload compression value.
default:
return fmt.Errorf("sending_queue.payload_compression must be one of [none, snappy, zstd], found %q", q.PayloadCompression)
}

if q.CompressInMemory && !q.Enabled {
return fmt.Errorf("sending_queue.compress_in_memory requires sending_queue.enabled=true")

Check failure on line 82 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 82 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, exporter-2)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 82 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, exporter-2)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}
if q.CompressInMemory && (q.PayloadCompression == "" || q.PayloadCompression == QueuePayloadCompressionNone) {
return fmt.Errorf("sending_queue.compress_in_memory requires sending_queue.payload_compression to be set to snappy or zstd")

Check failure on line 85 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / scoped-tests-matrix (ubuntu-latest)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 85 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (linux, exporter-2)

use-errors-new: replace fmt.Errorf by errors.New (revive)

Check failure on line 85 in exporter/loadbalancingexporter/config.go

View workflow job for this annotation

GitHub Actions / lint-matrix (windows, exporter-2)

use-errors-new: replace fmt.Errorf by errors.New (revive)
}

return nil
}

func (cfg *Config) Validate() error {
return cfg.QueueSettings.Validate()
}

// Protocol holds the individual protocol-specific settings. Only OTLP is supported at the moment.
type Protocol struct {
OTLP otlpexporter.Config `mapstructure:"otlp"`
Expand Down
27 changes: 27 additions & 0 deletions exporter/loadbalancingexporter/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,3 +25,30 @@ func TestLoadConfig(t *testing.T) {
require.NoError(t, sub.Unmarshal(cfg))
require.NotNil(t, cfg)
}

func TestConfigValidatePayloadCompression(t *testing.T) {
cfg := createDefaultConfig().(*Config)
require.NoError(t, cfg.Validate())

cfg.QueueSettings.Enabled = true
cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy
require.NoError(t, cfg.Validate())

cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionZstd
require.NoError(t, cfg.Validate())

cfg.QueueSettings.PayloadCompression = QueuePayloadCompression("invalid")
require.Error(t, cfg.Validate())
}

func TestConfigValidateCompressInMemory(t *testing.T) {
cfg := createDefaultConfig().(*Config)
cfg.QueueSettings.CompressInMemory = true
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.enabled=true")

cfg.QueueSettings.Enabled = true
require.ErrorContains(t, cfg.Validate(), "sending_queue.compress_in_memory requires sending_queue.payload_compression")

cfg.QueueSettings.PayloadCompression = QueuePayloadCompressionSnappy
require.NoError(t, cfg.Validate())
}
52 changes: 44 additions & 8 deletions exporter/loadbalancingexporter/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"go.opentelemetry.io/collector/exporter/otlpexporter"
metricnoop "go.opentelemetry.io/otel/metric/noop"
tracenoop "go.opentelemetry.io/otel/trace/noop"
"go.uber.org/multierr"
"go.uber.org/zap"

"github.com/open-telemetry/opentelemetry-collector-contrib/exporter/loadbalancingexporter/internal/metadata"
Expand All @@ -39,13 +40,19 @@ func createDefaultConfig() component.Config {
otlpFactory := otlpexporter.NewFactory()
otlpDefaultCfg := otlpFactory.CreateDefaultConfig().(*otlpexporter.Config)
otlpDefaultCfg.ClientConfig.Endpoint = "placeholder:4317"
queueCfg := exporterhelper.NewDefaultQueueConfig()
queueCfg.Enabled = false

return &Config{
// By default we disable resilience options on loadbalancing exporter level
// to maintain compatibility with workflow in previous versions
Protocol: Protocol{
OTLP: *otlpDefaultCfg,
},
QueueSettings: QueueSettings{
QueueBatchConfig: queueCfg,
PayloadCompression: QueuePayloadCompressionNone,
},
}
}

Expand All @@ -71,12 +78,18 @@ func buildExporterSettings(typ component.Type, params exporter.Settings, endpoin
return params
}

func buildExporterResilienceOptions(options []exporterhelper.Option, cfg *Config) []exporterhelper.Option {
func buildExporterResilienceOptions(options []exporterhelper.Option, cfg *Config, payloadCodec *queuePayloadCodec) []exporterhelper.Option {
if cfg.TimeoutSettings.Timeout > 0 {
options = append(options, exporterhelper.WithTimeout(cfg.TimeoutSettings))
}
if cfg.QueueSettings.Enabled {
options = append(options, exporterhelper.WithQueue(cfg.QueueSettings))
options = append(options, exporterhelper.WithQueue(cfg.QueueSettings.QueueBatchConfig))
if payloadCodec != nil {
options = append(options, exporterhelper.WithQueueBatchPayloadCodec(payloadCodec))
if cfg.QueueSettings.CompressInMemory {
options = append(options, exporterhelper.WithQueueBatchInMemoryEncoding(true))
}
}
}
if cfg.Enabled {
options = append(options, exporterhelper.WithRetry(cfg.BackOffConfig))
Expand All @@ -91,10 +104,11 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, cfg com
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing traces exporter: %w", err)
}
payloadCodec := newQueuePayloadCodecIfEnabled(c)

options := []exporterhelper.Option{
exporterhelper.WithStart(exp.Start),
exporterhelper.WithShutdown(exp.Shutdown),
exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exp.Shutdown), payloadCodec)),
exporterhelper.WithCapabilities(exp.Capabilities()),
}

Expand All @@ -103,7 +117,7 @@ func createTracesExporter(ctx context.Context, params exporter.Settings, cfg com
params,
cfg,
exp.ConsumeTraces,
buildExporterResilienceOptions(options, c)...,
buildExporterResilienceOptions(options, c, payloadCodec)...,
)
}

Expand All @@ -113,10 +127,11 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, cfg compo
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing logs exporter: %w", err)
}
payloadCodec := newQueuePayloadCodecIfEnabled(c)

options := []exporterhelper.Option{
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exporter.Shutdown), payloadCodec)),
exporterhelper.WithCapabilities(exporter.Capabilities()),
}

Expand All @@ -125,7 +140,7 @@ func createLogsExporter(ctx context.Context, params exporter.Settings, cfg compo
params,
cfg,
exporter.ConsumeLogs,
buildExporterResilienceOptions(options, c)...,
buildExporterResilienceOptions(options, c, payloadCodec)...,
)
}

Expand All @@ -135,10 +150,11 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg co
if err != nil {
return nil, fmt.Errorf("cannot configure loadbalancing metrics exporter: %w", err)
}
payloadCodec := newQueuePayloadCodecIfEnabled(c)

options := []exporterhelper.Option{
exporterhelper.WithStart(exporter.Start),
exporterhelper.WithShutdown(exporter.Shutdown),
exporterhelper.WithShutdown(shutdownWithCodec(component.ShutdownFunc(exporter.Shutdown), payloadCodec)),
exporterhelper.WithCapabilities(exporter.Capabilities()),
}

Expand All @@ -147,6 +163,26 @@ func createMetricsExporter(ctx context.Context, params exporter.Settings, cfg co
params,
cfg,
exporter.ConsumeMetrics,
buildExporterResilienceOptions(options, c)...,
buildExporterResilienceOptions(options, c, payloadCodec)...,
)
}

func newQueuePayloadCodecIfEnabled(cfg *Config) *queuePayloadCodec {
if !cfg.QueueSettings.Enabled {
return nil
}
if cfg.QueueSettings.PayloadCompression == "" {
return nil
}

return newQueuePayloadCodec(cfg.QueueSettings.PayloadCompression)
Comment thread
coderabbitai[bot] marked this conversation as resolved.
}

func shutdownWithCodec(shutdown component.ShutdownFunc, codec *queuePayloadCodec) component.ShutdownFunc {
if codec == nil {
return shutdown
}
return func(ctx context.Context) error {
return multierr.Append(shutdown.Shutdown(ctx), codec.Close())
}
}
Loading
Loading