Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .chloggen/failoverconnector-queue-settings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver)
component: failoverconnector

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds queue_settings to failoverconnector to enable queueing on the "exporter portion" of the connector.

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [33077]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext:

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
5 changes: 5 additions & 0 deletions connector/failoverconnector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"time"

"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pipeline"
)

Expand All @@ -16,6 +17,10 @@ var (
)

type Config struct {
// QueueSettings use the exporterhelper sending_queue to move the queue to the connector to avoid data being stuck
// in the queue of an unhealthy exporter
QueueSettings exporterhelper.QueueBatchConfig `mapstructure:"sending_queue"`

// PipelinePriority is the list of pipeline level priorities in a 1 - n configuration, multiple pipelines can
// sit at a single priority level and will be routed in a fanout. If any pipeline at a level fails, the
// level is considered unhealthy
Expand Down
3 changes: 3 additions & 0 deletions connector/failoverconnector/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/confmap/confmaptest"
"go.opentelemetry.io/collector/confmap/xconfmap"
"go.opentelemetry.io/collector/exporter/exporterhelper"
"go.opentelemetry.io/collector/pipeline"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/metadata"
Expand All @@ -26,6 +27,7 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "default"),
expected: &Config{
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
PipelinePriority: [][]pipeline.ID{
{
pipeline.NewIDWithName(pipeline.SignalTraces, ""),
Expand All @@ -37,6 +39,7 @@ func TestLoadConfig(t *testing.T) {
{
id: component.NewIDWithName(metadata.Type, "full"),
expected: &Config{
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
Comment thread
akats7 marked this conversation as resolved.
PipelinePriority: [][]pipeline.ID{
{
pipeline.NewIDWithName(pipeline.SignalTraces, "first"),
Expand Down
99 changes: 93 additions & 6 deletions connector/failoverconnector/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/connector"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/exporter"
"go.opentelemetry.io/collector/exporter/exporterhelper"

"github.com/open-telemetry/opentelemetry-collector-contrib/connector/failoverconnector/internal/metadata"
)
Expand All @@ -26,35 +28,120 @@ func NewFactory() connector.Factory {

func createDefaultConfig() component.Config {
return &Config{
QueueSettings: exporterhelper.NewDefaultQueueConfig(),
RetryInterval: 10 * time.Minute,
RetryGap: 0,
MaxRetries: 0,
}
}

func createTracesToTraces(
_ context.Context,
ctx context.Context,
set connector.Settings,
cfg component.Config,
traces consumer.Traces,
) (connector.Traces, error) {
return newTracesToTraces(set, cfg, traces)
t, err := newTracesToTraces(set, cfg, traces)
if err != nil {
return nil, err
}
expSettings := exporter.Settings{
ID: set.ID,
TelemetrySettings: set.TelemetrySettings,
BuildInfo: set.BuildInfo,
}

oCfg := cfg.(*Config)

// If queue is disabled, return the raw failover connector
if !oCfg.QueueSettings.Enabled {
return t, nil
}

// If queue is enabled, wrap with exporterhelper
wrapped, err := exporterhelper.NewTraces(ctx, expSettings, cfg,
t.ConsumeTraces,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithQueue(oCfg.QueueSettings),
)
if err != nil {
return nil, err
}

// Return testable wrapper that exposes internal failover router
return newWrappedTracesConnector(wrapped, t.(*tracesFailover)), nil
}

func createMetricsToMetrics(
_ context.Context,
ctx context.Context,
set connector.Settings,
cfg component.Config,
metrics consumer.Metrics,
) (connector.Metrics, error) {
return newMetricsToMetrics(set, cfg, metrics)
t, err := newMetricsToMetrics(set, cfg, metrics)
if err != nil {
return nil, err
}
expSettings := exporter.Settings{
ID: set.ID,
TelemetrySettings: set.TelemetrySettings,
BuildInfo: set.BuildInfo,
}

oCfg := cfg.(*Config)

// If queue is disabled, return the raw failover connector directly (original behavior)
if !oCfg.QueueSettings.Enabled {
return t, nil
}

// If queue is enabled, wrap with exporterhelper
wrapped, err := exporterhelper.NewMetrics(ctx, expSettings, cfg,
t.ConsumeMetrics,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithQueue(oCfg.QueueSettings),
)
if err != nil {
return nil, err
}

// Return testable wrapper that exposes internal failover router
return newWrappedMetricsConnector(wrapped, t.(*metricsFailover)), nil
}

func createLogsToLogs(
_ context.Context,
ctx context.Context,
set connector.Settings,
cfg component.Config,
logs consumer.Logs,
) (connector.Logs, error) {
return newLogsToLogs(set, cfg, logs)
t, err := newLogsToLogs(set, cfg, logs)
if err != nil {
return nil, err
}
expSettings := exporter.Settings{
ID: set.ID,
TelemetrySettings: set.TelemetrySettings,
BuildInfo: set.BuildInfo,
}

oCfg := cfg.(*Config)

// If queue is disabled, return the raw failover connector directly (original behavior)
if !oCfg.QueueSettings.Enabled {
return t, nil
}

// If queue is enabled, wrap with exporterhelper
wrapped, err := exporterhelper.NewLogs(ctx, expSettings, cfg,
t.ConsumeLogs,
exporterhelper.WithCapabilities(consumer.Capabilities{MutatesData: false}),
exporterhelper.WithQueue(oCfg.QueueSettings),
)
if err != nil {
return nil, err
}

// Return testable wrapper that exposes internal failover router
return newWrappedLogsConnector(wrapped, t.(*logsFailover)), nil
Comment on lines +118 to +146

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Non-blocking!) I see there is generic code [C any] in this file. It's not the easiest thing to do, but it's possible (at least!) to write this in generic Go so that you have to can write it once instead of once per signal.

Here's an example I have pending (and will return to):

https://github.com/open-telemetry/opentelemetry-collector/pull/13265/files#diff-b60f5aaeade2c9da1069a8c3690949ec32ab7a4ec06b9d0dca9b8bdbba518c9cR54

}
6 changes: 5 additions & 1 deletion connector/failoverconnector/failover.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,11 @@ func (f *baseFailoverRouter[C]) reportConsumerError(idx int) {
}

func (f *baseFailoverRouter[C]) Shutdown() {
close(f.done)
select {
case <-f.done:
default:
close(f.done)
}
}

func newBaseFailoverRouter[C any](provider consumerProvider[C], cfg *Config) (*baseFailoverRouter[C], error) {
Expand Down
31 changes: 16 additions & 15 deletions connector/failoverconnector/failover_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestFailoverRecovery(t *testing.T) {
require.NoError(t, err)

failoverConnector := conn.(*tracesFailover)
tRouter := failoverConnector.failover

tr := sampleTrace()

Expand All @@ -51,7 +52,7 @@ func TestFailoverRecovery(t *testing.T) {

t.Run("single failover recovery to primary consumer: level 2 -> 1", func(t *testing.T) {
defer func() {
resetConsumers(failoverConnector, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
resetConsumers(tRouter, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
}()
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))

Expand All @@ -62,77 +63,77 @@ func TestFailoverRecovery(t *testing.T) {
failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 0, tr)
return consumeTracesAndCheckStable(tRouter, 0, tr)
}, 3*time.Second, 5*time.Millisecond)
})

t.Run("double failover recovery: level 3 -> 2 -> 1", func(t *testing.T) {
defer func() {
resetConsumers(failoverConnector, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
resetConsumers(tRouter, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
}()
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
return consumeTracesAndCheckStable(tRouter, 2, tr)
}, 3*time.Second, 5*time.Millisecond)

// Simulate recovery of exporter
failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 1, tr)
return consumeTracesAndCheckStable(tRouter, 1, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkFirst)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 0, tr)
return consumeTracesAndCheckStable(tRouter, 0, tr)
}, 3*time.Second, 5*time.Millisecond)
})

t.Run("multiple failover recovery: level 3 -> 2 -> 4 -> 3 -> 1", func(t *testing.T) {
defer func() {
resetConsumers(failoverConnector, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
resetConsumers(tRouter, &sinkFirst, &sinkSecond, &sinkThird, &sinkFourth)
}()
failoverConnector.failover.ModifyConsumerAtIndex(0, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
return consumeTracesAndCheckStable(tRouter, 2, tr)
}, 3*time.Second, 5*time.Millisecond)

// Simulate recovery of exporter
failoverConnector.failover.ModifyConsumerAtIndex(1, &sinkSecond)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 1, tr)
return consumeTracesAndCheckStable(tRouter, 1, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(2, consumertest.NewErr(errTracesConsumer))
failoverConnector.failover.ModifyConsumerAtIndex(1, consumertest.NewErr(errTracesConsumer))

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 3, tr)
return consumeTracesAndCheckStable(tRouter, 3, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(2, &sinkThird)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 2, tr)
return consumeTracesAndCheckStable(tRouter, 2, tr)
}, 3*time.Second, 5*time.Millisecond)

failoverConnector.failover.ModifyConsumerAtIndex(0, &sinkThird)

require.Eventually(t, func() bool {
return consumeTracesAndCheckStable(failoverConnector, 0, tr)
return consumeTracesAndCheckStable(tRouter, 0, tr)
}, 3*time.Second, 5*time.Millisecond)
})
}

