From 31cdb5b513c7147106f469698533dc1c88a3f4de Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 9 Sep 2025 18:40:17 +0200 Subject: [PATCH 01/11] [exporterhelper] Move NewLogsRequest to exporterhelper/internal --- exporter/exporterhelper/internal/constants.go | 27 +++++ .../exporterhelper/internal/new_request.go | 69 +++++++++++ .../internal/new_request_test.go | 108 ++++++++++++++++++ .../internal/request/request.go | 8 ++ exporter/exporterhelper/logs.go | 58 +--------- exporter/exporterhelper/logs_test.go | 94 +-------------- exporter/exporterhelper/request.go | 7 +- 7 files changed, 219 insertions(+), 152 deletions(-) create mode 100644 exporter/exporterhelper/internal/constants.go create mode 100644 exporter/exporterhelper/internal/new_request.go create mode 100644 exporter/exporterhelper/internal/new_request_test.go diff --git a/exporter/exporterhelper/internal/constants.go b/exporter/exporterhelper/internal/constants.go new file mode 100644 index 000000000000..763c2dc5547b --- /dev/null +++ b/exporter/exporterhelper/internal/constants.go @@ -0,0 +1,27 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import "errors" + +var ( + // errNilConfig is returned when an empty name is given. + errNilConfig = errors.New("nil config") + // errNilLogger is returned when a logger is nil + errNilLogger = errors.New("nil logger") + // errNilConsumeRequest is returned when a nil PushTraces is given. + errNilConsumeRequest = errors.New("nil RequestConsumeFunc") + // errNilPushTraces is returned when a nil PushTraces is given. + errNilPushTraces = errors.New("nil PushTraces") + // errNilPushMetrics is returned when a nil PushMetrics is given. + errNilPushMetrics = errors.New("nil PushMetrics") + // errNilPushLogs is returned when a nil PushLogs is given. + errNilPushLogs = errors.New("nil PushLogs") + // errNilTracesConverter is returned when a nil RequestFromTracesFunc is given. + errNilTracesConverter = errors.New("nil RequestFromTracesFunc") + // errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given. + errNilMetricsConverter = errors.New("nil RequestFromMetricsFunc") + // errNilLogsConverter is returned when a nil RequestFromLogsFunc is given. + errNilLogsConverter = errors.New("nil RequestFromLogsFunc") +) diff --git a/exporter/exporterhelper/internal/new_request.go b/exporter/exporterhelper/internal/new_request.go new file mode 100644 index 000000000000..ad58e35e1c3e --- /dev/null +++ b/exporter/exporterhelper/internal/new_request.go @@ -0,0 +1,69 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "context" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pipeline" + "go.uber.org/zap" +) + +type logsExporter struct { + *BaseExporter + consumer.Logs +} + +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewLogsRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[plog.Logs], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalLogs, pusher, options...) + if err != nil { + return nil, err + } + + lc, err := consumer.NewLogs(newConsumeLogs(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &logsExporter{BaseExporter: be, Logs: lc}, nil +} + +func newConsumeLogs(converter request.RequestConverterFunc[plog.Logs], be *BaseExporter, logger *zap.Logger) consumer.ConsumeLogsFunc { + return func(ctx context.Context, ld plog.Logs) error { + req, err := converter(ctx, ld) + if err != nil { + logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} diff --git a/exporter/exporterhelper/internal/new_request_test.go b/exporter/exporterhelper/internal/new_request_test.go new file mode 100644 index 000000000000..bc599707c9f1 --- /dev/null +++ b/exporter/exporterhelper/internal/new_request_test.go @@ -0,0 +1,108 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" +) + +func TestLogsRequest_NilLogger(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + +func TestLogsRequest_NilLogsConverter(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + +func TestLogsRequest_NilPushLogsData(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromLogsFunc(nil), nil) + require.Nil(t, le) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestLogsRequest_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + +func TestLogsRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + +func TestLogsRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.NoError(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestLogsRequest_Default_ConvertError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequest_Default_ExportError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("export_error") + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, want, le.Shutdown(context.Background())) +} diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go index c0a44082d598..13729f91fb76 100644 --- a/exporter/exporterhelper/internal/request/request.go +++ b/exporter/exporterhelper/internal/request/request.go @@ -5,6 +5,8 @@ package request // import "go.opentelemetry.io/collector/exporter/exporterhelper import ( "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" ) // Request represents a single request that can be sent to an external endpoint. @@ -41,3 +43,9 @@ type ErrorHandler interface { // Otherwise, it should return the original Request. OnError(error) Request } + +type RequestConverterFunc[T any] func(context.Context, T) (Request, error) + +// RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, +// and accessing it is considered undefined behavior. +type RequestConsumeFunc = sender.SendFunc[Request] diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index cdac6a5e1f87..f6d67fe8eaf8 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -131,11 +128,6 @@ func (req *logsRequest) BytesSize() int { return logsMarshaler.LogsSize(req.ld) } -type logsExporter struct { - *internal.BaseExporter - consumer.Logs -} - // NewLogs creates an exporter.Logs that records observability logs and wraps every request with a Span. func NewLogs( ctx context.Context, @@ -150,7 +142,7 @@ func NewLogs( if pusher == nil { return nil, errNilPushLogs } - return NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher), + return internal.NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher), append([]Option{internal.WithQueueBatchSettings(NewLogsQueueBatchSettings())}, options...)...) } @@ -167,51 +159,3 @@ func requestFromLogs() RequestConverterFunc[plog.Logs] { return newLogsRequest(ld), nil } } - -// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewLogsRequest( - _ context.Context, - set exporter.Settings, - converter RequestConverterFunc[plog.Logs], - pusher RequestConsumeFunc, - options ...Option, -) (exporter.Logs, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilLogsConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalLogs, pusher, options...) - if err != nil { - return nil, err - } - - lc, err := consumer.NewLogs(newConsumeLogs(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err - } - - return &logsExporter{BaseExporter: be, Logs: lc}, nil -} - -func newConsumeLogs(converter RequestConverterFunc[plog.Logs], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeLogsFunc { - return func(ctx context.Context, ld plog.Logs) error { - req, err := converter(ctx, ld) - if err != nil { - logger.Error("Failed to convert logs. Dropping data.", - zap.Int("dropped_log_records", ld.LogRecordCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) - } -} diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 867b66649162..d32ec05e06ce 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -71,30 +71,12 @@ func TestLogs_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestLogsRequest_NilLogger(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, le) - require.Equal(t, errNilLogger, err) -} - func TestLogs_NilPushLogsData(t *testing.T) { le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogs, err) } -func TestLogsRequest_NilLogsConverter(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, le) - require.Equal(t, errNilLogsConverter, err) -} - -func TestLogsRequest_NilPushLogsData(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromLogsFunc(nil), nil) - require.Nil(t, le) - require.Equal(t, errNilConsumeRequest, err) -} - func TestLogs_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, newPushLogsData(nil)) @@ -107,19 +89,6 @@ func TestLogs_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } -func TestLogsRequest_Default(t *testing.T) { - ld := plog.NewLogs() - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - assert.NotNil(t, le) - require.NoError(t, err) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) - assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) - assert.NoError(t, le.Shutdown(context.Background())) -} - func TestLogs_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -129,16 +98,6 @@ func TestLogs_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } -func TestLogsRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - require.NoError(t, err) - require.NotNil(t, le) - - assert.Equal(t, capabilities, le.Capabilities()) -} - func TestLogs_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -148,26 +107,6 @@ func TestLogs_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsRequest_Default_ConvertError(t *testing.T) { - ld := plog.NewLogs() - want := errors.New("convert_error") - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, le) - require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) -} - -func TestLogsRequest_Default_ExportError(t *testing.T) { - ld := plog.NewLogs() - want := errors.New("export_error") - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, le) - require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) -} - func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite @@ -279,7 +218,7 @@ func TestLogsRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsRequest(context.Background(), + le, err := internal.NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -305,7 +244,7 @@ func TestLogsRequest_WithRecordMetrics_ExportError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + le, err := internal.NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) @@ -333,7 +272,7 @@ func TestLogsRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - le, err := NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) + le, err := internal.NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForLogs(t, sr, set.TracerProvider.Tracer("test"), le, nil) @@ -361,7 +300,7 @@ func TestLogsRequest_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - le, err := NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + le, err := internal.NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForLogs(t, sr, set.TracerProvider.Tracer("test"), le, want) @@ -379,19 +318,6 @@ func TestLogs_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestLogsRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, le) - assert.NoError(t, err) - - assert.NoError(t, le.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestLogs_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -403,18 +329,6 @@ func TestLogs_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, le.Shutdown(context.Background())) } -func TestLogsRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, le) - require.NoError(t, err) - - assert.Equal(t, want, le.Shutdown(context.Background())) -} - func newPushLogsDataModifiedDownstream(retError error) consumer.ConsumeLogsFunc { return func(_ context.Context, log plog.Logs) error { log.ResourceLogs().MoveAndAppendTo(plog.NewResourceLogsSlice()) diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index a0064f40cc2c..614dd1b0e2e0 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -4,10 +4,7 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( - "context" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" ) // Request represents a single request that can be sent to an external endpoint. @@ -26,11 +23,11 @@ type RequestErrorHandler = request.ErrorHandler // RequestConverterFunc converts pdata telemetry into a user-defined Request. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type RequestConverterFunc[T any] func(context.Context, T) (Request, error) +type RequestConverterFunc[T any] = request.RequestConverterFunc[T] // RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, // and accessing it is considered undefined behavior. -type RequestConsumeFunc = sender.SendFunc[Request] +type RequestConsumeFunc = request.RequestConsumeFunc // RequestSizer is an interface that returns the size of the given request. type RequestSizer = request.Sizer[Request] From 46e785aa3f1087b662b36bc5ec27e0d689e2812c Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 9 Sep 2025 18:54:09 +0200 Subject: [PATCH 02/11] [exporterhelper] Move NewTracesRequest to exporterhelper/internal --- .../exporterhelper/internal/new_request.go | 54 +++++++++++ .../internal/new_request_test.go | 87 +++++++++++++++++ exporter/exporterhelper/traces.go | 56 +---------- exporter/exporterhelper/traces_test.go | 95 +------------------ 4 files changed, 146 insertions(+), 146 deletions(-) diff --git a/exporter/exporterhelper/internal/new_request.go b/exporter/exporterhelper/internal/new_request.go index ad58e35e1c3e..fcf51dc26218 100644 --- a/exporter/exporterhelper/internal/new_request.go +++ b/exporter/exporterhelper/internal/new_request.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" ) @@ -67,3 +68,56 @@ func newConsumeLogs(converter request.RequestConverterFunc[plog.Logs], be *BaseE return be.Send(ctx, req) } } + +type tracesExporter struct { + *BaseExporter + consumer.Traces +} + +// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewTracesRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[ptrace.Traces], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalTraces, pusher, options...) + if err != nil { + return nil, err + } + + tc, err := consumer.NewTraces(newConsumeTraces(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &tracesExporter{BaseExporter: be, Traces: tc}, nil +} + +func newConsumeTraces(converter request.RequestConverterFunc[ptrace.Traces], be *BaseExporter, logger *zap.Logger) consumer.ConsumeTracesFunc { + return func(ctx context.Context, td ptrace.Traces) error { + req, err := converter(ctx, td) + if err != nil { + logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} diff --git a/exporter/exporterhelper/internal/new_request_test.go b/exporter/exporterhelper/internal/new_request_test.go index bc599707c9f1..01365d5dcaac 100644 --- a/exporter/exporterhelper/internal/new_request_test.go +++ b/exporter/exporterhelper/internal/new_request_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/ptrace" ) func TestLogsRequest_NilLogger(t *testing.T) { @@ -106,3 +107,89 @@ func TestLogsRequest_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, le.Shutdown(context.Background())) } + +func TestTracesRequest_NilLogger(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + +func TestTracesRequest_NilTracesConverter(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + +func TestTracesRequest_NilPushTraceData(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromTracesFunc(nil), nil) + require.Nil(t, te) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestTracesRequest_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + +func TestTracesRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + +func TestTracesRequest_Default_ConvertError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequest_Default_ExportError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("export_error") + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestTracesRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, want, te.Shutdown(context.Background())) +} diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 96e87d6ff321..40185c1f2024 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -130,10 +127,6 @@ func (req *tracesRequest) BytesSize() int { return tracesMarshaler.TracesSize(req.td) } -type tracesExporter struct { - *internal.BaseExporter - consumer.Traces -} // NewTraces creates an exporter.Traces that records observability metrics and wraps every request with a Span. func NewTraces( @@ -149,7 +142,7 @@ func NewTraces( if pusher == nil { return nil, errNilPushTraces } - return NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher), + return internal.NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher), append([]Option{internal.WithQueueBatchSettings(NewTracesQueueBatchSettings())}, options...)...) } @@ -167,50 +160,3 @@ func requestFromTraces() RequestConverterFunc[ptrace.Traces] { } } -// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewTracesRequest( - _ context.Context, - set exporter.Settings, - converter RequestConverterFunc[ptrace.Traces], - pusher RequestConsumeFunc, - options ...Option, -) (exporter.Traces, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilTracesConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalTraces, pusher, options...) - if err != nil { - return nil, err - } - - tc, err := consumer.NewTraces(newConsumeTraces(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err - } - - return &tracesExporter{BaseExporter: be, Traces: tc}, nil -} - -func newConsumeTraces(converter RequestConverterFunc[ptrace.Traces], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeTracesFunc { - return func(ctx context.Context, td ptrace.Traces) error { - req, err := converter(ctx, td) - if err != nil { - logger.Error("Failed to convert traces. Dropping data.", - zap.Int("dropped_spans", td.SpanCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) - } -} diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 59e622dc9aeb..000ad1579a50 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -67,30 +67,12 @@ func TestTraces_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestTracesRequest_NilLogger(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, te) - require.Equal(t, errNilLogger, err) -} - func TestTraces_NilPushTraceData(t *testing.T) { te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraces, err) } -func TestTracesRequest_NilTracesConverter(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, te) - require.Equal(t, errNilTracesConverter, err) -} - -func TestTracesRequest_NilPushTraceData(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromTracesFunc(nil), nil) - require.Nil(t, te) - require.Equal(t, errNilConsumeRequest, err) -} - func TestTraces_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, newTraceDataPusher(nil)) @@ -103,19 +85,6 @@ func TestTraces_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } -func TestTracesRequest_Default(t *testing.T) { - td := ptrace.NewTraces() - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) - assert.NotNil(t, te) - require.NoError(t, err) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, te.ConsumeTraces(context.Background(), td)) - assert.NoError(t, te.Shutdown(context.Background())) -} - func TestTraces_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -125,16 +94,6 @@ func TestTraces_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } -func TestTracesRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - assert.NotNil(t, te) - require.NoError(t, err) - - assert.Equal(t, capabilities, te.Capabilities()) -} - func TestTraces_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -146,25 +105,6 @@ func TestTraces_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } -func TestTracesRequest_Default_ConvertError(t *testing.T) { - td := ptrace.NewTraces() - want := errors.New("convert_error") - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, te) - require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) -} - -func TestTracesRequest_Default_ExportError(t *testing.T) { - td := ptrace.NewTraces() - want := errors.New("export_error") - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, te) - require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) -} func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead @@ -277,7 +217,7 @@ func TestTracesRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequest(context.Background(), + te, err := internal.NewTracesRequest(context.Background(), exporter.Settings{ID: fakeTracesName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -303,7 +243,7 @@ func TestTracesRequest_WithRecordMetrics_RequestSenderError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequest(context.Background(), + te, err := internal.NewTracesRequest(context.Background(), exporter.Settings{ID: fakeTracesName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) @@ -333,7 +273,7 @@ func TestTracesRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - te, err := NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) + te, err := internal.NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, te) @@ -363,7 +303,7 @@ func TestTracesRequest_WithSpan_ExportError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("export_error") - te, err := NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + te, err := internal.NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, te) @@ -383,20 +323,6 @@ func TestTraces_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestTracesRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, te) - assert.NoError(t, err) - - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, te.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestTraces_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -409,19 +335,6 @@ func TestTraces_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, te.Shutdown(context.Background())) } -func TestTracesRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, te) - assert.NoError(t, err) - - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, want, te.Shutdown(context.Background())) -} - func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(context.Context, ptrace.Traces) error { return retError From 072cc53443de4b81d0f9eb780c013f313a68abfc Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Tue, 9 Sep 2025 19:00:41 +0200 Subject: [PATCH 03/11] [exporterhelper] Move NewMetricsRequest to exporterhelper/internal --- .../exporterhelper/internal/new_request.go | 54 +++++++++++ .../internal/new_request_test.go | 89 +++++++++++++++++ exporter/exporterhelper/metrics.go | 56 +---------- exporter/exporterhelper/metrics_test.go | 95 +------------------ 4 files changed, 148 insertions(+), 146 deletions(-) diff --git a/exporter/exporterhelper/internal/new_request.go b/exporter/exporterhelper/internal/new_request.go index fcf51dc26218..fade6dfa2803 100644 --- a/exporter/exporterhelper/internal/new_request.go +++ b/exporter/exporterhelper/internal/new_request.go @@ -11,6 +11,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" "go.uber.org/zap" @@ -121,3 +122,56 @@ func newConsumeTraces(converter request.RequestConverterFunc[ptrace.Traces], be return be.Send(ctx, req) } } + +type metricsExporter struct { + *BaseExporter + consumer.Metrics +} + +// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMetricsRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[pmetric.Metrics], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalMetrics, pusher, options...) + if err != nil { + return nil, err + } + + mc, err := consumer.NewMetrics(newConsumeMetrics(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &metricsExporter{BaseExporter: be, Metrics: mc}, nil +} + +func newConsumeMetrics(converter request.RequestConverterFunc[pmetric.Metrics], be *BaseExporter, logger *zap.Logger) consumer.ConsumeMetricsFunc { + return func(ctx context.Context, md pmetric.Metrics) error { + req, err := converter(ctx, md) + if err != nil { + logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} diff --git a/exporter/exporterhelper/internal/new_request_test.go b/exporter/exporterhelper/internal/new_request_test.go index 01365d5dcaac..5c0341b3b054 100644 --- a/exporter/exporterhelper/internal/new_request_test.go +++ b/exporter/exporterhelper/internal/new_request_test.go @@ -19,6 +19,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" "go.opentelemetry.io/collector/exporter/exportertest" "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" ) @@ -193,3 +194,91 @@ func TestTracesRequest_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, te.Shutdown(context.Background())) } + +func TestMetricsRequest_NilLogger(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + +func TestMetricsRequest_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + +func TestMetricsRequest_NilPushMetricsData(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromMetricsFunc(nil), nil) + require.Nil(t, me) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestMetricsRequest_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestMetricsRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + require.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + +func TestMetricsRequest_Default_ConvertError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequest_Default_ExportError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("export_error") + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestMetricsRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 280eae108d57..74caba88024f 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -130,10 +127,6 @@ func (req *metricsRequest) BytesSize() int { return metricsMarshaler.MetricsSize(req.md) } -type metricsExporter struct { - *internal.BaseExporter - consumer.Metrics -} // NewMetrics creates an exporter.Metrics that records observability metrics and wraps every request with a Span. func NewMetrics( @@ -149,7 +142,7 @@ func NewMetrics( if pusher == nil { return nil, errNilPushMetrics } - return NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher), + return internal.NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher), append([]Option{internal.WithQueueBatchSettings(NewMetricsQueueBatchSettings())}, options...)...) } @@ -167,50 +160,3 @@ func requestFromMetrics() RequestConverterFunc[pmetric.Metrics] { } } -// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewMetricsRequest( - _ context.Context, - set exporter.Settings, - converter RequestConverterFunc[pmetric.Metrics], - pusher RequestConsumeFunc, - options ...Option, -) (exporter.Metrics, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilMetricsConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalMetrics, pusher, options...) - if err != nil { - return nil, err - } - - mc, err := consumer.NewMetrics(newConsumeMetrics(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err - } - - return &metricsExporter{BaseExporter: be, Metrics: mc}, nil -} - -func newConsumeMetrics(converter RequestConverterFunc[pmetric.Metrics], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeMetricsFunc { - return func(ctx context.Context, md pmetric.Metrics) error { - req, err := converter(ctx, md) - if err != nil { - logger.Error("Failed to convert metrics. Dropping data.", - zap.Int("dropped_data_points", md.DataPointCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) - } -} diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 81e98ad84b1e..f9bd80aecaf8 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -71,30 +71,12 @@ func TestMetrics_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestMetricsRequest_NilLogger(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, me) - require.Equal(t, errNilLogger, err) -} - func TestMetrics_NilPushMetricsData(t *testing.T) { me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetrics, err) } -func TestMetricsRequest_NilMetricsConverter(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, me) - require.Equal(t, errNilMetricsConverter, err) -} - -func TestMetricsRequest_NilPushMetricsData(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromMetricsFunc(nil), nil) - require.Nil(t, me) - require.Equal(t, errNilConsumeRequest, err) -} - func TestMetrics_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, newPushMetricsData(nil)) @@ -107,19 +89,6 @@ func TestMetrics_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } -func TestMetricsRequest_Default(t *testing.T) { - md := pmetric.NewMetrics() - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - assert.NotNil(t, me) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) - assert.NoError(t, me.Shutdown(context.Background())) -} - func TestMetrics_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -129,16 +98,6 @@ func TestMetrics_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } -func TestMetricsRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - require.NoError(t, err) - assert.NotNil(t, me) - - assert.Equal(t, capabilities, me.Capabilities()) -} - func TestMetrics_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -148,25 +107,6 @@ func TestMetrics_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } -func TestMetricsRequest_Default_ConvertError(t *testing.T) { - md := pmetric.NewMetrics() - want := errors.New("convert_error") - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, me) - require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) -} - -func TestMetricsRequest_Default_ExportError(t *testing.T) { - md := pmetric.NewMetrics() - want := errors.New("export_error") - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, me) - require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) -} func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead @@ -279,7 +219,7 @@ func TestMetricsRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequest(context.Background(), + me, err := internal.NewMetricsRequest(context.Background(), exporter.Settings{ID: fakeMetricsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -305,7 +245,7 @@ func TestMetricsRequest_WithRecordMetrics_ExportError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequest(context.Background(), + me, err := internal.NewMetricsRequest(context.Background(), exporter.Settings{ID: fakeMetricsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) @@ -334,7 +274,7 @@ func TestMetricsRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - me, err := NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) + me, err := internal.NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetrics(t, sr, set.TracerProvider.Tracer("test"), me, nil) @@ -362,7 +302,7 @@ func TestMetricsRequest_WithSpan_ExportError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - me, err := NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + me, err := internal.NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetrics(t, sr, set.TracerProvider.Tracer("test"), me, want) @@ -381,20 +321,6 @@ func TestMetrics_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestMetricsRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, me) - assert.NoError(t, err) - - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, me.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestMetrics_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -407,19 +333,6 @@ func TestMetrics_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } -func TestMetricsRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, me) - assert.NoError(t, err) - - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, want, me.Shutdown(context.Background())) -} - func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(_ context.Context, _ pmetric.Metrics) error { return retError From d1a80bdb961a067d59ab05a66c7e52a3f57c6377 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 10 Sep 2025 13:32:17 +0200 Subject: [PATCH 04/11] Move things to xexporterhelper and deprecate the ones in exporterhelper --- exporter/exporterhelper/logs.go | 12 +++++ exporter/exporterhelper/metrics.go | 14 ++++- exporter/exporterhelper/request.go | 2 + exporter/exporterhelper/traces.go | 14 ++++- .../xexporterhelper/new_request.go | 54 +++++++++++++++++++ .../xexporterhelper/profiles.go | 4 +- .../exporterhelper/xexporterhelper/request.go | 23 ++++++++ 7 files changed, 117 insertions(+), 6 deletions(-) create mode 100644 exporter/exporterhelper/xexporterhelper/new_request.go create mode 100644 exporter/exporterhelper/xexporterhelper/request.go diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index f6d67fe8eaf8..51d9f65ca5b9 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -146,6 +146,18 @@ func NewLogs( append([]Option{internal.WithQueueBatchSettings(NewLogsQueueBatchSettings())}, options...)...) } +// Deprecated [v0.136.0]: Use xexporterhelper.NewLogsRequest instead. +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +func NewLogsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[plog.Logs], + pusher RequestConsumeFunc, + options ...Option, +) (exporter.Logs, error) { + return internal.NewLogsRequest(ctx, set, converter, pusher, options...) +} + // requestConsumeFromLogs returns a RequestConsumeFunc that consumes plog.Logs. func requestConsumeFromLogs(pusher consumer.ConsumeLogsFunc) RequestConsumeFunc { return func(ctx context.Context, request Request) error { diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 74caba88024f..b1651e82c66a 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -127,7 +127,6 @@ func (req *metricsRequest) BytesSize() int { return metricsMarshaler.MetricsSize(req.md) } - // NewMetrics creates an exporter.Metrics that records observability metrics and wraps every request with a Span. func NewMetrics( ctx context.Context, @@ -146,6 +145,18 @@ func NewMetrics( append([]Option{internal.WithQueueBatchSettings(NewMetricsQueueBatchSettings())}, options...)...) } +// Deprecated [v0.136.0]: Use xexporterhelper.NewMetricsRequest instead. +// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. +func NewMetricsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[pmetric.Metrics], + pusher RequestConsumeFunc, + options ...Option, +) (exporter.Metrics, error) { + return internal.NewMetricsRequest(ctx, set, converter, pusher, options...) +} + // requestConsumeFromMetrics returns a RequestConsumeFunc that consumes pmetric.Metrics. func requestConsumeFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestConsumeFunc { return func(ctx context.Context, request Request) error { @@ -159,4 +170,3 @@ func requestFromMetrics() RequestConverterFunc[pmetric.Metrics] { return newMetricsRequest(md), nil } } - diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index 614dd1b0e2e0..f62fe295e117 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -20,11 +20,13 @@ type Request = request.Request // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestErrorHandler = request.ErrorHandler +// Deprecated: [v0.136.0] Use xexporterhelper.RequestConsumeFunc. // RequestConverterFunc converts pdata telemetry into a user-defined Request. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestConverterFunc[T any] = request.RequestConverterFunc[T] +// Deprecated: [v0.136.0] Use xexporterhelper.RequestConsumeFunc. // RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, // and accessing it is considered undefined behavior. type RequestConsumeFunc = request.RequestConsumeFunc diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 40185c1f2024..3e966fc110e9 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -127,7 +127,6 @@ func (req *tracesRequest) BytesSize() int { return tracesMarshaler.TracesSize(req.td) } - // NewTraces creates an exporter.Traces that records observability metrics and wraps every request with a Span. func NewTraces( ctx context.Context, @@ -146,6 +145,18 @@ func NewTraces( append([]Option{internal.WithQueueBatchSettings(NewTracesQueueBatchSettings())}, options...)...) } +// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead.\ +// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. +func NewTracesRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[ptrace.Traces], + pusher RequestConsumeFunc, + options ...Option, +) (exporter.Traces, error) { + return internal.NewTracesRequest(ctx, set, converter, pusher, options...) +} + // requestConsumeFromTraces returns a RequestConsumeFunc that consumes ptrace.Traces. func requestConsumeFromTraces(pusher consumer.ConsumeTracesFunc) RequestConsumeFunc { return func(ctx context.Context, request Request) error { @@ -159,4 +170,3 @@ func requestFromTraces() RequestConverterFunc[ptrace.Traces] { return newTracesRequest(traces), nil } } - diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go new file mode 100644 index 000000000000..45e42d16784c --- /dev/null +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -0,0 +1,54 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xexporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewLogsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[plog.Logs], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Logs, error) { + return internal.NewLogsRequest(ctx, set, converter, pusher, options...) +} + +// NewMetricsRequest creates new metrics exporter based on custom MetricsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMetricsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[pmetric.Metrics], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Metrics, error) { + return internal.NewMetricsRequest(ctx, set, converter, pusher, options...) +} + +// NewTracesRequest creates new traces exporter based on custom TracesConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewTracesRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[ptrace.Traces], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Traces, error) { + return internal.NewTracesRequest(ctx, set, converter, pusher, options...) +} diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 31a891e8491f..1801e45908bb 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -176,8 +176,8 @@ func requestFromProfiles() exporterhelper.RequestConverterFunc[pprofile.Profiles func NewProfilesRequest( _ context.Context, set exporter.Settings, - converter exporterhelper.RequestConverterFunc[pprofile.Profiles], - pusher exporterhelper.RequestConsumeFunc, + converter RequestConverterFunc[pprofile.Profiles], + pusher RequestConsumeFunc, options ...exporterhelper.Option, ) (xexporter.Profiles, error) { if set.Logger == nil { diff --git a/exporter/exporterhelper/xexporterhelper/request.go b/exporter/exporterhelper/xexporterhelper/request.go new file mode 100644 index 000000000000..c1add2010ec2 --- /dev/null +++ b/exporter/exporterhelper/xexporterhelper/request.go @@ -0,0 +1,23 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xexporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" + +import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" // Request represents a single request that can be sent to an external endpoint. + +// RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial +// temporary failures. For example, if some items failed to process and can be retried, this interface allows to +// return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. +// If not implemented, the original Request will be returned assuming the error is applied to the whole Request. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestErrorHandler = request.ErrorHandler + +// RequestConverterFunc converts pdata telemetry into a user-defined Request. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestConverterFunc[T any] = request.RequestConverterFunc[T] + +// RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, +// and accessing it is considered undefined behavior. +type RequestConsumeFunc = request.RequestConsumeFunc From 06cd6ecae37e4b72cf9cea9d855576087f624591 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 10 Sep 2025 13:37:19 +0200 Subject: [PATCH 05/11] Move more experimental stuff to xexporterhelper --- exporter/exporterhelper/queue_batch.go | 1 + exporter/exporterhelper/request.go | 1 + exporter/exporterhelper/xexporterhelper/new_request.go | 8 ++++++++ 3 files changed, 10 insertions(+) diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index cdb0261f19ef..638d44bca834 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -40,6 +40,7 @@ var ErrQueueIsFull = queue.ErrQueueIsFull // They include things line Encoding to be used with persistent queue, or the available Sizers, etc. type QueueBatchSettings = queuebatch.Settings[Request] +// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch. // WithQueueBatch enables queueing and batching for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index f62fe295e117..6570f5766032 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -12,6 +12,7 @@ import ( // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Request = request.Request +// Deprecated: [v0.136.0] Use xexporterhelper.RequestErroHandler // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to // return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go index 45e42d16784c..b55506a3f50f 100644 --- a/exporter/exporterhelper/xexporterhelper/new_request.go +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -52,3 +52,11 @@ func NewTracesRequest( ) (exporter.Traces, error) { return internal.NewTracesRequest(ctx, set, converter, pusher, options...) } + +// WithQueueBatch enables queueing and batching for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set exporterhelper.QueueBatchSettings) exporterhelper.Option { + return internal.WithQueueBatch(cfg, set) +} From 537589d3c7460abd2ec71ca1410433cb294b0623 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 10 Sep 2025 13:39:36 +0200 Subject: [PATCH 06/11] Add changelog note --- ...ove-exporterhelper-experimental-stuff.yaml | 25 +++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 .chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml diff --git a/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml b/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml new file mode 100644 index 000000000000..cb31d47cb60c --- /dev/null +++ b/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate all experimental symbols in exporterhelper and move them to xexporterhelper + +# One or more tracking issues or pull requests related to the change +issues: [11143] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] From 7c5f5c2ebe220b57d06883af0334ae2952ee1792 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 10 Sep 2025 13:45:44 +0200 Subject: [PATCH 07/11] make gotidy --- exporter/exporterhelper/xexporterhelper/go.mod | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index c83025223fad..9de249a17b03 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/collector/exporter/exporterhelper v0.135.0 go.opentelemetry.io/collector/exporter/exportertest v0.135.0 go.opentelemetry.io/collector/exporter/xexporter v0.135.0 + go.opentelemetry.io/collector/pdata v1.41.0 go.opentelemetry.io/collector/pdata/pprofile v0.135.0 go.opentelemetry.io/collector/pdata/testdata v0.135.0 go.opentelemetry.io/collector/pdata/xpdata v0.135.0 @@ -54,7 +55,6 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.135.0 // indirect go.opentelemetry.io/collector/featuregate v1.41.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.135.0 // indirect - go.opentelemetry.io/collector/pdata v1.41.0 // indirect go.opentelemetry.io/collector/pipeline v1.41.0 // indirect go.opentelemetry.io/collector/receiver v1.41.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.135.0 // indirect From cd368c5b0e0c62358a540510df44e7a72697639a Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Wed, 10 Sep 2025 14:35:54 +0200 Subject: [PATCH 08/11] Fix linter errors --- exporter/exporterhelper/constants.go | 8 -------- exporter/exporterhelper/internal/constants.go | 8 -------- exporter/exporterhelper/internal/new_request.go | 3 ++- exporter/exporterhelper/internal/new_request_test.go | 1 + exporter/exporterhelper/logs.go | 2 +- exporter/exporterhelper/metrics.go | 2 +- exporter/exporterhelper/metrics_test.go | 1 - exporter/exporterhelper/traces.go | 2 +- exporter/exporterhelper/traces_test.go | 1 - exporter/exporterhelper/xexporterhelper/profiles.go | 6 +++--- exporter/exporterhelper/xexporterhelper/profiles_test.go | 2 +- 11 files changed, 10 insertions(+), 26 deletions(-) diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index 120f1aaeb066..afad5d9f7e2d 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -12,18 +12,10 @@ var ( errNilConfig = errors.New("nil config") // errNilLogger is returned when a logger is nil errNilLogger = errors.New("nil logger") - // errNilConsumeRequest is returned when a nil PushTraces is given. - errNilConsumeRequest = errors.New("nil RequestConsumeFunc") // errNilPushTraces is returned when a nil PushTraces is given. errNilPushTraces = errors.New("nil PushTraces") // errNilPushMetrics is returned when a nil PushMetrics is given. errNilPushMetrics = errors.New("nil PushMetrics") // errNilPushLogs is returned when a nil PushLogs is given. errNilPushLogs = errors.New("nil PushLogs") - // errNilTracesConverter is returned when a nil RequestFromTracesFunc is given. - errNilTracesConverter = errors.New("nil RequestFromTracesFunc") - // errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given. - errNilMetricsConverter = errors.New("nil RequestFromMetricsFunc") - // errNilLogsConverter is returned when a nil RequestFromLogsFunc is given. - errNilLogsConverter = errors.New("nil RequestFromLogsFunc") ) diff --git a/exporter/exporterhelper/internal/constants.go b/exporter/exporterhelper/internal/constants.go index 763c2dc5547b..178495a5bf9b 100644 --- a/exporter/exporterhelper/internal/constants.go +++ b/exporter/exporterhelper/internal/constants.go @@ -6,18 +6,10 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import "errors" var ( - // errNilConfig is returned when an empty name is given. - errNilConfig = errors.New("nil config") // errNilLogger is returned when a logger is nil errNilLogger = errors.New("nil logger") // errNilConsumeRequest is returned when a nil PushTraces is given. errNilConsumeRequest = errors.New("nil RequestConsumeFunc") - // errNilPushTraces is returned when a nil PushTraces is given. - errNilPushTraces = errors.New("nil PushTraces") - // errNilPushMetrics is returned when a nil PushMetrics is given. - errNilPushMetrics = errors.New("nil PushMetrics") - // errNilPushLogs is returned when a nil PushLogs is given. - errNilPushLogs = errors.New("nil PushLogs") // errNilTracesConverter is returned when a nil RequestFromTracesFunc is given. errNilTracesConverter = errors.New("nil RequestFromTracesFunc") // errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given. diff --git a/exporter/exporterhelper/internal/new_request.go b/exporter/exporterhelper/internal/new_request.go index fade6dfa2803..ebafcb3ed618 100644 --- a/exporter/exporterhelper/internal/new_request.go +++ b/exporter/exporterhelper/internal/new_request.go @@ -6,6 +6,8 @@ package internal // import "go.opentelemetry.io/collector/exporter/exporterhelpe import ( "context" + "go.uber.org/zap" + "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/exporter" @@ -14,7 +16,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pipeline" - "go.uber.org/zap" ) type logsExporter struct { diff --git a/exporter/exporterhelper/internal/new_request_test.go b/exporter/exporterhelper/internal/new_request_test.go index 5c0341b3b054..7245b8f7cb1d 100644 --- a/exporter/exporterhelper/internal/new_request_test.go +++ b/exporter/exporterhelper/internal/new_request_test.go @@ -10,6 +10,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component/componenttest" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index 51d9f65ca5b9..23eec4a9bc17 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -146,8 +146,8 @@ func NewLogs( append([]Option{internal.WithQueueBatchSettings(NewLogsQueueBatchSettings())}, options...)...) } -// Deprecated [v0.136.0]: Use xexporterhelper.NewLogsRequest instead. // NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +// Deprecated [v0.136.0]: Use xexporterhelper.NewLogsRequest instead. func NewLogsRequest( ctx context.Context, set exporter.Settings, diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index b1651e82c66a..da22df460a34 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -145,8 +145,8 @@ func NewMetrics( append([]Option{internal.WithQueueBatchSettings(NewMetricsQueueBatchSettings())}, options...)...) } -// Deprecated [v0.136.0]: Use xexporterhelper.NewMetricsRequest instead. // NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. +// Deprecated [v0.136.0]: Use xexporterhelper.NewMetricsRequest instead. func NewMetricsRequest( ctx context.Context, set exporter.Settings, diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index f9bd80aecaf8..6466e30ae5f1 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -107,7 +107,6 @@ func TestMetrics_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } - func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 3e966fc110e9..627f3c7f08fc 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -145,8 +145,8 @@ func NewTraces( append([]Option{internal.WithQueueBatchSettings(NewTracesQueueBatchSettings())}, options...)...) } -// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead.\ // NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. +// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead.\ func NewTracesRequest( ctx context.Context, set exporter.Settings, diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 000ad1579a50..b3417dd57e2b 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -105,7 +105,6 @@ func TestTraces_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } - func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 1801e45908bb..893303cd30ce 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -157,14 +157,14 @@ func NewProfiles( } // requestConsumeFromProfiles returns a RequestConsumeFunc that consumes pprofile.Profiles. -func requestConsumeFromProfiles(pusher xconsumer.ConsumeProfilesFunc) exporterhelper.RequestConsumeFunc { +func requestConsumeFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestConsumeFunc { return func(ctx context.Context, request exporterhelper.Request) error { return pusher.ConsumeProfiles(ctx, request.(*profilesRequest).pd) } } // requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request. -func requestFromProfiles() exporterhelper.RequestConverterFunc[pprofile.Profiles] { +func requestFromProfiles() RequestConverterFunc[pprofile.Profiles] { return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) { return newProfilesRequest(profiles), nil } @@ -205,7 +205,7 @@ func NewProfilesRequest( return &profileExporter{BaseExporter: be, Profiles: tc}, nil } -func newConsumeProfiles(converter exporterhelper.RequestConverterFunc[pprofile.Profiles], be *internal.BaseExporter, logger *zap.Logger) xconsumer.ConsumeProfilesFunc { +func newConsumeProfiles(converter RequestConverterFunc[pprofile.Profiles], be *internal.BaseExporter, logger *zap.Logger) xconsumer.ConsumeProfilesFunc { return func(ctx context.Context, pd pprofile.Profiles) error { req, err := converter(ctx, pd) if err != nil { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index 687346703373..827ce3b17f67 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -53,7 +53,7 @@ func TestProfilesRequest(t *testing.T) { assert.Equal( t, newProfilesRequest(pprofile.NewProfiles()), - lr.(exporterhelper.RequestErrorHandler).OnError(profileErr), + lr.(RequestErrorHandler).OnError(profileErr), ) } From fb326b346ba2e8a8385397508de722eda0108979 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 11 Sep 2025 14:32:18 +0200 Subject: [PATCH 09/11] Deprecate more related symbols --- exporter/exporterhelper/queue_batch.go | 3 ++- exporter/exporterhelper/request.go | 2 ++ exporter/exporterhelper/traces.go | 2 +- exporter/exporterhelper/xexporterhelper/new_request.go | 7 ++++++- exporter/exporterhelper/xexporterhelper/request.go | 8 ++++++++ 5 files changed, 19 insertions(+), 3 deletions(-) diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index 638d44bca834..d4ba21f9fcde 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -36,11 +36,12 @@ type QueueBatchEncoding[T any] interface { var ErrQueueIsFull = queue.ErrQueueIsFull +// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch. // QueueBatchSettings are settings for the QueueBatch component. // They include things line Encoding to be used with persistent queue, or the available Sizers, etc. type QueueBatchSettings = queuebatch.Settings[Request] -// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch. +// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch or WithQueue. // WithQueueBatch enables queueing and batching for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index 2d7ac7ec4de1..5f3cfa75bbc7 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -7,6 +7,7 @@ import ( "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" ) +// Deprecated: [v0.136.0] Use xexporterhelper.Request. // Request represents a single request that can be sent to an external endpoint. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. @@ -32,6 +33,7 @@ type RequestConverterFunc[T any] = request.RequestConverterFunc[T] // and accessing it is considered undefined behavior. type RequestConsumeFunc = request.RequestConsumeFunc +// Deprecated: [v0.136.0] Use xexporterhelper.RequestSizer. // RequestSizer is an interface that returns the size of the given request. type RequestSizer = request.Sizer[Request] diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 627f3c7f08fc..4900511be698 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -146,7 +146,7 @@ func NewTraces( } // NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. -// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead.\ +// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead. func NewTracesRequest( ctx context.Context, set exporter.Settings, diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go index b55506a3f50f..ce9596caff4b 100644 --- a/exporter/exporterhelper/xexporterhelper/new_request.go +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -9,6 +9,7 @@ import ( "go.opentelemetry.io/collector/exporter" "go.opentelemetry.io/collector/exporter/exporterhelper" "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/ptrace" @@ -53,10 +54,14 @@ func NewTracesRequest( return internal.NewTracesRequest(ctx, set, converter, pusher, options...) } +// QueueBatchSettings are settings for the QueueBatch component. +// They include things line Encoding to be used with persistent queue, or the available Sizers, etc. +type QueueBatchSettings = queuebatch.Settings[Request] + // WithQueueBatch enables queueing and batching for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set exporterhelper.QueueBatchSettings) exporterhelper.Option { +func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set QueueBatchSettings) exporterhelper.Option { return internal.WithQueueBatch(cfg, set) } diff --git a/exporter/exporterhelper/xexporterhelper/request.go b/exporter/exporterhelper/xexporterhelper/request.go index c1add2010ec2..9fbf3e8e847b 100644 --- a/exporter/exporterhelper/xexporterhelper/request.go +++ b/exporter/exporterhelper/xexporterhelper/request.go @@ -5,6 +5,11 @@ package xexporterhelper // import "go.opentelemetry.io/collector/exporter/export import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" // Request represents a single request that can be sent to an external endpoint. +// Request represents a single request that can be sent to an external endpoint. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Request = request.Request + // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to // return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. @@ -21,3 +26,6 @@ type RequestConverterFunc[T any] = request.RequestConverterFunc[T] // RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, // and accessing it is considered undefined behavior. type RequestConsumeFunc = request.RequestConsumeFunc + +// RequestSizer is an interface that returns the size of the given request. +type RequestSizer = request.Sizer[Request] From 805282292bd0b28c24d730132a78aa3f808e3a95 Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 11 Sep 2025 15:01:06 +0200 Subject: [PATCH 10/11] rg '\bexporterhelper.Request\b' -l | xargs sd '\bexporterhelper.Request\b' 'Request' --- .../xexporterhelper/profiles.go | 8 +-- .../xexporterhelper/profiles_batch.go | 10 ++-- .../xexporterhelper/profiles_batch_test.go | 60 +++++++++---------- .../xexporterhelper/profiles_test.go | 24 ++++---- 4 files changed, 51 insertions(+), 51 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 893303cd30ce..dae5bcb7732c 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -57,7 +57,7 @@ type profilesRequest struct { cachedSize int } -func newProfilesRequest(pd pprofile.Profiles) exporterhelper.Request { +func newProfilesRequest(pd pprofile.Profiles) Request { return &profilesRequest{ pd: pd, cachedSize: -1, @@ -105,7 +105,7 @@ func (profilesReferenceCounter) Unref(req request.Request) { pref.UnrefProfiles(req.(*profilesRequest).pd) } -func (req *profilesRequest) OnError(err error) exporterhelper.Request { +func (req *profilesRequest) OnError(err error) Request { var profileError xconsumererror.Profiles if errors.As(err, &profileError) { // TODO: Add logic to unref the new request created here. @@ -158,14 +158,14 @@ func NewProfiles( // requestConsumeFromProfiles returns a RequestConsumeFunc that consumes pprofile.Profiles. func requestConsumeFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestConsumeFunc { - return func(ctx context.Context, request exporterhelper.Request) error { + return func(ctx context.Context, request Request) error { return pusher.ConsumeProfiles(ctx, request.(*profilesRequest).pd) } } // requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request. func requestFromProfiles() RequestConverterFunc[pprofile.Profiles] { - return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) { + return func(_ context.Context, profiles pprofile.Profiles) (Request, error) { return newProfilesRequest(profiles), nil } } diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 829fca166185..3ee8d7a5c90a 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -17,7 +17,7 @@ import ( // // Following the OTLP 1.7.0 upgrade, this is currently a noop. // See https://github.com/open-telemetry/opentelemetry-collector/issues/13106 -func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { +func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 Request) ([]Request, error) { var sz sizer.ProfilesSizer switch szt { case exporterhelper.RequestSizerTypeItems: @@ -38,7 +38,7 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor // If no limit we can simply merge the new request into the current and return. if maxSize == 0 { - return []exporterhelper.Request{req, req2}, nil + return []Request{req, req2}, nil } sp1, err1 := req.split(maxSize, sz) @@ -49,7 +49,7 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor // If no limit we can simply merge the new request into the current and return. if maxSize == 0 { - return []exporterhelper.Request{req}, nil + return []Request{req}, nil } return req.split(maxSize, sz) } @@ -63,8 +63,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) }*/ -func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]exporterhelper.Request, error) { - var res []exporterhelper.Request +func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]Request, error) { + var res []Request for req.size(sz) > maxSize { pd, rmSize := extractProfiles(req.pd, maxSize, sz) if pd.SampleCount() == 0 { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 877d1370f642..a518bf350c17 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -40,9 +40,9 @@ func TestMergeSplitProfiles(t *testing.T) { name string szt exporterhelper.RequestSizerType maxSize int - pr1 exporterhelper.Request - pr2 exporterhelper.Request - expected []exporterhelper.Request + pr1 Request + pr2 Request + expected []Request }{ { name: "both_requests_empty", @@ -50,7 +50,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -58,7 +58,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -66,7 +66,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -74,7 +74,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -86,7 +86,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 4, pr1: newProfilesRequest(testdata.GenerateProfiles(10)), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -98,7 +98,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(8) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -116,7 +116,7 @@ func TestMergeSplitProfiles(t *testing.T) { return testdata.GenerateProfiles(6) }()), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(2) @@ -143,9 +143,9 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { name string szt exporterhelper.RequestSizerType maxSize int - pr1 exporterhelper.Request - pr2 exporterhelper.Request - expected []exporterhelper.Request + pr1 Request + pr2 Request + expected []Request }{ { name: "both_requests_empty", @@ -153,7 +153,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -161,7 +161,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -169,7 +169,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -177,7 +177,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -189,7 +189,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 4, pr1: newProfilesRequest(testdata.GenerateProfiles(10)), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -201,7 +201,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(8) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -219,7 +219,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { return testdata.GenerateProfiles(6) }()), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(2) @@ -232,7 +232,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -240,7 +240,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -248,7 +248,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -256,7 +256,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(11)), pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -268,7 +268,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(4)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(10)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -280,7 +280,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(7) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -316,7 +316,7 @@ func TestMergeSplitManySmallLogs(t *testing.T) { t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") // All requests merge into a single batch. - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(1))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(1))} for j := 0; j < 1000; j++ { lr2 := newProfilesRequest(testdata.GenerateProfiles(10)) res, _ := merged[len(merged)-1].MergeSplit(context.Background(), 10000, exporterhelper.RequestSizerTypeItems, lr2) @@ -329,7 +329,7 @@ func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { // All requests merge into a single batch. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(10))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(10))} for j := 0; j < 1000; j++ { pr2 := newProfilesRequest(testdata.GenerateProfiles(10)) res, _ := merged[len(merged)-1].MergeSplit( @@ -348,7 +348,7 @@ func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing. // Every incoming request results in a split. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(0))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(0))} for j := 0; j < 10; j++ { pr2 := newProfilesRequest(testdata.GenerateProfiles(10001)) res, _ := merged[len(merged)-1].MergeSplit( @@ -368,7 +368,7 @@ func BenchmarkSplittingBasedOnByteSizeHugeProfiles(b *testing.B) { // One request splits into many batches. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(0))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(0))} pr2 := newProfilesRequest(testdata.GenerateProfiles(100000)) res, _ := merged[len(merged)-1].MergeSplit( context.Background(), diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index 827ce3b17f67..af7c79c1c4e2 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -70,7 +70,7 @@ func TestProfilesExporter_NilLogger(t *testing.T) { } func TestProfilesRequestExporter_NilLogger(t *testing.T) { - le, err := NewProfilesRequest(context.Background(), exporter.Settings{}, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + le, err := NewProfilesRequest(context.Background(), exporter.Settings{}, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.Nil(t, le) require.Equal(t, errNilLogger, err) } @@ -82,7 +82,7 @@ func TestProfilesExporter_NilPushProfilesData(t *testing.T) { } func TestProfilesExporter_NilProfilesConverter(t *testing.T) { - te, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[exporterhelper.Request]()) + te, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) require.Nil(t, te) require.Equal(t, errNilProfilesConverter, err) } @@ -108,7 +108,7 @@ func TestProfilesExporter_Default(t *testing.T) { func TestProfilesRequestExporter_Default(t *testing.T) { ld := pprofile.NewProfiles() le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) assert.NotNil(t, le) require.NoError(t, err) @@ -130,7 +130,7 @@ func TestProfilesExporter_WithCapabilities(t *testing.T) { func TestProfilesRequestExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithCapabilities(capabilities)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithCapabilities(capabilities)) require.NoError(t, err) require.NotNil(t, le) @@ -150,7 +150,7 @@ func TestProfilesRequestExporter_Default_ConvertError(t *testing.T) { ld := pprofile.NewProfiles() want := errors.New("convert_error") le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(want), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + requestFromProfilesFunc(want), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) require.Equal(t, consumererror.NewPermanent(want), le.ConsumeProfiles(context.Background(), ld)) @@ -160,7 +160,7 @@ func TestProfilesRequestExporter_Default_ExportError(t *testing.T) { ld := pprofile.NewProfiles() want := errors.New("export_error") le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[exporterhelper.Request](want)) + requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld)) @@ -260,7 +260,7 @@ func TestProfilesRequestExporter_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil) @@ -288,7 +288,7 @@ func TestProfilesRequestExporter_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[exporterhelper.Request](want)) + le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, want) @@ -311,7 +311,7 @@ func TestProfilesRequestExporter_WithShutdown(t *testing.T) { shutdown := func(context.Context) error { shutdownCalled = true; return nil } le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithShutdown(shutdown)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithShutdown(shutdown)) assert.NotNil(t, le) require.NoError(t, err) @@ -335,7 +335,7 @@ func TestProfilesRequestExporter_WithShutdown_ReturnError(t *testing.T) { shutdownErr := func(context.Context) error { return want } le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithShutdown(shutdownErr)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithShutdown(shutdownErr)) assert.NotNil(t, le) require.NoError(t, err) @@ -382,8 +382,8 @@ func checkWrapSpanForProfilesExporter(t *testing.T, sr *tracetest.SpanRecorder, } } -func requestFromProfilesFunc(err error) func(context.Context, pprofile.Profiles) (exporterhelper.Request, error) { - return func(_ context.Context, pd pprofile.Profiles) (exporterhelper.Request, error) { +func requestFromProfilesFunc(err error) func(context.Context, pprofile.Profiles) (Request, error) { + return func(_ context.Context, pd pprofile.Profiles) (Request, error) { return &requesttest.FakeRequest{Items: pd.SampleCount()}, err } } From 321a3bf13560addd320cfb0f4372120ab625abeb Mon Sep 17 00:00:00 2001 From: Pablo Baeyens Date: Thu, 11 Sep 2025 15:13:31 +0200 Subject: [PATCH 11/11] Address linter --- exporter/exporterhelper/xexporterhelper/profiles.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index dae5bcb7732c..d3546f43cb35 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -34,8 +34,8 @@ var ( // NewProfilesQueueBatchSettings returns a new QueueBatchSettings to configure to WithQueueBatch when using pprofile.Profiles. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewProfilesQueueBatchSettings() exporterhelper.QueueBatchSettings { - return exporterhelper.QueueBatchSettings{ +func NewProfilesQueueBatchSettings() QueueBatchSettings { + return QueueBatchSettings{ ReferenceCounter: profilesReferenceCounter{}, Encoding: profilesEncoding{}, ItemsSizer: request.NewItemsSizer(),