Skip to content

Commit

Permalink
[exporterhelper] Adds functions to convert pdata types into Request
Browse files Browse the repository at this point in the history
This also helps do reduce the duplicated logic between old and new exporter helpers
  • Loading branch information
dmitryax committed Feb 19, 2024
1 parent 0ab8f44 commit a0095a2
Show file tree
Hide file tree
Showing 8 changed files with 198 additions and 164 deletions.
29 changes: 29 additions & 0 deletions .chloggen/add-pdata-request-converters.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. otlpreceiver)
component: exporterhelper

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: Adds functions to convert pdata types into Request.

# One or more tracking issues or pull requests related to the change
issues: [9603]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
The following functions are available now that can be used as converters in the new exporter helper:
- RequestFromTraces
- RequestFromMetrics
- RequestFromLogs
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: [api]
38 changes: 25 additions & 13 deletions exporter/exporterhelper/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ func WithRetry(config configretry.BackOffConfig) Option {
// This option cannot be used with the new exporter helpers New[Traces|Metrics|Logs]RequestExporter.
func WithQueue(config QueueSettings) Option {
return func(o *baseExporter) {
if o.requestExporter {
if o.marshaler == nil || o.unmarshaler == nil {
panic("WithQueue option is not available for the new request exporters, use WithRequestQueue instead")
}
if !config.Enabled {
Expand Down Expand Up @@ -114,6 +114,9 @@ func WithQueue(config QueueSettings) Option {
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func WithRequestQueue(cfg exporterqueue.Config, queueFactory exporterqueue.Factory[Request]) Option {
return func(o *baseExporter) {
if o.marshaler != nil || o.unmarshaler != nil {
panic("WithRequestQueue option must be used with the new request exporters only, use WithQueue instead")

Check warning on line 118 in exporter/exporterhelper/common.go

View check run for this annotation

Codecov / codecov/patch

exporter/exporterhelper/common.go#L118

Added line #L118 was not covered by tests
}
if !cfg.Enabled {
o.exportFailureMessage += " Try enabling sending_queue to survive temporary failures."
return
Expand All @@ -135,15 +138,30 @@ func WithCapabilities(capabilities consumer.Capabilities) Option {
}
}

// withMarshaler is used to set the request marshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withMarshaler(marshaler exporterqueue.Marshaler[Request]) Option {
return func(o *baseExporter) {
o.marshaler = marshaler
}
}

// withUnmarshaler is used to set the request unmarshaler for the new exporter helper.
// It must be provided as the first option when creating a new exporter helper.
func withUnmarshaler(unmarshaler exporterqueue.Unmarshaler[Request]) Option {
return func(o *baseExporter) {
o.unmarshaler = unmarshaler
}
}

// baseExporter contains common fields between different exporter types.
type baseExporter struct {
component.StartFunc
component.ShutdownFunc

requestExporter bool
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType
marshaler exporterqueue.Marshaler[Request]
unmarshaler exporterqueue.Unmarshaler[Request]
signal component.DataType

set exporter.CreateSettings
obsrep *ObsReport
Expand All @@ -162,20 +180,14 @@ type baseExporter struct {
consumerOptions []consumer.Option
}

// TODO: requestExporter, marshaler, and unmarshaler arguments can be removed when the old exporter helpers will be updated to call the new ones.
func newBaseExporter(set exporter.CreateSettings, signal component.DataType, requestExporter bool,
marshaler exporterqueue.Marshaler[Request], unmarshaler exporterqueue.Unmarshaler[Request], osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {

func newBaseExporter(set exporter.CreateSettings, signal component.DataType, osf obsrepSenderFactory, options ...Option) (*baseExporter, error) {
obsReport, err := NewObsReport(ObsReportSettings{ExporterID: set.ID, ExporterCreateSettings: set})
if err != nil {
return nil, err
}

be := &baseExporter{
requestExporter: requestExporter,
marshaler: marshaler,
unmarshaler: unmarshaler,
signal: signal,
signal: signal,

queueSender: &baseRequestSender{},
obsrepSender: osf(obsReport),
Expand Down
18 changes: 7 additions & 11 deletions exporter/exporterhelper/common_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,7 @@ func newNoopObsrepSender(*ObsReport) requestSender {
}

func TestBaseExporter(t *testing.T) {
be, err := newBaseExporter(defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
be, err = newBaseExporter(defaultSettings, defaultType, true, nil, nil, newNoopObsrepSender)
be, err := newBaseExporter(defaultSettings, defaultType, newNoopObsrepSender)
require.NoError(t, err)
require.NoError(t, be.Start(context.Background(), componenttest.NewNopHost()))
require.NoError(t, be.Shutdown(context.Background()))
Expand All @@ -49,7 +45,7 @@ func TestBaseExporter(t *testing.T) {
func TestBaseExporterWithOptions(t *testing.T) {
want := errors.New("my error")
be, err := newBaseExporter(
defaultSettings, defaultType, false, nil, nil, newNoopObsrepSender,
defaultSettings, defaultType, newNoopObsrepSender,
WithStart(func(context.Context, component.Host) error { return want }),
WithShutdown(func(context.Context) error { return want }),
WithTimeout(NewDefaultTimeoutSettings()),
Expand All @@ -69,12 +65,13 @@ func checkStatus(t *testing.T, sd sdktrace.ReadOnlySpan, err error) {
}

func TestQueueRetryOptionsWithRequestExporter(t *testing.T) {
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
bs, err := newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()))
require.Nil(t, err)
require.True(t, bs.requestExporter)
require.Nil(t, bs.marshaler)
require.Nil(t, bs.unmarshaler)
require.Panics(t, func() {
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, true, nil, nil, newNoopObsrepSender,
_, _ = newBaseExporter(exportertest.NewNopCreateSettings(), defaultType, newNoopObsrepSender,
WithRetry(configretry.NewDefaultBackOffConfig()), WithQueue(NewDefaultQueueSettings()))
})
}
Expand All @@ -85,9 +82,8 @@ func TestBaseExporterLogging(t *testing.T) {
set.Logger = zap.New(logger)
rCfg := configretry.NewDefaultBackOffConfig()
rCfg.Enabled = false
bs, err := newBaseExporter(set, defaultType, true, nil, nil, newNoopObsrepSender, WithRetry(rCfg))
bs, err := newBaseExporter(set, defaultType, newNoopObsrepSender, WithRetry(rCfg))
require.Nil(t, err)
require.True(t, bs.requestExporter)
sendErr := bs.send(context.Background(), newErrorRequest())
require.Error(t, sendErr)

Expand Down
40 changes: 13 additions & 27 deletions exporter/exporterhelper/logs.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type logsExporter struct {

// NewLogsExporter creates an exporter.Logs that records observability metrics and wraps every request with a Span.
func NewLogsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeLogsFunc,
Expand All @@ -79,41 +79,27 @@ func NewLogsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushLogsData
}

be, err := newBaseExporter(set, component.DataTypeLogs, false, logsRequestMarshaler,
newLogsRequestUnmarshalerFunc(pusher), newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}

lc, err := consumer.NewLogs(func(ctx context.Context, ld plog.Logs) error {
req := newLogsRequest(ld, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeLogs, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &logsExporter{
baseExporter: be,
Logs: lc,
}, err
logsOpts := []Option{withMarshaler(logsRequestMarshaler), withUnmarshaler(newLogsRequestUnmarshalerFunc(pusher))}
return NewLogsRequestExporter(ctx, set, RequestFromLogs(pusher), append(logsOpts, options...)...)
}

// RequestFromLogsFunc converts plog.Logs data into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromLogsFunc func(context.Context, plog.Logs) (Request, error)

// RequestFromLogs returns a RequestFromLogsFunc that converts plog.Logs into a Request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func RequestFromLogs(pusher consumer.ConsumeLogsFunc) RequestFromLogsFunc {
return func(ctx context.Context, ld plog.Logs) (Request, error) {
return newLogsRequest(ld, pusher), nil
}
}

// NewLogsRequestExporter creates new logs exporter based on custom LogsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +117,7 @@ func NewLogsRequestExporter(
return nil, errNilLogsConverter
}

be, err := newBaseExporter(set, component.DataTypeLogs, true, nil, nil, newLogsExporterWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeLogs, newLogsExporterWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
40 changes: 13 additions & 27 deletions exporter/exporterhelper/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ type metricsExporter struct {

// NewMetricsExporter creates an exporter.Metrics that records observability metrics and wraps every request with a Span.
func NewMetricsExporter(
_ context.Context,
ctx context.Context,
set exporter.CreateSettings,
cfg component.Config,
pusher consumer.ConsumeMetricsFunc,
Expand All @@ -79,41 +79,27 @@ func NewMetricsExporter(
if cfg == nil {
return nil, errNilConfig
}

if set.Logger == nil {
return nil, errNilLogger
}

if pusher == nil {
return nil, errNilPushMetricsData
}

be, err := newBaseExporter(set, component.DataTypeMetrics, false, metricsRequestMarshaler,
newMetricsRequestUnmarshalerFunc(pusher), newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}

mc, err := consumer.NewMetrics(func(ctx context.Context, md pmetric.Metrics) error {
req := newMetricsRequest(md, pusher)
serr := be.send(ctx, req)
if errors.Is(serr, queue.ErrQueueIsFull) {
be.obsrep.recordEnqueueFailure(ctx, component.DataTypeMetrics, int64(req.ItemsCount()))
}
return serr
}, be.consumerOptions...)

return &metricsExporter{
baseExporter: be,
Metrics: mc,
}, err
metricsOpts := []Option{withMarshaler(metricsRequestMarshaler), withUnmarshaler(newMetricsRequestUnmarshalerFunc(pusher))}
return NewMetricsRequestExporter(ctx, set, RequestFromMetrics(pusher), append(metricsOpts, options...)...)
}

// RequestFromMetricsFunc converts pdata.Metrics into a user-defined request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
type RequestFromMetricsFunc func(context.Context, pmetric.Metrics) (Request, error)

// RequestFromMetrics returns a RequestFromMetricsFunc that converts pdata.Metrics into a Request.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
func RequestFromMetrics(pusher consumer.ConsumeMetricsFunc) RequestFromMetricsFunc {
return func(ctx context.Context, md pmetric.Metrics) (Request, error) {
return newMetricsRequest(md, pusher), nil
}
}

// NewMetricsRequestExporter creates a new metrics exporter based on a custom MetricsConverter and RequestSender.
// This API is at the early stage of development and may change without backward compatibility
// until https://github.com/open-telemetry/opentelemetry-collector/issues/8122 is resolved.
Expand All @@ -131,7 +117,7 @@ func NewMetricsRequestExporter(
return nil, errNilMetricsConverter
}

be, err := newBaseExporter(set, component.DataTypeMetrics, true, nil, nil, newMetricsSenderWithObservability, options...)
be, err := newBaseExporter(set, component.DataTypeMetrics, newMetricsSenderWithObservability, options...)
if err != nil {
return nil, err
}
Expand Down
Loading

0 comments on commit a0095a2

Please sign in to comment.