func resetConsumers(conn *tracesFailover, consumers ...consumer.Traces) {
func resetConsumers(router *tracesRouter, consumers ...consumer.Traces) {
for i, sink := range consumers {
conn.failover.ModifyConsumerAtIndex(i, sink)
router.ModifyConsumerAtIndex(i, sink)
}
conn.failover.TestSetStableConsumerIndex(0)
router.TestSetStableConsumerIndex(0)
}
10 changes: 10 additions & 0 deletions connector/failoverconnector/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,16 @@ require (
go.opentelemetry.io/collector/connector/connectortest v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/consumer v1.41.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/consumer/consumertest v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/exporter v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/exporter/exporterhelper v0.135.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/pdata v1.41.1-0.20250911155607-37a3ace6274c
go.opentelemetry.io/collector/pipeline v1.41.1-0.20250911155607-37a3ace6274c
go.uber.org/goleak v1.3.0
go.uber.org/zap v1.27.0
)

require (
github.com/cenkalti/backoff/v5 v5.0.3 // indirect
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/go-logr/logr v1.4.3 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand All @@ -37,12 +40,19 @@ require (
github.com/modern-go/reflect2 v1.0.3-0.20250322232337-35a7c28c31ee // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/client v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configoptional v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/config/configretry v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/connector/xconnector v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/consumer/consumererror v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/consumer/xconsumer v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/extension v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/extension/xextension v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/featuregate v1.41.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/internal/fanoutconsumer v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/internal/telemetry v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/pdata/pprofile v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/pdata/xpdata v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/collector/pipeline/xpipeline v0.135.1-0.20250911155607-37a3ace6274c // indirect
go.opentelemetry.io/contrib/bridges/otelzap v0.12.0 // indirect
go.opentelemetry.io/otel v1.38.0 // indirect
Expand Down
Loading
Loading