diff --git a/.chloggen/fix-todo-optional.yaml b/.chloggen/fix-todo-optional.yaml index 70b6e9f755c..77ddefb3f5e 100644 --- a/.chloggen/fix-todo-optional.yaml +++ b/.chloggen/fix-todo-optional.yaml @@ -7,7 +7,7 @@ change_type: breaking component: exporterhelper # A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). -note: Use configoptional for optional fields in exporterhelper +note: Use configoptional for sending_queue::batch field # One or more tracking issues or pull requests related to the change issues: [13345] diff --git a/exporter/exporterhelper/internal/base_exporter.go b/exporter/exporterhelper/internal/base_exporter.go index 1ee43265286..7631699a292 100644 --- a/exporter/exporterhelper/internal/base_exporter.go +++ b/exporter/exporterhelper/internal/base_exporter.go @@ -213,7 +213,7 @@ func WithQueueBatch(cfg queuebatch.Config, set QueueBatchSettings[request.Reques o.ExportFailureMessage += " Try enabling sending_queue to survive temporary failures." return nil } - if cfg.StorageID.HasValue() && set.Encoding == nil { + if cfg.StorageID != nil && set.Encoding == nil { return errors.New("`QueueBatchSettings.Encoding` must not be nil when persistent queue is enabled") } o.queueBatchSettings = set diff --git a/exporter/exporterhelper/internal/base_exporter_test.go b/exporter/exporterhelper/internal/base_exporter_test.go index a2eb7ef2051..f43216c990e 100644 --- a/exporter/exporterhelper/internal/base_exporter_test.go +++ b/exporter/exporterhelper/internal/base_exporter_test.go @@ -15,7 +15,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" @@ -53,7 +52,8 @@ func TestQueueOptionsWithRequestExporter(t *testing.T) { require.Error(t, err) qCfg := NewDefaultQueueConfig() - qCfg.StorageID = configoptional.Some(component.MustNewID("test")) + storageID := component.NewID(component.MustNewType("test")) + qCfg.StorageID = &storageID _, err = NewBaseExporter(exportertest.NewNopSettings(exportertest.NopType), pipeline.SignalMetrics, noopExport, WithQueueBatchSettings(newFakeQueueBatch()), WithRetry(configretry.NewDefaultBackOffConfig()), diff --git a/exporter/exporterhelper/internal/queue/persistent_queue.go b/exporter/exporterhelper/internal/queue/persistent_queue.go index 059c39fce80..cda2e1a7e87 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue.go @@ -102,7 +102,7 @@ func newPersistentQueue[T any](set Settings[T]) readableQueue[T] { activeSizer: set.activeSizer(), itemsSizer: set.ItemsSizer, bytesSizer: set.BytesSizer, - storageID: *set.StorageID.Get(), + storageID: *set.StorageID, id: set.ID, signal: set.Signal, blockOnOverflow: set.BlockOnOverflow, diff --git a/exporter/exporterhelper/internal/queue/persistent_queue_test.go b/exporter/exporterhelper/internal/queue/persistent_queue_test.go index 25e1ee1c54d..72bb81c53bd 100644 --- a/exporter/exporterhelper/internal/queue/persistent_queue_test.go +++ b/exporter/exporterhelper/internal/queue/persistent_queue_test.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/experr" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/hosttest" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" @@ -232,7 +231,8 @@ func newSettings(sizerType request.SizerType, capacity int64) Settings[int64] { func newSettingsWithStorage(sizerType request.SizerType, capacity int64) Settings[int64] { set := newSettings(sizerType, capacity) - set.StorageID = configoptional.Some(component.ID{}) + storageID := component.ID{} + set.StorageID = &storageID return set } @@ -513,7 +513,8 @@ func TestInvalidStorageExtensionType(t *testing.T) { } func TestPersistentQueue_StopAfterBadStart(t *testing.T) { - pq := newPersistentQueue[int64](Settings[int64]{StorageID: configoptional.Some(component.ID{})}) + storageID := component.ID{} + pq := newPersistentQueue[int64](Settings[int64]{StorageID: &storageID}) // verify that stopping a un-start/started w/error queue does not panic assert.NoError(t, pq.Shutdown(context.Background())) } diff --git a/exporter/exporterhelper/internal/queue/queue.go b/exporter/exporterhelper/internal/queue/queue.go index f286a95dd08..9583be9fb25 100644 --- a/exporter/exporterhelper/internal/queue/queue.go +++ b/exporter/exporterhelper/internal/queue/queue.go @@ -8,7 +8,6 @@ import ( "errors" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/pipeline" ) @@ -64,7 +63,7 @@ type Settings[T any] struct { WaitForResult bool BlockOnOverflow bool Signal pipeline.Signal - StorageID configoptional.Optional[component.ID] + StorageID *component.ID Encoding Encoding[T] ID component.ID Telemetry component.TelemetrySettings @@ -97,7 +96,7 @@ func NewQueue[T request.Request](set Settings[T], next ConsumeFunc[T]) (Queue[T] func newBaseQueue[T any](set Settings[T]) (readableQueue[T], error) { // Configure memory queue or persistent based on the config. - if !set.StorageID.HasValue() { + if set.StorageID == nil { return newMemoryQueue[T](set), nil } if set.ItemsSizer == nil { diff --git a/exporter/exporterhelper/internal/queuebatch/config.go b/exporter/exporterhelper/internal/queuebatch/config.go index 7caa75e0860..5f031c1652a 100644 --- a/exporter/exporterhelper/internal/queuebatch/config.go +++ b/exporter/exporterhelper/internal/queuebatch/config.go @@ -33,9 +33,10 @@ type Config struct { // If true, the component will wait for space; otherwise, operations will immediately return a retryable error. BlockOnOverflow bool `mapstructure:"block_on_overflow"` - // StorageID, if not empty, enables the persistent storage and uses the component specified + // StorageID if not empty, enables the persistent storage and uses the component specified // as a storage extension for the persistent queue. - StorageID configoptional.Optional[component.ID] `mapstructure:"storage"` + // TODO: This will be changed to Optional when available. + StorageID *component.ID `mapstructure:"storage"` // NumConsumers is the maximum number of concurrent consumers from the queue. // This applies across all different optional configurations from above (e.g. wait_for_result, blockOnOverflow, persistent, etc.). @@ -74,7 +75,7 @@ func (cfg *Config) Validate() error { } // Only support request sizer for persistent queue at this moment. - if cfg.StorageID.HasValue() && cfg.WaitForResult { + if cfg.StorageID != nil && cfg.WaitForResult { return errors.New("`wait_for_result` is not supported with a persistent queue configured with `storage`") } diff --git a/exporter/exporterhelper/internal/queuebatch/config_test.go b/exporter/exporterhelper/internal/queuebatch/config_test.go index 62065cd7ed1..c3da0a1d7cb 100644 --- a/exporter/exporterhelper/internal/queuebatch/config_test.go +++ b/exporter/exporterhelper/internal/queuebatch/config_test.go @@ -11,7 +11,6 @@ import ( "github.com/stretchr/testify/require" "go.opentelemetry.io/collector/component" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" ) @@ -30,9 +29,10 @@ func TestConfig_Validate(t *testing.T) { cfg.QueueSize = 0 require.EqualError(t, cfg.Validate(), "`queue_size` must be positive") + storageID := component.MustNewID("test") cfg = newTestConfig() cfg.WaitForResult = true - cfg.StorageID = configoptional.Some(component.MustNewID("test")) + cfg.StorageID = &storageID require.EqualError(t, cfg.Validate(), "`wait_for_result` is not supported with a persistent queue configured with `storage`") cfg = newTestConfig() diff --git a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go index 8bef8d251eb..fb962d6cf40 100644 --- a/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go +++ b/exporter/exporterhelper/internal/queuebatch/queue_batch_test.go @@ -151,7 +151,7 @@ func TestQueueBatchDifferentSizers(t *testing.T) { func TestQueueBatchPersistenceEnabled(t *testing.T) { cfg := newTestConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = configoptional.Some(storageID) + cfg.StorageID = &storageID qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]()) require.NoError(t, err) @@ -168,7 +168,7 @@ func TestQueueBatchPersistenceEnabledStorageError(t *testing.T) { storageError := errors.New("could not get storage client") cfg := newTestConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = configoptional.Some(storageID) + cfg.StorageID = &storageID qb, err := NewQueueBatch(newFakeRequestSettings(), cfg, sendertest.NewNopSenderFunc[request.Request]()) require.NoError(t, err) @@ -184,7 +184,7 @@ func TestQueueBatchPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { cfg := newTestConfig() cfg.NumConsumers = 1 storageID := component.MustNewIDWithName("file_storage", "storage") - cfg.StorageID = configoptional.Some(storageID) + cfg.StorageID = &storageID mockReq := &requesttest.FakeRequest{Items: 2} qSet := newFakeRequestSettings() diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index eb89505c898..867b6664916 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -174,7 +173,7 @@ func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = configoptional.Some(storageID) + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 9c62ea51f90..81e98ad84b1 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -174,7 +173,7 @@ func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = configoptional.Some(storageID) + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_metrics", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 84ea4721af1..59e622dc9ae 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -22,7 +22,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumertest" @@ -172,7 +171,7 @@ func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = configoptional.Some(storageID) + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{ diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index c85b5d4d676..b80967c84c5 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -6,7 +6,6 @@ require ( github.com/stretchr/testify v1.10.0 go.opentelemetry.io/collector/component v1.35.0 go.opentelemetry.io/collector/component/componenttest v0.129.0 - go.opentelemetry.io/collector/config/configoptional v0.129.0 go.opentelemetry.io/collector/consumer v1.35.0 go.opentelemetry.io/collector/consumer/consumererror v0.129.0 go.opentelemetry.io/collector/consumer/consumererror/xconsumererror v0.129.0 @@ -46,6 +45,7 @@ require ( github.com/pmezard/go-difflib v1.0.0 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect go.opentelemetry.io/collector/client v1.35.0 // indirect + go.opentelemetry.io/collector/config/configoptional v0.129.0 // indirect go.opentelemetry.io/collector/config/configretry v1.35.0 // indirect go.opentelemetry.io/collector/confmap v1.35.0 // indirect go.opentelemetry.io/collector/extension v1.35.0 // indirect diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index 17cdb9b281e..c19e102d152 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -20,7 +20,6 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componenttest" - "go.opentelemetry.io/collector/config/configoptional" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/consumer/consumererror/xconsumererror" @@ -172,7 +171,7 @@ func TestProfiles_WithPersistentQueue(t *testing.T) { fgOrigWriteState := queue.PersistRequestContextOnWrite qCfg := exporterhelper.NewDefaultQueueConfig() storageID := component.MustNewIDWithName("file_storage", "storage") - qCfg.StorageID = configoptional.Some(storageID) + qCfg.StorageID = &storageID set := exportertest.NewNopSettings(exportertest.NopType) set.ID = component.MustNewIDWithName("test_logs", "with_persistent_queue") host := hosttest.NewHost(map[component.ID]component.Component{