Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .chloggen/fix-todo-optional.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ change_type: breaking
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Use configoptional for optional fields in exporterhelper
note: Use configoptional for sending_queue::batch field

# One or more tracking issues or pull requests related to the change
issues: [13345]
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/internal/base_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Reques
o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return nil
}
if cfg.StorageID.HasValue() && set.Encoding == nil {
if cfg.StorageID != nil && set.Encoding == nil {
return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled")
}
o.queueBatchSettings = set
Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/base_exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/config/configretry"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest"
Expand Down Expand Up @@ -53,7 +52,8 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) {
require.Error(t, err)

qCfg := NewDefaultQueueConfig()
qCfg.StorageID = configoptional.Some(component.MustNewID("test"))
storageID := component.NewID(component.MustNewType("test"))
qCfg.StorageID = &storageID
_, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport,
WithQueueBatchSettings(newFakeQueueBatch()),
WithRetry(configretry.NewDefaultBackOffConfig()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ func newPersistentQueue[T any](set Settings[T]) readableQueue[T] {
activeSizer: set.activeSizer(),
itemsSizer: set.ItemsSizer,
bytesSizer: set.BytesSizer,
storageID: *set.StorageID.Get(),
storageID: *set.StorageID,
id: set.ID,
signal: set.Signal,
blockOnOverflow: set.BlockOnOverflow,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
Expand Down Expand Up @@ -232,7 +231,8 @@ func newSettings(sizerType request.SizerType, capacity int64) Settings[int64] {

func newSettingsWithStorage(sizerType request.SizerType, capacity int64) Settings[int64] {
set := newSettings(sizerType, capacity)
set.StorageID = configoptional.Some(component.ID{})
storageID := component.ID{}
set.StorageID = &storageID
return set
}

Expand Down Expand Up @@ -513,7 +513,8 @@ func TestInvalidStorageExtensionType(t *testing.T) {
}

func TestPersistentQueue_StopAfterBadStart(t *testing.T) {
pq := newPersistentQueue[int64](Settings[int64]{StorageID: configoptional.Some(component.ID{})})
storageID := component.ID{}
pq := newPersistentQueue[int64](Settings[int64]{StorageID: &storageID})
// verify that stopping a un-start/started w/error queue does not panic
assert.NoError(t, pq.Shutdown(context.Background()))
}
Expand Down
5 changes: 2 additions & 3 deletions exporter/exporterhelper/internal/queue/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"errors"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
"go.opentelemetry.io/collector/pipeline"
)
Expand Down Expand Up @@ -64,7 +63,7 @@ type Settings[T any] struct {
WaitForResult bool
BlockOnOverflow bool
Signal pipeline.Signal
StorageID configoptional.Optional[component.ID]
StorageID *component.ID
Encoding Encoding[T]
ID component.ID
Telemetry component.TelemetrySettings
Expand Down Expand Up @@ -97,7 +96,7 @@ func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T]

func newBaseQueue[T any](set Settings[T]) (readableQueue[T], error) {
// Configure memory queue or persistent based on the config.
if !set.StorageID.HasValue() {
if set.StorageID == nil {
return newMemoryQueue[T](set), nil
}
if set.ItemsSizer == nil {
Expand Down
7 changes: 4 additions & 3 deletions exporter/exporterhelper/internal/queuebatch/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,10 @@ type Config struct {
// If true, the component will wait for space; otherwise, operations will immediately return a retryable error.
BlockOnOverflow bool `mapstructure:"block_on_overflow"`

// StorageID, if not empty, enables the persistent storage and uses the component specified
// StorageID if not empty, enables the persistent storage and uses the component specified
// as a storage extension for the persistent queue.
StorageID configoptional.Optional[component.ID] `mapstructure:"storage"`
// TODO: This will be changed to Optional when available.
StorageID *component.ID `mapstructure:"storage"`

// NumConsumers is the maximum number of concurrent consumers from the queue.
// This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.).
Expand Down Expand Up @@ -74,7 +75,7 @@ func (cfg *Config) Validate() error {
}

// Only support request sizer for persistent queue at this moment.
if cfg.StorageID.HasValue() && cfg.WaitForResult {
if cfg.StorageID != nil && cfg.WaitForResult {
return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`")
}

Expand Down
4 changes: 2 additions & 2 deletions exporter/exporterhelper/internal/queuebatch/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"github.com/stretchr/testify/require"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/exporter/exporterhelper/internal/request"
)

Expand All @@ -30,9 +29,10 @@ func TestConfig_Validate(t *testing.T) {
cfg.QueueSize = 0
require.EqualError(t, cfg.Validate(), "`queue_size` must be positive")

storageID := component.MustNewID("test")
cfg = newTestConfig()
cfg.WaitForResult = true
cfg.StorageID = configoptional.Some(component.MustNewID("test"))
cfg.StorageID = &storageID
require.EqualError(t, cfg.Validate(), "`wait_for_result` is not supported with a persistent queue configured with `storage`")

cfg = newTestConfig()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ func TestQueueBatchDifferentSizers(t *testing.T) {
func TestQueueBatchPersistenceEnabled(t *testing.T) {
cfg := newTestConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
cfg.StorageID = configoptional.Some(storageID)
cfg.StorageID = &storageID
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]())
require.NoError(t, err)

Expand All @@ -168,7 +168,7 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) {
storageError := errors.New("could not get storage client")
cfg := newTestConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
cfg.StorageID = configoptional.Some(storageID)
cfg.StorageID = &storageID
qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]())
require.NoError(t, err)

Expand All @@ -184,7 +184,7 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) {
cfg := newTestConfig()
cfg.NumConsumers = 1
storageID := component.MustNewIDWithName("file_storage", "storage")
cfg.StorageID = configoptional.Some(storageID)
cfg.StorageID = &storageID

mockReq := &requesttest.FakeRequest{Items: 2}
qSet := newFakeRequestSettings()
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/logs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -174,7 +173,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) {
fgOrigWriteState := queue.PersistRequestContextOnWrite
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = configoptional.Some(storageID)
qCfg.StorageID = &storageID
set := exportertest.NewNopSettings(exportertest.NopType)
set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue")
host := hosttest.NewHost(map[component.ID]component.Component{
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/metrics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -174,7 +173,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) {
fgOrigWriteState := queue.PersistRequestContextOnWrite
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = configoptional.Some(storageID)
qCfg.StorageID = &storageID
set := exportertest.NewNopSettings(exportertest.NopType)
set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue")
host := hosttest.NewHost(map[component.ID]component.Component{
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/traces_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumertest"
Expand Down Expand Up @@ -172,7 +171,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) {
fgOrigWriteState := queue.PersistRequestContextOnWrite
qCfg := NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = configoptional.Some(storageID)
qCfg.StorageID = &storageID
set := exportertest.NewNopSettings(exportertest.NopType)
set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue")
host := hosttest.NewHost(map[component.ID]component.Component{
Expand Down
2 changes: 1 addition & 1 deletion exporter/exporterhelper/xexporterhelper/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ require (
github.com/stretchr/testify v1.10.0
go.opentelemetry.io/collector/component v1.35.0
go.opentelemetry.io/collector/component/componenttest v0.129.0
go.opentelemetry.io/collector/config/configoptional v0.129.0
go.opentelemetry.io/collector/consumer v1.35.0
go.opentelemetry.io/collector/consumer/consumererror v0.129.0
go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.129.0
Expand Down Expand Up @@ -46,6 +45,7 @@ require (
github.com/pmezard/go-difflib v1.0.0 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/collector/client v1.35.0 // indirect
go.opentelemetry.io/collector/config/configoptional v0.129.0 // indirect
go.opentelemetry.io/collector/config/configretry v1.35.0 // indirect
go.opentelemetry.io/collector/confmap v1.35.0 // indirect
go.opentelemetry.io/collector/extension v1.35.0 // indirect
Expand Down
3 changes: 1 addition & 2 deletions exporter/exporterhelper/xexporterhelper/profiles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componenttest"
"go.opentelemetry.io/collector/config/configoptional"
"go.opentelemetry.io/collector/consumer"
"go.opentelemetry.io/collector/consumer/consumererror"
"go.opentelemetry.io/collector/consumer/consumererror/xconsumererror"
Expand Down Expand Up @@ -172,7 +171,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) {
fgOrigWriteState := queue.PersistRequestContextOnWrite
qCfg := exporterhelper.NewDefaultQueueConfig()
storageID := component.MustNewIDWithName("file_storage", "storage")
qCfg.StorageID = configoptional.Some(storageID)
qCfg.StorageID = &storageID
set := exportertest.NewNopSettings(exportertest.NopType)
set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue")
host := hosttest.NewHost(map[component.ID]component.Component{
Expand Down
Loading