diff --git a/exporter/exporterhelper/common.go b/exporter/exporterhelper/common.go index f1080bdee04..509570960e8 100644 --- a/exporter/exporterhelper/common.go +++ b/exporter/exporterhelper/common.go @@ -85,7 +85,7 @@ func WithRetry(config configretry.BackOffConfig) Option { // This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. func WithQueue(config QueueSettings) Option { return func(o *baseExporter) { - if o.requestExporter { + if o.marshaler == nil || o.unmarshaler == nil { panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead") } if !config.Enabled { @@ -114,6 +114,9 @@ func WithQueue(config QueueSettings) Option { // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option { return func(o *baseExporter) { + if o.marshaler != nil || o.unmarshaler != nil { + panic("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead") + } if !cfg.Enabled { o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures." return @@ -135,15 +138,30 @@ func WithCapabilities(capabilities consumer.Capabilities) Option { } } +// withMarshaler is used to set the request marshaler for the new exporter helper. +// It must be provided as the first option when creating a new exporter helper. +func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option { + return func(o *baseExporter) { + o.marshaler = marshaler + } +} + +// withUnmarshaler is used to set the request unmarshaler for the new exporter helper. +// It must be provided as the first option when creating a new exporter helper. +func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option { + return func(o *baseExporter) { + o.unmarshaler = unmarshaler + } +} + // baseExporter contains common fields between different exporter types. type baseExporter struct { component.StartFunc component.ShutdownFunc - requestExporter bool - marshaler exporterqueue.Marshaler[Request] - unmarshaler exporterqueue.Unmarshaler[Request] - signal component.DataType + marshaler exporterqueue.Marshaler[Request] + unmarshaler exporterqueue.Unmarshaler[Request] + signal component.DataType set exporter.CreateSettings obsrep *ObsReport @@ -162,20 +180,14 @@ type baseExporter struct { consumerOptions []consumer.Option } -// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones. -func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool, - marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { - +func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) { obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set}) if err != nil { return nil, err } be := &baseExporter{ - requestExporter: requestExporter, - marshaler: marshaler, - unmarshaler: unmarshaler, - signal: signal, + signal: signal, queueSender: &baseRequestSender{}, obsrepSender: osf(obsReport), diff --git a/exporter/exporterhelper/common_test.go b/exporter/exporterhelper/common_test.go index 8bd7f9fc47b..e3a584b17b1 100644 --- a/exporter/exporterhelper/common_test.go +++ b/exporter/exporterhelper/common_test.go @@ -18,6 +18,7 @@ import ( "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/config/configretry" "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterqueue" "go.opentelemetry.io/collector/exporter/exportertest" ) @@ -36,11 +37,7 @@ func newNoopObsrepSender(*ObsReport) requestSender { } func TestBaseExporter(t *testing.T) { - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender) - require.NoError(t, err) - require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) - require.NoError(t, be.Shutdown(context.Background())) - be, err = newBaseExporter(defaultSettings, defaultType, true, nil, nil, newNoopObsrepSender) + be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) require.NoError(t, be.Shutdown(context.Background())) @@ -49,7 +46,7 @@ func TestBaseExporter(t *testing.T) { func TestBaseExporterWithOptions(t *testing.T) { want := errors.New("my error") be, err := newBaseExporter( - defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender, + defaultSettings, defaultType, newNoopObsrepSender, WithStart(func(context.Context, component.Host) error { return want }), WithShutdown(func(context.Context) error { return want }), WithTimeout(NewDefaultTimeoutSettings()), @@ -68,15 +65,22 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) { } } -func TestQueueRetryOptionsWithRequestExporter(t *testing.T) { - bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender, +func TestQueueOptionsWithRequestExporter(t *testing.T) { + bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig())) require.Nil(t, err) - require.True(t, bs.requestExporter) + require.Nil(t, bs.marshaler) + require.Nil(t, bs.unmarshaler) require.Panics(t, func() { - _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender, + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings())) }) + require.Panics(t, func() { + _, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(configretry.NewDefaultBackOffConfig()), + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) + }) } func TestBaseExporterLogging(t *testing.T) { @@ -85,9 +89,8 @@ func TestBaseExporterLogging(t *testing.T) { set.Logger = zap.New(logger) rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - bs, err := newBaseExporter(set, defaultType, true, nil, nil, newNoopObsrepSender, WithRetry(rCfg)) + bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg)) require.Nil(t, err) - require.True(t, bs.requestExporter) sendErr := bs.send(context.Background(), newErrorRequest()) require.Error(t, sendErr) diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 466b55ed9bb..ce70230e321 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -70,7 +70,7 @@ type logsExporter struct { // NewLogsExporter creates an exporter.Logs that records observability metrics and wraps every request with a Span. func NewLogsExporter( - _ context.Context, + ctx context.Context, set exporter.CreateSettings, cfg component.Config, pusher consumer.ConsumeLogsFunc, @@ -79,34 +79,11 @@ func NewLogsExporter( if cfg == nil { return nil, errNilConfig } - - if set.Logger == nil { - return nil, errNilLogger - } - if pusher == nil { return nil, errNilPushLogsData } - - be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler, - newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...) - if err != nil { - return nil, err - } - - lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error { - req := newLogsRequest(ld, pusher) - serr := be.send(ctx, req) - if errors.Is(serr, queue.ErrQueueIsFull) { - be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount())) - } - return serr - }, be.consumerOptions...) - - return &logsExporter{ - baseExporter: be, - Logs: lc, - }, err + logsOpts := []Option{withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher))} + return NewLogsRequestExporter(ctx, set, requestFromLogs(pusher), append(logsOpts, options...)...) } // RequestFromLogsFunc converts plog.Logs data into a user-defined request. @@ -114,6 +91,13 @@ func NewLogsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestFromLogsFunc func(context.Context, plog.Logs) (Request, error) +// requestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request. +func requestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc { + return func(_ context.Context, ld plog.Logs) (Request, error) { + return newLogsRequest(ld, pusher), nil + } +} + // NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. @@ -131,7 +115,7 @@ func NewLogsRequestExporter( return nil, errNilLogsConverter } - be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...) + be, err := newBaseExporter(set, component.DataTypeLogs, newLogsExporterWithObservability, options...) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index bdb284bb11a..ab7d65dae65 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -70,7 +70,7 @@ type metricsExporter struct { // NewMetricsExporter creates an exporter.Metrics that records observability metrics and wraps every request with a Span. func NewMetricsExporter( - _ context.Context, + ctx context.Context, set exporter.CreateSettings, cfg component.Config, pusher consumer.ConsumeMetricsFunc, @@ -79,34 +79,11 @@ func NewMetricsExporter( if cfg == nil { return nil, errNilConfig } - - if set.Logger == nil { - return nil, errNilLogger - } - if pusher == nil { return nil, errNilPushMetricsData } - - be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler, - newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...) - if err != nil { - return nil, err - } - - mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error { - req := newMetricsRequest(md, pusher) - serr := be.send(ctx, req) - if errors.Is(serr, queue.ErrQueueIsFull) { - be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount())) - } - return serr - }, be.consumerOptions...) - - return &metricsExporter{ - baseExporter: be, - Metrics: mc, - }, err + metricsOpts := []Option{withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher))} + return NewMetricsRequestExporter(ctx, set, requestFromMetrics(pusher), append(metricsOpts, options...)...) } // RequestFromMetricsFunc converts pdata.Metrics into a user-defined request. @@ -114,6 +91,13 @@ func NewMetricsExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestFromMetricsFunc func(context.Context, pmetric.Metrics) (Request, error) +// requestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request. +func requestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFunc { + return func(_ context.Context, md pmetric.Metrics) (Request, error) { + return newMetricsRequest(md, pusher), nil + } +} + // NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. @@ -131,7 +115,7 @@ func NewMetricsRequestExporter( return nil, errNilMetricsConverter } - be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...) + be, err := newBaseExporter(set, component.DataTypeMetrics, newMetricsSenderWithObservability, options...) if err != nil { return nil, err } diff --git a/exporter/exporterhelper/queue_sender_test.go b/exporter/exporterhelper/queue_sender_test.go index 598b63a3f9d..a2616152ea6 100644 --- a/exporter/exporterhelper/queue_sender_test.go +++ b/exporter/exporterhelper/queue_sender_test.go @@ -27,7 +27,9 @@ func TestQueuedRetry_StopWhileWaiting(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -59,7 +61,9 @@ func TestQueuedRetry_DoNotPreserveCancellation(t *testing.T) { qCfg := NewDefaultQueueSettings() qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -89,7 +93,9 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, defaultType, false, nil, nil, newNoopObsrepSender, WithQueue(qCfg)) + be, err := newBaseExporter(set, defaultType, newNoopObsrepSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -103,40 +109,54 @@ func TestQueuedRetry_RejectOnFull(t *testing.T) { func TestQueuedRetryHappyPath(t *testing.T) { tests := []struct { - name string - queueOption Option + name string + queueOptions []Option }{ { name: "WithQueue", - queueOption: WithQueue(QueueSettings{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }), + queueOptions: []Option{ + withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithQueue(QueueSettings{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }), + WithRetry(configretry.NewDefaultBackOffConfig()), + }, }, { name: "WithRequestQueue/MemoryQueueFactory", - queueOption: WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewMemoryQueueFactory[Request]()), + queueOptions: []Option{ + WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewMemoryQueueFactory[Request]()), + WithRetry(configretry.NewDefaultBackOffConfig()), + }, }, { name: "WithRequestQueue/PersistentQueueFactory", - queueOption: WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + queueOptions: []Option{ + WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + WithRetry(configretry.NewDefaultBackOffConfig()), + }, }, { name: "WithRequestQueue/PersistentQueueFactory/RequestsLimit", - queueOption: WithRequestQueue(exporterqueue.Config{ - Enabled: true, - QueueSize: 10, - NumConsumers: 1, - }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + queueOptions: []Option{ + WithRequestQueue(exporterqueue.Config{ + Enabled: true, + QueueSize: 10, + NumConsumers: 1, + }, exporterqueue.NewPersistentQueueFactory[Request](nil, exporterqueue.PersistentQueueSettings[Request]{})), + WithRetry(configretry.NewDefaultBackOffConfig()), + }, }, } for _, tt := range tests { @@ -145,9 +165,8 @@ func TestQueuedRetryHappyPath(t *testing.T) { require.NoError(t, err) t.Cleanup(func() { require.NoError(t, tel.Shutdown(context.Background())) }) - rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tel.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), tt.queueOption) + be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, tt.queueOptions...) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -190,7 +209,9 @@ func TestQueuedRetry_QueueMetricsReported(t *testing.T) { qCfg.NumConsumers = 0 // to make every request go straight to the queue rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -239,24 +260,30 @@ func TestQueueSettings_Validate(t *testing.T) { func TestQueueRetryWithDisabledQueue(t *testing.T) { tests := []struct { - name string - queueOption Option + name string + queueOptions []Option }{ { name: "WithQueue", - queueOption: func() Option { - qs := NewDefaultQueueSettings() - qs.Enabled = false - return WithQueue(qs) - }(), + queueOptions: []Option{ + withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + func() Option { + qs := NewDefaultQueueSettings() + qs.Enabled = false + return WithQueue(qs) + }(), + }, }, { name: "WithRequestQueue", - queueOption: func() Option { - qs := exporterqueue.NewDefaultConfig() - qs.Enabled = false - return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[Request]()) - }(), + queueOptions: []Option{ + func() Option { + qs := exporterqueue.NewDefaultConfig() + qs.Enabled = false + return WithRequestQueue(qs, exporterqueue.NewMemoryQueueFactory[Request]()) + }(), + }, }, } @@ -265,8 +292,7 @@ func TestQueueRetryWithDisabledQueue(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, - tt.queueOption) + be, err := newBaseExporter(set, component.DataTypeLogs, newObservabilityConsumerSender, tt.queueOptions...) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -290,7 +316,8 @@ func TestQueueFailedRequestDropped(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newNoopObsrepSender, WithQueue(NewDefaultQueueSettings())) + be, err := newBaseExporter(set, component.DataTypeLogs, newNoopObsrepSender, + WithRequestQueue(exporterqueue.NewDefaultConfig(), exporterqueue.NewMemoryQueueFactory[Request]())) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) mockR := newMockRequest(2, errors.New("some error")) @@ -311,7 +338,9 @@ func TestQueuedRetryPersistenceEnabled(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -335,7 +364,8 @@ func TestQueuedRetryPersistenceEnabledStorageError(t *testing.T) { qCfg.StorageID = &storageID // enable persistence rCfg := configretry.NewDefaultBackOffConfig() set := exporter.CreateSettings{ID: defaultID, TelemetrySettings: tt.TelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()} - be, err := newBaseExporter(set, defaultType, false, mockRequestMarshaler, mockRequestUnmarshaler(&mockRequest{}), newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(set, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -358,8 +388,8 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { rCfg.MaxElapsedTime = 0 // retry infinitely so shutdown can be triggered mockReq := newErrorRequest() - be, err := newBaseExporter(defaultSettings, defaultType, false, mockRequestMarshaler, mockRequestUnmarshaler(mockReq), - newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(mockReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) var extensions = map[component.ID]component.Component{ @@ -382,8 +412,8 @@ func TestQueuedRetryPersistentEnabled_NoDataLossOnShutdown(t *testing.T) { // start the exporter again replacing the preserved mockRequest in the unmarshaler with a new one that doesn't fail. replacedReq := newMockRequest(1, nil) - be, err = newBaseExporter(defaultSettings, defaultType, false, mockRequestMarshaler, mockRequestUnmarshaler(replacedReq), - newNoopObsrepSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err = newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender, withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(replacedReq)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), host)) t.Cleanup(func() { require.NoError(t, be.Shutdown(context.Background())) }) diff --git a/exporter/exporterhelper/retry_sender_test.go b/exporter/exporterhelper/retry_sender_test.go index 74e8c2a3ab9..96b4904f372 100644 --- a/exporter/exporterhelper/retry_sender_test.go +++ b/exporter/exporterhelper/retry_sender_test.go @@ -39,7 +39,8 @@ func TestQueuedRetry_DropOnPermanentError(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() mockR := newMockRequest(2, consumererror.NewPermanent(errors.New("bad data"))) - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(mockR)), WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -62,9 +63,9 @@ func TestQueuedRetry_DropOnNoRetry(t *testing.T) { qCfg := NewDefaultQueueSettings() rCfg := configretry.NewDefaultBackOffConfig() rCfg.Enabled = false - be, err := newBaseExporter(defaultSettings, defaultType, false, mockRequestMarshaler, - mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error"))), - newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, withMarshaler(mockRequestMarshaler), + withUnmarshaler(mockRequestUnmarshaler(newMockRequest(2, errors.New("transient error")))), + WithQueue(qCfg), WithRetry(rCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -89,7 +90,9 @@ func TestQueuedRetry_OnError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) t.Cleanup(func() { @@ -117,7 +120,9 @@ func TestQueuedRetry_MaxElapsedTime(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = time.Millisecond rCfg.MaxElapsedTime = 100 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -163,7 +168,9 @@ func TestQueuedRetry_ThrottleError(t *testing.T) { qCfg.NumConsumers = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 10 * time.Millisecond - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -195,7 +202,9 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { qCfg.QueueSize = 1 rCfg := configretry.NewDefaultBackOffConfig() rCfg.InitialInterval = 0 - be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg), WithQueue(qCfg)) + be, err := newBaseExporter(defaultSettings, defaultType, newObservabilityConsumerSender, + withMarshaler(mockRequestMarshaler), withUnmarshaler(mockRequestUnmarshaler(&mockRequest{})), + WithRetry(rCfg), WithQueue(qCfg)) require.NoError(t, err) ocs := be.obsrepSender.(*observabilityConsumerSender) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) @@ -220,7 +229,7 @@ func TestQueuedRetry_RetryOnError(t *testing.T) { func TestQueueRetryWithNoQueue(t *testing.T) { rCfg := configretry.NewDefaultBackOffConfig() rCfg.MaxElapsedTime = time.Nanosecond // fail fast - be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) + be, err := newBaseExporter(exportertest.NewNopCreateSettings(), component.DataTypeLogs, newObservabilityConsumerSender, WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) @@ -241,7 +250,7 @@ func TestQueueRetryWithDisabledRetires(t *testing.T) { set := exportertest.NewNopCreateSettings() logger, observed := observer.New(zap.ErrorLevel) set.Logger = zap.New(logger) - be, err := newBaseExporter(set, component.DataTypeLogs, false, nil, nil, newObservabilityConsumerSender, WithRetry(rCfg)) + be, err := newBaseExporter(set, component.DataTypeLogs, newObservabilityConsumerSender, WithRetry(rCfg)) require.NoError(t, err) require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost())) ocs := be.obsrepSender.(*observabilityConsumerSender) diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 806ea2d485c..778c0d63989 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -70,7 +70,7 @@ type traceExporter struct { // NewTracesExporter creates an exporter.Traces that records observability metrics and wraps every request with a Span. func NewTracesExporter( - _ context.Context, + ctx context.Context, set exporter.CreateSettings, cfg component.Config, pusher consumer.ConsumeTracesFunc, @@ -79,34 +79,11 @@ func NewTracesExporter( if cfg == nil { return nil, errNilConfig } - - if set.Logger == nil { - return nil, errNilLogger - } - if pusher == nil { return nil, errNilPushTraceData } - - be, err := newBaseExporter(set, component.DataTypeTraces, false, tracesRequestMarshaler, - newTraceRequestUnmarshalerFunc(pusher), newTracesExporterWithObservability, options...) - if err != nil { - return nil, err - } - - tc, err := consumer.NewTraces(func(ctx context.Context, td ptrace.Traces) error { - req := newTracesRequest(td, pusher) - serr := be.send(ctx, req) - if errors.Is(serr, queue.ErrQueueIsFull) { - be.obsrep.recordEnqueueFailure(ctx, component.DataTypeTraces, int64(req.ItemsCount())) - } - return serr - }, be.consumerOptions...) - - return &traceExporter{ - baseExporter: be, - Traces: tc, - }, err + tracesOpts := []Option{withMarshaler(tracesRequestMarshaler), withUnmarshaler(newTraceRequestUnmarshalerFunc(pusher))} + return NewTracesRequestExporter(ctx, set, requestFromTraces(pusher), append(tracesOpts, options...)...) } // RequestFromTracesFunc converts ptrace.Traces into a user-defined Request. @@ -114,6 +91,13 @@ func NewTracesExporter( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestFromTracesFunc func(context.Context, ptrace.Traces) (Request, error) +// requestFromTraces returns a RequestFromTracesFunc that converts ptrace.Traces into a Request. +func requestFromTraces(pusher consumer.ConsumeTracesFunc) RequestFromTracesFunc { + return func(_ context.Context, traces ptrace.Traces) (Request, error) { + return newTracesRequest(traces, pusher), nil + } +} + // NewTracesRequestExporter creates a new traces exporter based on a custom TracesConverter and RequestSender. // This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. @@ -131,7 +115,7 @@ func NewTracesRequestExporter( return nil, errNilTracesConverter } - be, err := newBaseExporter(set, component.DataTypeTraces, true, nil, nil, newTracesExporterWithObservability, options...) + be, err := newBaseExporter(set, component.DataTypeTraces, newTracesExporterWithObservability, options...) if err != nil { return nil, err }