diff --git a/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml b/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml new file mode 100644 index 000000000000..cb31d47cb60c --- /dev/null +++ b/.chloggen/mx-psi_move-exporterhelper-experimental-stuff.yaml @@ -0,0 +1,25 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: deprecation + +# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver) +component: exporterhelper + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Deprecate all experimental symbols in exporterhelper and move them to xexporterhelper + +# One or more tracking issues or pull requests related to the change +issues: [11143] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/exporter/exporterhelper/constants.go b/exporter/exporterhelper/constants.go index 120f1aaeb066..afad5d9f7e2d 100644 --- a/exporter/exporterhelper/constants.go +++ b/exporter/exporterhelper/constants.go @@ -12,18 +12,10 @@ var ( errNilConfig = errors.New("nil config") // errNilLogger is returned when a logger is nil errNilLogger = errors.New("nil logger") - // errNilConsumeRequest is returned when a nil PushTraces is given. - errNilConsumeRequest = errors.New("nil RequestConsumeFunc") // errNilPushTraces is returned when a nil PushTraces is given. errNilPushTraces = errors.New("nil PushTraces") // errNilPushMetrics is returned when a nil PushMetrics is given. errNilPushMetrics = errors.New("nil PushMetrics") // errNilPushLogs is returned when a nil PushLogs is given. errNilPushLogs = errors.New("nil PushLogs") - // errNilTracesConverter is returned when a nil RequestFromTracesFunc is given. - errNilTracesConverter = errors.New("nil RequestFromTracesFunc") - // errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given. - errNilMetricsConverter = errors.New("nil RequestFromMetricsFunc") - // errNilLogsConverter is returned when a nil RequestFromLogsFunc is given. - errNilLogsConverter = errors.New("nil RequestFromLogsFunc") ) diff --git a/exporter/exporterhelper/internal/constants.go b/exporter/exporterhelper/internal/constants.go new file mode 100644 index 000000000000..178495a5bf9b --- /dev/null +++ b/exporter/exporterhelper/internal/constants.go @@ -0,0 +1,19 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import "errors" + +var ( + // errNilLogger is returned when a logger is nil + errNilLogger = errors.New("nil logger") + // errNilConsumeRequest is returned when a nil PushTraces is given. + errNilConsumeRequest = errors.New("nil RequestConsumeFunc") + // errNilTracesConverter is returned when a nil RequestFromTracesFunc is given. + errNilTracesConverter = errors.New("nil RequestFromTracesFunc") + // errNilMetricsConverter is returned when a nil RequestFromMetricsFunc is given. + errNilMetricsConverter = errors.New("nil RequestFromMetricsFunc") + // errNilLogsConverter is returned when a nil RequestFromLogsFunc is given. + errNilLogsConverter = errors.New("nil RequestFromLogsFunc") +) diff --git a/exporter/exporterhelper/internal/new_request.go b/exporter/exporterhelper/internal/new_request.go new file mode 100644 index 000000000000..ebafcb3ed618 --- /dev/null +++ b/exporter/exporterhelper/internal/new_request.go @@ -0,0 +1,178 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "context" + + "go.uber.org/zap" + + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" + "go.opentelemetry.io/collector/pipeline" +) + +type logsExporter struct { + *BaseExporter + consumer.Logs +} + +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewLogsRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[plog.Logs], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Logs, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilLogsConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalLogs, pusher, options...) + if err != nil { + return nil, err + } + + lc, err := consumer.NewLogs(newConsumeLogs(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &logsExporter{BaseExporter: be, Logs: lc}, nil +} + +func newConsumeLogs(converter request.RequestConverterFunc[plog.Logs], be *BaseExporter, logger *zap.Logger) consumer.ConsumeLogsFunc { + return func(ctx context.Context, ld plog.Logs) error { + req, err := converter(ctx, ld) + if err != nil { + logger.Error("Failed to convert logs. Dropping data.", + zap.Int("dropped_log_records", ld.LogRecordCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} + +type tracesExporter struct { + *BaseExporter + consumer.Traces +} + +// NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewTracesRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[ptrace.Traces], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Traces, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilTracesConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalTraces, pusher, options...) + if err != nil { + return nil, err + } + + tc, err := consumer.NewTraces(newConsumeTraces(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &tracesExporter{BaseExporter: be, Traces: tc}, nil +} + +func newConsumeTraces(converter request.RequestConverterFunc[ptrace.Traces], be *BaseExporter, logger *zap.Logger) consumer.ConsumeTracesFunc { + return func(ctx context.Context, td ptrace.Traces) error { + req, err := converter(ctx, td) + if err != nil { + logger.Error("Failed to convert traces. Dropping data.", + zap.Int("dropped_spans", td.SpanCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} + +type metricsExporter struct { + *BaseExporter + consumer.Metrics +} + +// NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMetricsRequest( + _ context.Context, + set exporter.Settings, + converter request.RequestConverterFunc[pmetric.Metrics], + pusher request.RequestConsumeFunc, + options ...Option, +) (exporter.Metrics, error) { + if set.Logger == nil { + return nil, errNilLogger + } + + if converter == nil { + return nil, errNilMetricsConverter + } + + if pusher == nil { + return nil, errNilConsumeRequest + } + + be, err := NewBaseExporter(set, pipeline.SignalMetrics, pusher, options...) + if err != nil { + return nil, err + } + + mc, err := consumer.NewMetrics(newConsumeMetrics(converter, be, set.Logger), be.ConsumerOptions...) + if err != nil { + return nil, err + } + + return &metricsExporter{BaseExporter: be, Metrics: mc}, nil +} + +func newConsumeMetrics(converter request.RequestConverterFunc[pmetric.Metrics], be *BaseExporter, logger *zap.Logger) consumer.ConsumeMetricsFunc { + return func(ctx context.Context, md pmetric.Metrics) error { + req, err := converter(ctx, md) + if err != nil { + logger.Error("Failed to convert metrics. Dropping data.", + zap.Int("dropped_data_points", md.DataPointCount()), + zap.Error(err)) + return consumererror.NewPermanent(err) + } + return be.Send(ctx, req) + } +} diff --git a/exporter/exporterhelper/internal/new_request_test.go b/exporter/exporterhelper/internal/new_request_test.go new file mode 100644 index 000000000000..7245b8f7cb1d --- /dev/null +++ b/exporter/exporterhelper/internal/new_request_test.go @@ -0,0 +1,285 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + +import ( + "context" + "errors" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "go.opentelemetry.io/collector/component/componenttest" + "go.opentelemetry.io/collector/consumer" + "go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/requesttest" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sendertest" + "go.opentelemetry.io/collector/exporter/exportertest" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +func TestLogsRequest_NilLogger(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, le) + require.Equal(t, errNilLogger, err) +} + +func TestLogsRequest_NilLogsConverter(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, le) + require.Equal(t, errNilLogsConverter, err) +} + +func TestLogsRequest_NilPushLogsData(t *testing.T) { + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromLogsFunc(nil), nil) + require.Nil(t, le) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestLogsRequest_Default(t *testing.T) { + ld := plog.NewLogs() + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) + assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) + assert.NoError(t, le.Shutdown(context.Background())) +} + +func TestLogsRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + require.NoError(t, err) + require.NotNil(t, le) + + assert.Equal(t, capabilities, le.Capabilities()) +} + +func TestLogsRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, le) + assert.NoError(t, err) + + assert.NoError(t, le.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestLogsRequest_Default_ConvertError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("convert_error") + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequest_Default_ExportError(t *testing.T) { + ld := plog.NewLogs() + want := errors.New("export_error") + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, le) + require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) +} + +func TestLogsRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, le) + require.NoError(t, err) + + assert.Equal(t, want, le.Shutdown(context.Background())) +} + +func TestTracesRequest_NilLogger(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, te) + require.Equal(t, errNilLogger, err) +} + +func TestTracesRequest_NilTracesConverter(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, te) + require.Equal(t, errNilTracesConverter, err) +} + +func TestTracesRequest_NilPushTraceData(t *testing.T) { + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromTracesFunc(nil), nil) + require.Nil(t, te) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestTracesRequest_Default(t *testing.T) { + td := ptrace.NewTraces() + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) + assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, te.ConsumeTraces(context.Background(), td)) + assert.NoError(t, te.Shutdown(context.Background())) +} + +func TestTracesRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, capabilities, te.Capabilities()) +} + +func TestTracesRequest_Default_ConvertError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("convert_error") + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequest_Default_ExportError(t *testing.T) { + td := ptrace.NewTraces() + want := errors.New("export_error") + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, te) + require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) +} + +func TestTracesRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, te) + assert.NoError(t, err) + + assert.NoError(t, te.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestTracesRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, te) + require.NoError(t, err) + + assert.Equal(t, want, te.Shutdown(context.Background())) +} + +func TestMetricsRequest_NilLogger(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, me) + require.Equal(t, errNilLogger, err) +} + +func TestMetricsRequest_NilMetricsConverter(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[request.Request]()) + require.Nil(t, me) + require.Equal(t, errNilMetricsConverter, err) +} + +func TestMetricsRequest_NilPushMetricsData(t *testing.T) { + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromMetricsFunc(nil), nil) + require.Nil(t, me) + require.Equal(t, errNilConsumeRequest, err) +} + +func TestMetricsRequest_Default(t *testing.T) { + md := pmetric.NewMetrics() + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) + assert.NoError(t, me.Shutdown(context.Background())) +} + +func TestMetricsRequest_WithCapabilities(t *testing.T) { + capabilities := consumer.Capabilities{MutatesData: true} + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithCapabilities(capabilities)) + require.NoError(t, err) + assert.NotNil(t, me) + + assert.Equal(t, capabilities, me.Capabilities()) +} + +func TestMetricsRequest_Default_ConvertError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("convert_error") + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(want), sendertest.NewNopSenderFunc[request.Request]()) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequest_Default_ExportError(t *testing.T) { + md := pmetric.NewMetrics() + want := errors.New("export_error") + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[request.Request](want)) + require.NoError(t, err) + require.NotNil(t, me) + require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) +} + +func TestMetricsRequest_WithShutdown(t *testing.T) { + shutdownCalled := false + shutdown := func(context.Context) error { shutdownCalled = true; return nil } + + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdown)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.NoError(t, me.Shutdown(context.Background())) + assert.True(t, shutdownCalled) +} + +func TestMetricsRequest_WithShutdown_ReturnError(t *testing.T) { + want := errors.New("my_error") + shutdownErr := func(context.Context) error { return want } + + me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), + requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[request.Request](), WithShutdown(shutdownErr)) + assert.NotNil(t, me) + assert.NoError(t, err) + + assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) + assert.Equal(t, want, me.Shutdown(context.Background())) +} diff --git a/exporter/exporterhelper/internal/request/request.go b/exporter/exporterhelper/internal/request/request.go index c0a44082d598..13729f91fb76 100644 --- a/exporter/exporterhelper/internal/request/request.go +++ b/exporter/exporterhelper/internal/request/request.go @@ -5,6 +5,8 @@ package request // import "go.opentelemetry.io/collector/exporter/exporterhelper import ( "context" + + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" ) // Request represents a single request that can be sent to an external endpoint. @@ -41,3 +43,9 @@ type ErrorHandler interface { // Otherwise, it should return the original Request. OnError(error) Request } + +type RequestConverterFunc[T any] func(context.Context, T) (Request, error) + +// RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, +// and accessing it is considered undefined behavior. +type RequestConsumeFunc = sender.SendFunc[Request] diff --git a/exporter/exporterhelper/logs.go b/exporter/exporterhelper/logs.go index cdac6a5e1f87..23eec4a9bc17 100644 --- a/exporter/exporterhelper/logs.go +++ b/exporter/exporterhelper/logs.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -131,11 +128,6 @@ func (req *logsRequest) BytesSize() int { return logsMarshaler.LogsSize(req.ld) } -type logsExporter struct { - *internal.BaseExporter - consumer.Logs -} - // NewLogs creates an exporter.Logs that records observability logs and wraps every request with a Span. func NewLogs( ctx context.Context, @@ -150,68 +142,32 @@ func NewLogs( if pusher == nil { return nil, errNilPushLogs } - return NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher), + return internal.NewLogsRequest(ctx, set, requestFromLogs(), requestConsumeFromLogs(pusher), append([]Option{internal.WithQueueBatchSettings(NewLogsQueueBatchSettings())}, options...)...) } -// requestConsumeFromLogs returns a RequestConsumeFunc that consumes plog.Logs. -func requestConsumeFromLogs(pusher consumer.ConsumeLogsFunc) RequestConsumeFunc { - return func(ctx context.Context, request Request) error { - return pusher.ConsumeLogs(ctx, request.(*logsRequest).ld) - } -} - -// requestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request. -func requestFromLogs() RequestConverterFunc[plog.Logs] { - return func(_ context.Context, ld plog.Logs) (Request, error) { - return newLogsRequest(ld), nil - } -} - // NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated [v0.136.0]: Use xexporterhelper.NewLogsRequest instead. func NewLogsRequest( - _ context.Context, + ctx context.Context, set exporter.Settings, converter RequestConverterFunc[plog.Logs], pusher RequestConsumeFunc, options ...Option, ) (exporter.Logs, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilLogsConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalLogs, pusher, options...) - if err != nil { - return nil, err - } + return internal.NewLogsRequest(ctx, set, converter, pusher, options...) +} - lc, err := consumer.NewLogs(newConsumeLogs(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err +// requestConsumeFromLogs returns a RequestConsumeFunc that consumes plog.Logs. +func requestConsumeFromLogs(pusher consumer.ConsumeLogsFunc) RequestConsumeFunc { + return func(ctx context.Context, request Request) error { + return pusher.ConsumeLogs(ctx, request.(*logsRequest).ld) } - - return &logsExporter{BaseExporter: be, Logs: lc}, nil } -func newConsumeLogs(converter RequestConverterFunc[plog.Logs], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeLogsFunc { - return func(ctx context.Context, ld plog.Logs) error { - req, err := converter(ctx, ld) - if err != nil { - logger.Error("Failed to convert logs. Dropping data.", - zap.Int("dropped_log_records", ld.LogRecordCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) +// requestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request. +func requestFromLogs() RequestConverterFunc[plog.Logs] { + return func(_ context.Context, ld plog.Logs) (Request, error) { + return newLogsRequest(ld), nil } } diff --git a/exporter/exporterhelper/logs_test.go b/exporter/exporterhelper/logs_test.go index 867b66649162..d32ec05e06ce 100644 --- a/exporter/exporterhelper/logs_test.go +++ b/exporter/exporterhelper/logs_test.go @@ -71,30 +71,12 @@ func TestLogs_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestLogsRequest_NilLogger(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, le) - require.Equal(t, errNilLogger, err) -} - func TestLogs_NilPushLogsData(t *testing.T) { le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, nil) require.Nil(t, le) require.Equal(t, errNilPushLogs, err) } -func TestLogsRequest_NilLogsConverter(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, le) - require.Equal(t, errNilLogsConverter, err) -} - -func TestLogsRequest_NilPushLogsData(t *testing.T) { - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromLogsFunc(nil), nil) - require.Nil(t, le) - require.Equal(t, errNilConsumeRequest, err) -} - func TestLogs_Default(t *testing.T) { ld := plog.NewLogs() le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, newPushLogsData(nil)) @@ -107,19 +89,6 @@ func TestLogs_Default(t *testing.T) { assert.NoError(t, le.Shutdown(context.Background())) } -func TestLogsRequest_Default(t *testing.T) { - ld := plog.NewLogs() - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - assert.NotNil(t, le) - require.NoError(t, err) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, le.Capabilities()) - assert.NoError(t, le.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, le.ConsumeLogs(context.Background(), ld)) - assert.NoError(t, le.Shutdown(context.Background())) -} - func TestLogs_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewLogs(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeLogsConfig, newPushLogsData(nil), WithCapabilities(capabilities)) @@ -129,16 +98,6 @@ func TestLogs_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, le.Capabilities()) } -func TestLogsRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - require.NoError(t, err) - require.NotNil(t, le) - - assert.Equal(t, capabilities, le.Capabilities()) -} - func TestLogs_Default_ReturnError(t *testing.T) { ld := plog.NewLogs() want := errors.New("my_error") @@ -148,26 +107,6 @@ func TestLogs_Default_ReturnError(t *testing.T) { require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) } -func TestLogsRequest_Default_ConvertError(t *testing.T) { - ld := plog.NewLogs() - want := errors.New("convert_error") - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, le) - require.Equal(t, consumererror.NewPermanent(want), le.ConsumeLogs(context.Background(), ld)) -} - -func TestLogsRequest_Default_ExportError(t *testing.T) { - ld := plog.NewLogs() - want := errors.New("export_error") - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, le) - require.Equal(t, want, le.ConsumeLogs(context.Background(), ld)) -} - func TestLogs_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite @@ -279,7 +218,7 @@ func TestLogsRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsRequest(context.Background(), + le, err := internal.NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -305,7 +244,7 @@ func TestLogsRequest_WithRecordMetrics_ExportError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - le, err := NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, + le, err := internal.NewLogsRequest(context.Background(), exporter.Settings{ID: fakeLogsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) @@ -333,7 +272,7 @@ func TestLogsRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - le, err := NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) + le, err := internal.NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForLogs(t, sr, set.TracerProvider.Tracer("test"), le, nil) @@ -361,7 +300,7 @@ func TestLogsRequest_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - le, err := NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + le, err := internal.NewLogsRequest(context.Background(), set, requesttest.RequestFromLogsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForLogs(t, sr, set.TracerProvider.Tracer("test"), le, want) @@ -379,19 +318,6 @@ func TestLogs_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestLogsRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, le) - assert.NoError(t, err) - - assert.NoError(t, le.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestLogs_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -403,18 +329,6 @@ func TestLogs_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, le.Shutdown(context.Background())) } -func TestLogsRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - le, err := NewLogsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromLogsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, le) - require.NoError(t, err) - - assert.Equal(t, want, le.Shutdown(context.Background())) -} - func newPushLogsDataModifiedDownstream(retError error) consumer.ConsumeLogsFunc { return func(_ context.Context, log plog.Logs) error { log.ResourceLogs().MoveAndAppendTo(plog.NewResourceLogsSlice()) diff --git a/exporter/exporterhelper/metrics.go b/exporter/exporterhelper/metrics.go index 280eae108d57..da22df460a34 100644 --- a/exporter/exporterhelper/metrics.go +++ b/exporter/exporterhelper/metrics.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -130,11 +127,6 @@ func (req *metricsRequest) BytesSize() int { return metricsMarshaler.MetricsSize(req.md) } -type metricsExporter struct { - *internal.BaseExporter - consumer.Metrics -} - // NewMetrics creates an exporter.Metrics that records observability metrics and wraps every request with a Span. func NewMetrics( ctx context.Context, @@ -149,68 +141,32 @@ func NewMetrics( if pusher == nil { return nil, errNilPushMetrics } - return NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher), + return internal.NewMetricsRequest(ctx, set, requestFromMetrics(), requestConsumeFromMetrics(pusher), append([]Option{internal.WithQueueBatchSettings(NewMetricsQueueBatchSettings())}, options...)...) } -// requestConsumeFromMetrics returns a RequestConsumeFunc that consumes pmetric.Metrics. -func requestConsumeFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestConsumeFunc { - return func(ctx context.Context, request Request) error { - return pusher.ConsumeMetrics(ctx, request.(*metricsRequest).md) - } -} - -// requestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request. -func requestFromMetrics() RequestConverterFunc[pmetric.Metrics] { - return func(_ context.Context, md pmetric.Metrics) (Request, error) { - return newMetricsRequest(md), nil - } -} - // NewMetricsRequest creates a new metrics exporter based on a custom MetricsConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated [v0.136.0]: Use xexporterhelper.NewMetricsRequest instead. func NewMetricsRequest( - _ context.Context, + ctx context.Context, set exporter.Settings, converter RequestConverterFunc[pmetric.Metrics], pusher RequestConsumeFunc, options ...Option, ) (exporter.Metrics, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilMetricsConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalMetrics, pusher, options...) - if err != nil { - return nil, err - } + return internal.NewMetricsRequest(ctx, set, converter, pusher, options...) +} - mc, err := consumer.NewMetrics(newConsumeMetrics(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err +// requestConsumeFromMetrics returns a RequestConsumeFunc that consumes pmetric.Metrics. +func requestConsumeFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestConsumeFunc { + return func(ctx context.Context, request Request) error { + return pusher.ConsumeMetrics(ctx, request.(*metricsRequest).md) } - - return &metricsExporter{BaseExporter: be, Metrics: mc}, nil } -func newConsumeMetrics(converter RequestConverterFunc[pmetric.Metrics], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeMetricsFunc { - return func(ctx context.Context, md pmetric.Metrics) error { - req, err := converter(ctx, md) - if err != nil { - logger.Error("Failed to convert metrics. Dropping data.", - zap.Int("dropped_data_points", md.DataPointCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) +// requestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request. +func requestFromMetrics() RequestConverterFunc[pmetric.Metrics] { + return func(_ context.Context, md pmetric.Metrics) (Request, error) { + return newMetricsRequest(md), nil } } diff --git a/exporter/exporterhelper/metrics_test.go b/exporter/exporterhelper/metrics_test.go index 81e98ad84b1e..6466e30ae5f1 100644 --- a/exporter/exporterhelper/metrics_test.go +++ b/exporter/exporterhelper/metrics_test.go @@ -71,30 +71,12 @@ func TestMetrics_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestMetricsRequest_NilLogger(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, me) - require.Equal(t, errNilLogger, err) -} - func TestMetrics_NilPushMetricsData(t *testing.T) { me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, nil) require.Nil(t, me) require.Equal(t, errNilPushMetrics, err) } -func TestMetricsRequest_NilMetricsConverter(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, me) - require.Equal(t, errNilMetricsConverter, err) -} - -func TestMetricsRequest_NilPushMetricsData(t *testing.T) { - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromMetricsFunc(nil), nil) - require.Nil(t, me) - require.Equal(t, errNilConsumeRequest, err) -} - func TestMetrics_Default(t *testing.T) { md := pmetric.NewMetrics() me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, newPushMetricsData(nil)) @@ -107,19 +89,6 @@ func TestMetrics_Default(t *testing.T) { assert.NoError(t, me.Shutdown(context.Background())) } -func TestMetricsRequest_Default(t *testing.T) { - md := pmetric.NewMetrics() - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - assert.NotNil(t, me) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, me.Capabilities()) - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, me.ConsumeMetrics(context.Background(), md)) - assert.NoError(t, me.Shutdown(context.Background())) -} - func TestMetrics_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} me, err := NewMetrics(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeMetricsConfig, newPushMetricsData(nil), WithCapabilities(capabilities)) @@ -129,16 +98,6 @@ func TestMetrics_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, me.Capabilities()) } -func TestMetricsRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - require.NoError(t, err) - assert.NotNil(t, me) - - assert.Equal(t, capabilities, me.Capabilities()) -} - func TestMetrics_Default_ReturnError(t *testing.T) { md := pmetric.NewMetrics() want := errors.New("my_error") @@ -148,26 +107,6 @@ func TestMetrics_Default_ReturnError(t *testing.T) { require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) } -func TestMetricsRequest_Default_ConvertError(t *testing.T) { - md := pmetric.NewMetrics() - want := errors.New("convert_error") - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, me) - require.Equal(t, consumererror.NewPermanent(want), me.ConsumeMetrics(context.Background(), md)) -} - -func TestMetricsRequest_Default_ExportError(t *testing.T) { - md := pmetric.NewMetrics() - want := errors.New("export_error") - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, me) - require.Equal(t, want, me.ConsumeMetrics(context.Background(), md)) -} - func TestMetrics_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite @@ -279,7 +218,7 @@ func TestMetricsRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequest(context.Background(), + me, err := internal.NewMetricsRequest(context.Background(), exporter.Settings{ID: fakeMetricsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -305,7 +244,7 @@ func TestMetricsRequest_WithRecordMetrics_ExportError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - me, err := NewMetricsRequest(context.Background(), + me, err := internal.NewMetricsRequest(context.Background(), exporter.Settings{ID: fakeMetricsName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) @@ -334,7 +273,7 @@ func TestMetricsRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - me, err := NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) + me, err := internal.NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetrics(t, sr, set.TracerProvider.Tracer("test"), me, nil) @@ -362,7 +301,7 @@ func TestMetricsRequest_WithSpan_ExportError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - me, err := NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + me, err := internal.NewMetricsRequest(context.Background(), set, requesttest.RequestFromMetricsFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, me) checkWrapSpanForMetrics(t, sr, set.TracerProvider.Tracer("test"), me, want) @@ -381,20 +320,6 @@ func TestMetrics_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestMetricsRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, me) - assert.NoError(t, err) - - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, me.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestMetrics_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -407,19 +332,6 @@ func TestMetrics_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, me.Shutdown(context.Background())) } -func TestMetricsRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - me, err := NewMetricsRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromMetricsFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, me) - assert.NoError(t, err) - - assert.NoError(t, me.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, want, me.Shutdown(context.Background())) -} - func newPushMetricsData(retError error) consumer.ConsumeMetricsFunc { return func(_ context.Context, _ pmetric.Metrics) error { return retError diff --git a/exporter/exporterhelper/queue_batch.go b/exporter/exporterhelper/queue_batch.go index cdb0261f19ef..d4ba21f9fcde 100644 --- a/exporter/exporterhelper/queue_batch.go +++ b/exporter/exporterhelper/queue_batch.go @@ -36,10 +36,12 @@ type QueueBatchEncoding[T any] interface { var ErrQueueIsFull = queue.ErrQueueIsFull +// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch. // QueueBatchSettings are settings for the QueueBatch component. // They include things line Encoding to be used with persistent queue, or the available Sizers, etc. type QueueBatchSettings = queuebatch.Settings[Request] +// Deprecated: [v0.136.0] Use xexporterhelper.WithQueueBatch or WithQueue. // WithQueueBatch enables queueing and batching for an exporter. // This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. // Experimental: This API is at the early stage of development and may change without backward compatibility diff --git a/exporter/exporterhelper/request.go b/exporter/exporterhelper/request.go index dcb3046ac387..5f3cfa75bbc7 100644 --- a/exporter/exporterhelper/request.go +++ b/exporter/exporterhelper/request.go @@ -4,17 +4,16 @@ package exporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper" import ( - "context" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" - "go.opentelemetry.io/collector/exporter/exporterhelper/internal/sender" ) +// Deprecated: [v0.136.0] Use xexporterhelper.Request. // Request represents a single request that can be sent to an external endpoint. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type Request = request.Request +// Deprecated: [v0.136.0] Use xexporterhelper.RequestErroHandler // RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial // temporary failures. For example, if some items failed to process and can be retried, this interface allows to // return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. @@ -23,15 +22,18 @@ type Request = request.Request // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. type RequestErrorHandler = request.ErrorHandler +// Deprecated: [v0.136.0] Use xexporterhelper.RequestConsumeFunc. // RequestConverterFunc converts pdata telemetry into a user-defined Request. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -type RequestConverterFunc[T any] func(context.Context, T) (Request, error) +type RequestConverterFunc[T any] = request.RequestConverterFunc[T] +// Deprecated: [v0.136.0] Use xexporterhelper.RequestConsumeFunc. // RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, // and accessing it is considered undefined behavior. -type RequestConsumeFunc = sender.SendFunc[Request] +type RequestConsumeFunc = request.RequestConsumeFunc +// Deprecated: [v0.136.0] Use xexporterhelper.RequestSizer. // RequestSizer is an interface that returns the size of the given request. type RequestSizer = request.Sizer[Request] diff --git a/exporter/exporterhelper/traces.go b/exporter/exporterhelper/traces.go index 96e87d6ff321..4900511be698 100644 --- a/exporter/exporterhelper/traces.go +++ b/exporter/exporterhelper/traces.go @@ -7,8 +7,6 @@ import ( "context" "errors" - "go.uber.org/zap" - "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" @@ -20,7 +18,6 @@ import ( "go.opentelemetry.io/collector/pdata/ptrace" "go.opentelemetry.io/collector/pdata/xpdata/pref" pdatareq "go.opentelemetry.io/collector/pdata/xpdata/request" - "go.opentelemetry.io/collector/pipeline" ) var ( @@ -130,11 +127,6 @@ func (req *tracesRequest) BytesSize() int { return tracesMarshaler.TracesSize(req.td) } -type tracesExporter struct { - *internal.BaseExporter - consumer.Traces -} - // NewTraces creates an exporter.Traces that records observability metrics and wraps every request with a Span. func NewTraces( ctx context.Context, @@ -149,68 +141,32 @@ func NewTraces( if pusher == nil { return nil, errNilPushTraces } - return NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher), + return internal.NewTracesRequest(ctx, set, requestFromTraces(), requestConsumeFromTraces(pusher), append([]Option{internal.WithQueueBatchSettings(NewTracesQueueBatchSettings())}, options...)...) } -// requestConsumeFromTraces returns a RequestConsumeFunc that consumes ptrace.Traces. -func requestConsumeFromTraces(pusher consumer.ConsumeTracesFunc) RequestConsumeFunc { - return func(ctx context.Context, request Request) error { - return pusher.ConsumeTraces(ctx, request.(*tracesRequest).td) - } -} - -// requestFromTraces returns a RequestConverterFunc that converts ptrace.Traces into a Request. -func requestFromTraces() RequestConverterFunc[ptrace.Traces] { - return func(_ context.Context, traces ptrace.Traces) (Request, error) { - return newTracesRequest(traces), nil - } -} - // NewTracesRequest creates a new traces exporter based on a custom TracesConverter and Sender. -// Experimental: This API is at the early stage of development and may change without backward compatibility -// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +// Deprecated [v0.136.0]: Use xexporterhelper.NewTracesRequest instead. func NewTracesRequest( - _ context.Context, + ctx context.Context, set exporter.Settings, converter RequestConverterFunc[ptrace.Traces], pusher RequestConsumeFunc, options ...Option, ) (exporter.Traces, error) { - if set.Logger == nil { - return nil, errNilLogger - } - - if converter == nil { - return nil, errNilTracesConverter - } - - if pusher == nil { - return nil, errNilConsumeRequest - } - - be, err := internal.NewBaseExporter(set, pipeline.SignalTraces, pusher, options...) - if err != nil { - return nil, err - } + return internal.NewTracesRequest(ctx, set, converter, pusher, options...) +} - tc, err := consumer.NewTraces(newConsumeTraces(converter, be, set.Logger), be.ConsumerOptions...) - if err != nil { - return nil, err +// requestConsumeFromTraces returns a RequestConsumeFunc that consumes ptrace.Traces. +func requestConsumeFromTraces(pusher consumer.ConsumeTracesFunc) RequestConsumeFunc { + return func(ctx context.Context, request Request) error { + return pusher.ConsumeTraces(ctx, request.(*tracesRequest).td) } - - return &tracesExporter{BaseExporter: be, Traces: tc}, nil } -func newConsumeTraces(converter RequestConverterFunc[ptrace.Traces], be *internal.BaseExporter, logger *zap.Logger) consumer.ConsumeTracesFunc { - return func(ctx context.Context, td ptrace.Traces) error { - req, err := converter(ctx, td) - if err != nil { - logger.Error("Failed to convert traces. Dropping data.", - zap.Int("dropped_spans", td.SpanCount()), - zap.Error(err)) - return consumererror.NewPermanent(err) - } - return be.Send(ctx, req) +// requestFromTraces returns a RequestConverterFunc that converts ptrace.Traces into a Request. +func requestFromTraces() RequestConverterFunc[ptrace.Traces] { + return func(_ context.Context, traces ptrace.Traces) (Request, error) { + return newTracesRequest(traces), nil } } diff --git a/exporter/exporterhelper/traces_test.go b/exporter/exporterhelper/traces_test.go index 59e622dc9aeb..b3417dd57e2b 100644 --- a/exporter/exporterhelper/traces_test.go +++ b/exporter/exporterhelper/traces_test.go @@ -67,30 +67,12 @@ func TestTraces_NilLogger(t *testing.T) { require.Equal(t, errNilLogger, err) } -func TestTracesRequest_NilLogger(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exporter.Settings{}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, te) - require.Equal(t, errNilLogger, err) -} - func TestTraces_NilPushTraceData(t *testing.T) { te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, nil) require.Nil(t, te) require.Equal(t, errNilPushTraces, err) } -func TestTracesRequest_NilTracesConverter(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) - require.Nil(t, te) - require.Equal(t, errNilTracesConverter, err) -} - -func TestTracesRequest_NilPushTraceData(t *testing.T) { - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), requesttest.RequestFromTracesFunc(nil), nil) - require.Nil(t, te) - require.Equal(t, errNilConsumeRequest, err) -} - func TestTraces_Default(t *testing.T) { td := ptrace.NewTraces() te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, newTraceDataPusher(nil)) @@ -103,19 +85,6 @@ func TestTraces_Default(t *testing.T) { assert.NoError(t, te.Shutdown(context.Background())) } -func TestTracesRequest_Default(t *testing.T) { - td := ptrace.NewTraces() - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) - assert.NotNil(t, te) - require.NoError(t, err) - - assert.Equal(t, consumer.Capabilities{MutatesData: false}, te.Capabilities()) - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, te.ConsumeTraces(context.Background(), td)) - assert.NoError(t, te.Shutdown(context.Background())) -} - func TestTraces_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} te, err := NewTraces(context.Background(), exportertest.NewNopSettings(exportertest.NopType), &fakeTracesConfig, newTraceDataPusher(nil), WithCapabilities(capabilities)) @@ -125,16 +94,6 @@ func TestTraces_WithCapabilities(t *testing.T) { assert.Equal(t, capabilities, te.Capabilities()) } -func TestTracesRequest_WithCapabilities(t *testing.T) { - capabilities := consumer.Capabilities{MutatesData: true} - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithCapabilities(capabilities)) - assert.NotNil(t, te) - require.NoError(t, err) - - assert.Equal(t, capabilities, te.Capabilities()) -} - func TestTraces_Default_ReturnError(t *testing.T) { td := ptrace.NewTraces() want := errors.New("my_error") @@ -146,26 +105,6 @@ func TestTraces_Default_ReturnError(t *testing.T) { require.Equal(t, want, err) } -func TestTracesRequest_Default_ConvertError(t *testing.T) { - td := ptrace.NewTraces() - want := errors.New("convert_error") - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(want), sendertest.NewNopSenderFunc[Request]()) - require.NoError(t, err) - require.NotNil(t, te) - require.Equal(t, consumererror.NewPermanent(want), te.ConsumeTraces(context.Background(), td)) -} - -func TestTracesRequest_Default_ExportError(t *testing.T) { - td := ptrace.NewTraces() - want := errors.New("export_error") - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) - require.NoError(t, err) - require.NotNil(t, te) - require.Equal(t, want, te.ConsumeTraces(context.Background(), td)) -} - func TestTraces_WithPersistentQueue(t *testing.T) { fgOrigReadState := queue.PersistRequestContextOnRead fgOrigWriteState := queue.PersistRequestContextOnWrite @@ -277,7 +216,7 @@ func TestTracesRequest_WithRecordMetrics(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequest(context.Background(), + te, err := internal.NewTracesRequest(context.Background(), exporter.Settings{ID: fakeTracesName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) @@ -303,7 +242,7 @@ func TestTracesRequest_WithRecordMetrics_RequestSenderError(t *testing.T) { tt := componenttest.NewTelemetry() t.Cleanup(func() { require.NoError(t, tt.Shutdown(context.Background())) }) - te, err := NewTracesRequest(context.Background(), + te, err := internal.NewTracesRequest(context.Background(), exporter.Settings{ID: fakeTracesName, TelemetrySettings: tt.NewTelemetrySettings(), BuildInfo: component.NewDefaultBuildInfo()}, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) @@ -333,7 +272,7 @@ func TestTracesRequest_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - te, err := NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) + te, err := internal.NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, te) @@ -363,7 +302,7 @@ func TestTracesRequest_WithSpan_ExportError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("export_error") - te, err := NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) + te, err := internal.NewTracesRequest(context.Background(), set, requesttest.RequestFromTracesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, te) @@ -383,20 +322,6 @@ func TestTraces_WithShutdown(t *testing.T) { assert.True(t, shutdownCalled) } -func TestTracesRequest_WithShutdown(t *testing.T) { - shutdownCalled := false - shutdown := func(context.Context) error { shutdownCalled = true; return nil } - - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdown)) - assert.NotNil(t, te) - assert.NoError(t, err) - - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.NoError(t, te.Shutdown(context.Background())) - assert.True(t, shutdownCalled) -} - func TestTraces_WithShutdown_ReturnError(t *testing.T) { want := errors.New("my_error") shutdownErr := func(context.Context) error { return want } @@ -409,19 +334,6 @@ func TestTraces_WithShutdown_ReturnError(t *testing.T) { assert.Equal(t, want, te.Shutdown(context.Background())) } -func TestTracesRequest_WithShutdown_ReturnError(t *testing.T) { - want := errors.New("my_error") - shutdownErr := func(context.Context) error { return want } - - te, err := NewTracesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requesttest.RequestFromTracesFunc(nil), sendertest.NewNopSenderFunc[Request](), WithShutdown(shutdownErr)) - assert.NotNil(t, te) - assert.NoError(t, err) - - assert.NoError(t, te.Start(context.Background(), componenttest.NewNopHost())) - assert.Equal(t, want, te.Shutdown(context.Background())) -} - func newTraceDataPusher(retError error) consumer.ConsumeTracesFunc { return func(context.Context, ptrace.Traces) error { return retError diff --git a/exporter/exporterhelper/xexporterhelper/go.mod b/exporter/exporterhelper/xexporterhelper/go.mod index c2a54c8951fe..52c5496e643c 100644 --- a/exporter/exporterhelper/xexporterhelper/go.mod +++ b/exporter/exporterhelper/xexporterhelper/go.mod @@ -15,6 +15,7 @@ require ( go.opentelemetry.io/collector/exporter/exporterhelper v0.135.0 go.opentelemetry.io/collector/exporter/exportertest v0.135.0 go.opentelemetry.io/collector/exporter/xexporter v0.135.0 + go.opentelemetry.io/collector/pdata v1.41.0 go.opentelemetry.io/collector/pdata/pprofile v0.135.0 go.opentelemetry.io/collector/pdata/testdata v0.135.0 go.opentelemetry.io/collector/pdata/xpdata v0.135.0 @@ -54,7 +55,6 @@ require ( go.opentelemetry.io/collector/extension/xextension v0.135.0 // indirect go.opentelemetry.io/collector/featuregate v1.41.0 // indirect go.opentelemetry.io/collector/internal/telemetry v0.135.0 // indirect - go.opentelemetry.io/collector/pdata v1.41.0 // indirect go.opentelemetry.io/collector/pipeline v1.41.0 // indirect go.opentelemetry.io/collector/receiver v1.41.0 // indirect go.opentelemetry.io/collector/receiver/receivertest v0.135.0 // indirect diff --git a/exporter/exporterhelper/xexporterhelper/new_request.go b/exporter/exporterhelper/xexporterhelper/new_request.go new file mode 100644 index 000000000000..ce9596caff4b --- /dev/null +++ b/exporter/exporterhelper/xexporterhelper/new_request.go @@ -0,0 +1,67 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xexporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" + +import ( + "context" + + "go.opentelemetry.io/collector/exporter" + "go.opentelemetry.io/collector/exporter/exporterhelper" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal" + "go.opentelemetry.io/collector/exporter/exporterhelper/internal/queuebatch" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" + "go.opentelemetry.io/collector/pdata/ptrace" +) + +// NewLogsRequest creates new logs exporter based on custom LogsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewLogsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[plog.Logs], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Logs, error) { + return internal.NewLogsRequest(ctx, set, converter, pusher, options...) +} + +// NewMetricsRequest creates new metrics exporter based on custom MetricsConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewMetricsRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[pmetric.Metrics], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Metrics, error) { + return internal.NewMetricsRequest(ctx, set, converter, pusher, options...) +} + +// NewTracesRequest creates new traces exporter based on custom TracesConverter and Sender. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func NewTracesRequest( + ctx context.Context, + set exporter.Settings, + converter RequestConverterFunc[ptrace.Traces], + pusher RequestConsumeFunc, + options ...exporterhelper.Option, +) (exporter.Traces, error) { + return internal.NewTracesRequest(ctx, set, converter, pusher, options...) +} + +// QueueBatchSettings are settings for the QueueBatch component. +// They include things line Encoding to be used with persistent queue, or the available Sizers, etc. +type QueueBatchSettings = queuebatch.Settings[Request] + +// WithQueueBatch enables queueing and batching for an exporter. +// This option should be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +func WithQueueBatch(cfg exporterhelper.QueueBatchConfig, set QueueBatchSettings) exporterhelper.Option { + return internal.WithQueueBatch(cfg, set) +} diff --git a/exporter/exporterhelper/xexporterhelper/profiles.go b/exporter/exporterhelper/xexporterhelper/profiles.go index 31a891e8491f..d3546f43cb35 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles.go +++ b/exporter/exporterhelper/xexporterhelper/profiles.go @@ -34,8 +34,8 @@ var ( // NewProfilesQueueBatchSettings returns a new QueueBatchSettings to configure to WithQueueBatch when using pprofile.Profiles. // Experimental: This API is at the early stage of development and may change without backward compatibility // until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. -func NewProfilesQueueBatchSettings() exporterhelper.QueueBatchSettings { - return exporterhelper.QueueBatchSettings{ +func NewProfilesQueueBatchSettings() QueueBatchSettings { + return QueueBatchSettings{ ReferenceCounter: profilesReferenceCounter{}, Encoding: profilesEncoding{}, ItemsSizer: request.NewItemsSizer(), @@ -57,7 +57,7 @@ type profilesRequest struct { cachedSize int } -func newProfilesRequest(pd pprofile.Profiles) exporterhelper.Request { +func newProfilesRequest(pd pprofile.Profiles) Request { return &profilesRequest{ pd: pd, cachedSize: -1, @@ -105,7 +105,7 @@ func (profilesReferenceCounter) Unref(req request.Request) { pref.UnrefProfiles(req.(*profilesRequest).pd) } -func (req *profilesRequest) OnError(err error) exporterhelper.Request { +func (req *profilesRequest) OnError(err error) Request { var profileError xconsumererror.Profiles if errors.As(err, &profileError) { // TODO: Add logic to unref the new request created here. @@ -157,15 +157,15 @@ func NewProfiles( } // requestConsumeFromProfiles returns a RequestConsumeFunc that consumes pprofile.Profiles. -func requestConsumeFromProfiles(pusher xconsumer.ConsumeProfilesFunc) exporterhelper.RequestConsumeFunc { - return func(ctx context.Context, request exporterhelper.Request) error { +func requestConsumeFromProfiles(pusher xconsumer.ConsumeProfilesFunc) RequestConsumeFunc { + return func(ctx context.Context, request Request) error { return pusher.ConsumeProfiles(ctx, request.(*profilesRequest).pd) } } // requestFromProfiles returns a RequestFromProfilesFunc that converts pprofile.Profiles into a Request. -func requestFromProfiles() exporterhelper.RequestConverterFunc[pprofile.Profiles] { - return func(_ context.Context, profiles pprofile.Profiles) (exporterhelper.Request, error) { +func requestFromProfiles() RequestConverterFunc[pprofile.Profiles] { + return func(_ context.Context, profiles pprofile.Profiles) (Request, error) { return newProfilesRequest(profiles), nil } } @@ -176,8 +176,8 @@ func requestFromProfiles() exporterhelper.RequestConverterFunc[pprofile.Profiles func NewProfilesRequest( _ context.Context, set exporter.Settings, - converter exporterhelper.RequestConverterFunc[pprofile.Profiles], - pusher exporterhelper.RequestConsumeFunc, + converter RequestConverterFunc[pprofile.Profiles], + pusher RequestConsumeFunc, options ...exporterhelper.Option, ) (xexporter.Profiles, error) { if set.Logger == nil { @@ -205,7 +205,7 @@ func NewProfilesRequest( return &profileExporter{BaseExporter: be, Profiles: tc}, nil } -func newConsumeProfiles(converter exporterhelper.RequestConverterFunc[pprofile.Profiles], be *internal.BaseExporter, logger *zap.Logger) xconsumer.ConsumeProfilesFunc { +func newConsumeProfiles(converter RequestConverterFunc[pprofile.Profiles], be *internal.BaseExporter, logger *zap.Logger) xconsumer.ConsumeProfilesFunc { return func(ctx context.Context, pd pprofile.Profiles) error { req, err := converter(ctx, pd) if err != nil { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch.go b/exporter/exporterhelper/xexporterhelper/profiles_batch.go index 829fca166185..3ee8d7a5c90a 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch.go @@ -17,7 +17,7 @@ import ( // // Following the OTLP 1.7.0 upgrade, this is currently a noop. // See https://github.com/open-telemetry/opentelemetry-collector/issues/13106 -func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 exporterhelper.Request) ([]exporterhelper.Request, error) { +func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt exporterhelper.RequestSizerType, r2 Request) ([]Request, error) { var sz sizer.ProfilesSizer switch szt { case exporterhelper.RequestSizerTypeItems: @@ -38,7 +38,7 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor // If no limit we can simply merge the new request into the current and return. if maxSize == 0 { - return []exporterhelper.Request{req, req2}, nil + return []Request{req, req2}, nil } sp1, err1 := req.split(maxSize, sz) @@ -49,7 +49,7 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor // If no limit we can simply merge the new request into the current and return. if maxSize == 0 { - return []exporterhelper.Request{req}, nil + return []Request{req}, nil } return req.split(maxSize, sz) } @@ -63,8 +63,8 @@ func (req *profilesRequest) MergeSplit(_ context.Context, maxSize int, szt expor req.pd.ResourceProfiles().MoveAndAppendTo(dst.pd.ResourceProfiles()) }*/ -func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]exporterhelper.Request, error) { - var res []exporterhelper.Request +func (req *profilesRequest) split(maxSize int, sz sizer.ProfilesSizer) ([]Request, error) { + var res []Request for req.size(sz) > maxSize { pd, rmSize := extractProfiles(req.pd, maxSize, sz) if pd.SampleCount() == 0 { diff --git a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go index 877d1370f642..a518bf350c17 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_batch_test.go @@ -40,9 +40,9 @@ func TestMergeSplitProfiles(t *testing.T) { name string szt exporterhelper.RequestSizerType maxSize int - pr1 exporterhelper.Request - pr2 exporterhelper.Request - expected []exporterhelper.Request + pr1 Request + pr2 Request + expected []Request }{ { name: "both_requests_empty", @@ -50,7 +50,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -58,7 +58,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -66,7 +66,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -74,7 +74,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -86,7 +86,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 4, pr1: newProfilesRequest(testdata.GenerateProfiles(10)), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -98,7 +98,7 @@ func TestMergeSplitProfiles(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(8) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -116,7 +116,7 @@ func TestMergeSplitProfiles(t *testing.T) { return testdata.GenerateProfiles(6) }()), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(2) @@ -143,9 +143,9 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { name string szt exporterhelper.RequestSizerType maxSize int - pr1 exporterhelper.Request - pr2 exporterhelper.Request - expected []exporterhelper.Request + pr1 Request + pr2 Request + expected []Request }{ { name: "both_requests_empty", @@ -153,7 +153,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -161,7 +161,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -169,7 +169,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -177,7 +177,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -189,7 +189,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 4, pr1: newProfilesRequest(testdata.GenerateProfiles(10)), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -201,7 +201,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: 10, pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(8) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -219,7 +219,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { return testdata.GenerateProfiles(6) }()), pr2: nil, - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(func() pprofile.Profiles { return testdata.GenerateProfiles(2) @@ -232,7 +232,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(pprofile.NewProfiles()), - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "first_request_empty", @@ -240,7 +240,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(5)), - expected: []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(5))}, + expected: []Request{newProfilesRequest(testdata.GenerateProfiles(5))}, }, { name: "first_empty_second_nil", @@ -248,7 +248,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: nil, - expected: []exporterhelper.Request{newProfilesRequest(pprofile.NewProfiles())}, + expected: []Request{newProfilesRequest(pprofile.NewProfiles())}, }, { name: "merge_only", @@ -256,7 +256,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(11)), pr1: newProfilesRequest(testdata.GenerateProfiles(4)), pr2: newProfilesRequest(testdata.GenerateProfiles(6)), - expected: []exporterhelper.Request{newProfilesRequest(func() pprofile.Profiles { + expected: []Request{newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(4) testdata.GenerateProfiles(6).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) return profiles @@ -268,7 +268,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(4)), pr1: newProfilesRequest(pprofile.NewProfiles()), pr2: newProfilesRequest(testdata.GenerateProfiles(10)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(4)), newProfilesRequest(testdata.GenerateProfiles(2)), @@ -280,7 +280,7 @@ func TestMergeSplitProfilesBasedOnByteSize(t *testing.T) { maxSize: profilesMarshaler.ProfilesSize(testdata.GenerateProfiles(10)), pr1: newProfilesRequest(testdata.GenerateProfiles(8)), pr2: newProfilesRequest(testdata.GenerateProfiles(20)), - expected: []exporterhelper.Request{ + expected: []Request{ newProfilesRequest(func() pprofile.Profiles { profiles := testdata.GenerateProfiles(7) testdata.GenerateProfiles(2).ResourceProfiles().MoveAndAppendTo(profiles.ResourceProfiles()) @@ -316,7 +316,7 @@ func TestMergeSplitManySmallLogs(t *testing.T) { t.Skip("merging of profiles has been temporarily disabled (https://github.com/open-telemetry/opentelemetry-collector/issues/13106)") // All requests merge into a single batch. - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(1))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(1))} for j := 0; j < 1000; j++ { lr2 := newProfilesRequest(testdata.GenerateProfiles(10)) res, _ := merged[len(merged)-1].MergeSplit(context.Background(), 10000, exporterhelper.RequestSizerTypeItems, lr2) @@ -329,7 +329,7 @@ func BenchmarkSplittingBasedOnByteSizeManySmallProfiles(b *testing.B) { // All requests merge into a single batch. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(10))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(10))} for j := 0; j < 1000; j++ { pr2 := newProfilesRequest(testdata.GenerateProfiles(10)) res, _ := merged[len(merged)-1].MergeSplit( @@ -348,7 +348,7 @@ func BenchmarkSplittingBasedOnByteSizeManyProfilesSlightlyAboveLimit(b *testing. // Every incoming request results in a split. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(0))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(0))} for j := 0; j < 10; j++ { pr2 := newProfilesRequest(testdata.GenerateProfiles(10001)) res, _ := merged[len(merged)-1].MergeSplit( @@ -368,7 +368,7 @@ func BenchmarkSplittingBasedOnByteSizeHugeProfiles(b *testing.B) { // One request splits into many batches. b.ReportAllocs() for i := 0; i < b.N; i++ { - merged := []exporterhelper.Request{newProfilesRequest(testdata.GenerateProfiles(0))} + merged := []Request{newProfilesRequest(testdata.GenerateProfiles(0))} pr2 := newProfilesRequest(testdata.GenerateProfiles(100000)) res, _ := merged[len(merged)-1].MergeSplit( context.Background(), diff --git a/exporter/exporterhelper/xexporterhelper/profiles_test.go b/exporter/exporterhelper/xexporterhelper/profiles_test.go index 687346703373..af7c79c1c4e2 100644 --- a/exporter/exporterhelper/xexporterhelper/profiles_test.go +++ b/exporter/exporterhelper/xexporterhelper/profiles_test.go @@ -53,7 +53,7 @@ func TestProfilesRequest(t *testing.T) { assert.Equal( t, newProfilesRequest(pprofile.NewProfiles()), - lr.(exporterhelper.RequestErrorHandler).OnError(profileErr), + lr.(RequestErrorHandler).OnError(profileErr), ) } @@ -70,7 +70,7 @@ func TestProfilesExporter_NilLogger(t *testing.T) { } func TestProfilesRequestExporter_NilLogger(t *testing.T) { - le, err := NewProfilesRequest(context.Background(), exporter.Settings{}, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + le, err := NewProfilesRequest(context.Background(), exporter.Settings{}, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.Nil(t, le) require.Equal(t, errNilLogger, err) } @@ -82,7 +82,7 @@ func TestProfilesExporter_NilPushProfilesData(t *testing.T) { } func TestProfilesExporter_NilProfilesConverter(t *testing.T) { - te, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[exporterhelper.Request]()) + te, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), nil, sendertest.NewNopSenderFunc[Request]()) require.Nil(t, te) require.Equal(t, errNilProfilesConverter, err) } @@ -108,7 +108,7 @@ func TestProfilesExporter_Default(t *testing.T) { func TestProfilesRequestExporter_Default(t *testing.T) { ld := pprofile.NewProfiles() le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) assert.NotNil(t, le) require.NoError(t, err) @@ -130,7 +130,7 @@ func TestProfilesExporter_WithCapabilities(t *testing.T) { func TestProfilesRequestExporter_WithCapabilities(t *testing.T) { capabilities := consumer.Capabilities{MutatesData: true} le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithCapabilities(capabilities)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithCapabilities(capabilities)) require.NoError(t, err) require.NotNil(t, le) @@ -150,7 +150,7 @@ func TestProfilesRequestExporter_Default_ConvertError(t *testing.T) { ld := pprofile.NewProfiles() want := errors.New("convert_error") le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(want), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + requestFromProfilesFunc(want), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) require.Equal(t, consumererror.NewPermanent(want), le.ConsumeProfiles(context.Background(), ld)) @@ -160,7 +160,7 @@ func TestProfilesRequestExporter_Default_ExportError(t *testing.T) { ld := pprofile.NewProfiles() want := errors.New("export_error") le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[exporterhelper.Request](want)) + requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) require.Equal(t, want, le.ConsumeProfiles(context.Background(), ld)) @@ -260,7 +260,7 @@ func TestProfilesRequestExporter_WithSpan(t *testing.T) { otel.SetTracerProvider(set.TracerProvider) defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) - le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request]()) + le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request]()) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, nil) @@ -288,7 +288,7 @@ func TestProfilesRequestExporter_WithSpan_ReturnError(t *testing.T) { defer otel.SetTracerProvider(nooptrace.NewTracerProvider()) want := errors.New("my_error") - le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[exporterhelper.Request](want)) + le, err := NewProfilesRequest(context.Background(), set, requestFromProfilesFunc(nil), sendertest.NewErrSenderFunc[Request](want)) require.NoError(t, err) require.NotNil(t, le) checkWrapSpanForProfilesExporter(t, sr, set.TracerProvider.Tracer("test"), le, want) @@ -311,7 +311,7 @@ func TestProfilesRequestExporter_WithShutdown(t *testing.T) { shutdown := func(context.Context) error { shutdownCalled = true; return nil } le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithShutdown(shutdown)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithShutdown(shutdown)) assert.NotNil(t, le) require.NoError(t, err) @@ -335,7 +335,7 @@ func TestProfilesRequestExporter_WithShutdown_ReturnError(t *testing.T) { shutdownErr := func(context.Context) error { return want } le, err := NewProfilesRequest(context.Background(), exportertest.NewNopSettings(exportertest.NopType), - requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[exporterhelper.Request](), exporterhelper.WithShutdown(shutdownErr)) + requestFromProfilesFunc(nil), sendertest.NewNopSenderFunc[Request](), exporterhelper.WithShutdown(shutdownErr)) assert.NotNil(t, le) require.NoError(t, err) @@ -382,8 +382,8 @@ func checkWrapSpanForProfilesExporter(t *testing.T, sr *tracetest.SpanRecorder, } } -func requestFromProfilesFunc(err error) func(context.Context, pprofile.Profiles) (exporterhelper.Request, error) { - return func(_ context.Context, pd pprofile.Profiles) (exporterhelper.Request, error) { +func requestFromProfilesFunc(err error) func(context.Context, pprofile.Profiles) (Request, error) { + return func(_ context.Context, pd pprofile.Profiles) (Request, error) { return &requesttest.FakeRequest{Items: pd.SampleCount()}, err } } diff --git a/exporter/exporterhelper/xexporterhelper/request.go b/exporter/exporterhelper/xexporterhelper/request.go new file mode 100644 index 000000000000..9fbf3e8e847b --- /dev/null +++ b/exporter/exporterhelper/xexporterhelper/request.go @@ -0,0 +1,31 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package xexporterhelper // import "go.opentelemetry.io/collector/exporter/exporterhelper/xexporterhelper" + +import "go.opentelemetry.io/collector/exporter/exporterhelper/internal/request" // Request represents a single request that can be sent to an external endpoint. + +// Request represents a single request that can be sent to an external endpoint. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type Request = request.Request + +// RequestErrorHandler is an optional interface that can be implemented by Request to provide a way handle partial +// temporary failures. For example, if some items failed to process and can be retried, this interface allows to +// return a new Request that contains the items left to be sent. Otherwise, the original Request should be returned. +// If not implemented, the original Request will be returned assuming the error is applied to the whole Request. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestErrorHandler = request.ErrorHandler + +// RequestConverterFunc converts pdata telemetry into a user-defined Request. +// Experimental: This API is at the early stage of development and may change without backward compatibility +// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved. +type RequestConverterFunc[T any] = request.RequestConverterFunc[T] + +// RequestConsumeFunc processes the request. After the function returns, the request is no longer accessible, +// and accessing it is considered undefined behavior. +type RequestConsumeFunc = request.RequestConsumeFunc + +// RequestSizer is an interface that returns the size of the given request. +type RequestSizer = request.Sizer[Request]