diff --git a/.chloggen/xstreamencoding-unmarshaler-adapter.yaml b/.chloggen/xstreamencoding-unmarshaler-adapter.yaml new file mode 100644 index 0000000000000..29f9974bfba1f --- /dev/null +++ b/.chloggen/xstreamencoding-unmarshaler-adapter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: pkg/xstreamencoding + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add stream decoding adapters for unmarshaler interfaces + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46754] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/extension/encoding/encoding.go b/extension/encoding/encoding.go index acbd166e4c1a8..aefa30b45b388 100644 --- a/extension/encoding/encoding.go +++ b/extension/encoding/encoding.go @@ -41,10 +41,15 @@ type LogsDecoder interface { Offset() int64 } +// LogsDecoderFactory creates LogsDecoder instances for streaming log deserialization. 
+type LogsDecoderFactory interface { + NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error) +} + // LogsDecoderExtension is an extension that unmarshals logs from a stream. type LogsDecoderExtension interface { extension.Extension - NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error) + LogsDecoderFactory } // MetricsMarshalerExtension is an extension that marshals metrics. @@ -70,10 +75,15 @@ type MetricsDecoder interface { Offset() int64 } +// MetricsDecoderFactory creates MetricsDecoder instances for streaming metric deserialization. +type MetricsDecoderFactory interface { + NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error) +} + // MetricsDecoderExtension is an extension that unmarshals metrics from a stream. type MetricsDecoderExtension interface { extension.Extension - NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error) + MetricsDecoderFactory } // TracesMarshalerExtension is an extension that marshals traces. diff --git a/pkg/xstreamencoding/streaming.go b/pkg/xstreamencoding/streaming.go index ae8462ead01f9..ff5e967025676 100644 --- a/pkg/xstreamencoding/streaming.go +++ b/pkg/xstreamencoding/streaming.go @@ -208,3 +208,111 @@ func (a MetricsDecoderAdapter) DecodeMetrics() (pmetric.Metrics, error) { func (a MetricsDecoderAdapter) Offset() int64 { return a.offset() } + +// logsUnmarshalerDecoderFactory adapts a plog.Unmarshaler into an encoding.LogsDecoderFactory. +// It reads the entire remaining stream and delegates to the unmarshaler on the first decode call. +type logsUnmarshalerDecoderFactory struct { + unmarshaler plog.Unmarshaler +} + +// NewLogsUnmarshalerDecoderFactory returns an encoding.LogsDecoderFactory that reads the full +// stream into memory and delegates to the provided plog.Unmarshaler. 
+func NewLogsUnmarshalerDecoderFactory(unmarshaler plog.Unmarshaler) encoding.LogsDecoderFactory { + return &logsUnmarshalerDecoderFactory{unmarshaler: unmarshaler} +} + +func (f *logsUnmarshalerDecoderFactory) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) { + return &logsUnmarshalerDecoder{ + unmarshaler: f.unmarshaler, + reader: reader, + opts: encoding.NewDecoderOptions(options...), + }, nil +} + +type logsUnmarshalerDecoder struct { + unmarshaler plog.Unmarshaler + reader io.Reader + opts encoding.DecoderOptions + offset int64 + done bool +} + +func (d *logsUnmarshalerDecoder) DecodeLogs() (plog.Logs, error) { + if d.done { + return plog.Logs{}, io.EOF + } + d.done = true + if d.opts.Offset > 0 { + if _, err := io.CopyN(io.Discard, d.reader, d.opts.Offset); err != nil { + return plog.Logs{}, fmt.Errorf("failed to discard offset %d: %w", d.opts.Offset, err) + } + } + buf, err := io.ReadAll(d.reader) + if err != nil { + return plog.Logs{}, fmt.Errorf("failed to read stream: %w", err) + } + d.offset = d.opts.Offset + int64(len(buf)) + logs, err := d.unmarshaler.UnmarshalLogs(buf) + if err != nil { + return plog.Logs{}, err + } + return logs, nil +} + +func (d *logsUnmarshalerDecoder) Offset() int64 { + return d.offset +} + +// metricsUnmarshalerDecoderFactory adapts a pmetric.Unmarshaler into an encoding.MetricsDecoderFactory. +// It reads the entire remaining stream and delegates to the unmarshaler on the first decode call. +type metricsUnmarshalerDecoderFactory struct { + unmarshaler pmetric.Unmarshaler +} + +// NewMetricsUnmarshalerDecoderFactory returns an encoding.MetricsDecoderFactory that reads the full +// stream into memory and delegates to the provided pmetric.Unmarshaler. 
+func NewMetricsUnmarshalerDecoderFactory(unmarshaler pmetric.Unmarshaler) encoding.MetricsDecoderFactory {
+	return &metricsUnmarshalerDecoderFactory{unmarshaler: unmarshaler}
+}
+
+// NewMetricsDecoder returns a decoder that hands the whole of reader to the wrapped
+// pmetric.Unmarshaler as a single batch. Nothing is read from reader until
+// DecodeMetrics is called, and the returned error is always nil.
+func (f *metricsUnmarshalerDecoderFactory) NewMetricsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.MetricsDecoder, error) {
+	dec := &metricsUnmarshalerDecoder{
+		unmarshaler: f.unmarshaler,
+		reader:      reader,
+		opts:        encoding.NewDecoderOptions(options...),
+	}
+	// Seed the reported offset with the requested start position. Without this,
+	// Offset() is 0 until a decode fully succeeds, so a caller that checkpoints
+	// it before the first decode (or after a failed discard/read) would rewind
+	// to the beginning of the stream instead of the position it resumed from.
+	dec.offset = dec.opts.Offset
+	return dec, nil
+}
+
+// metricsUnmarshalerDecoder is a one-shot encoding.MetricsDecoder backed by a pmetric.Unmarshaler.
+type metricsUnmarshalerDecoder struct {
+	unmarshaler pmetric.Unmarshaler
+	reader      io.Reader
+	opts        encoding.DecoderOptions
+	// offset is opts.Offset until the stream has been read, then opts.Offset plus
+	// the number of bytes read after the skipped prefix.
+	offset int64
+	// done is set by the first DecodeMetrics call, whether or not that call succeeds.
+	done bool
+}
+
+// DecodeMetrics discards opts.Offset bytes (when positive), reads the rest of the
+// stream into memory and passes it to the unmarshaler in a single call.
+//
+// Only the first call does any work; every later call returns io.EOF, including
+// after a first call that failed. On any error the returned pmetric.Metrics is
+// the zero value.
+func (d *metricsUnmarshalerDecoder) DecodeMetrics() (pmetric.Metrics, error) {
+	if d.done {
+		return pmetric.Metrics{}, io.EOF
+	}
+	d.done = true
+	if d.opts.Offset > 0 {
+		if _, err := io.CopyN(io.Discard, d.reader, d.opts.Offset); err != nil {
+			return pmetric.Metrics{}, fmt.Errorf("failed to discard offset %d: %w", d.opts.Offset, err)
+		}
+	}
+	buf, err := io.ReadAll(d.reader)
+	if err != nil {
+		return pmetric.Metrics{}, fmt.Errorf("failed to read stream: %w", err)
+	}
+	// Recorded before unmarshalling, so Offset() reflects the bytes consumed
+	// from the stream even when the unmarshaler rejects the payload.
+	d.offset = d.opts.Offset + int64(len(buf))
+	metrics, err := d.unmarshaler.UnmarshalMetrics(buf)
+	if err != nil {
+		return pmetric.Metrics{}, err
+	}
+	return metrics, nil
+}
+
+// Offset returns the initial offset until the stream has been read, and the
+// initial offset plus the number of bytes read afterwards.
+func (d *metricsUnmarshalerDecoder) Offset() int64 {
+	return d.offset
+}
diff --git a/pkg/xstreamencoding/streaming_test.go b/pkg/xstreamencoding/streaming_test.go
index 5477cd8a21798..6caaf9a0a87a9 100644
--- a/pkg/xstreamencoding/streaming_test.go
+++ b/pkg/xstreamencoding/streaming_test.go
@@ -11,6 +11,8 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"go.opentelemetry.io/collector/pdata/plog"
+	"go.opentelemetry.io/collector/pdata/pmetric"
 
 	"github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding"
 )
@@ -140,3 +142,157 @@ func TestStreamBatchHelper_ShouldFlush(t *testing.T) {
 	helper.IncrementItems(5)
 	assert.True(t, helper.ShouldFlush())
 }
+
+// stubLogsUnmarshaler ignores its input and returns the configured logs and error.
+type stubLogsUnmarshaler struct {
+	logs plog.Logs
+	err  error
+}
+
+func (s *stubLogsUnmarshaler) UnmarshalLogs(_ []byte) (plog.Logs, error) {
+	return s.logs, s.err
+}
+
+// capturingLogsUnmarshaler stores the bytes it is handed in *received and returns empty logs.
+type capturingLogsUnmarshaler struct {
+	received *[]byte
+}
+
+func (s *capturingLogsUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) {
+	*s.received = data
+	return plog.NewLogs(), nil
+}
+
+// stubMetricsUnmarshaler ignores its input and returns the configured metrics and error.
+type stubMetricsUnmarshaler struct {
+	metrics pmetric.Metrics
+	err     error
+}
+
+func (s *stubMetricsUnmarshaler) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) {
+	return s.metrics, s.err
+}
+
+// capturingMetricsUnmarshaler stores the bytes it is handed in *received and returns empty metrics.
+type capturingMetricsUnmarshaler struct {
+	received *[]byte
+}
+
+func (s *capturingMetricsUnmarshaler) UnmarshalMetrics(data []byte) (pmetric.Metrics, error) {
+	*s.received = data
+	return pmetric.NewMetrics(), nil
+}
+
+func TestLogsUnmarshalerDecoderFactory(t *testing.T) {
+	t.Run("decode returns result then EOF", func(t *testing.T) {
+		expected := plog.NewLogs()
+		expected.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("hello")
+
+		factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{logs: expected})
+		input := "some log data"
+		decoder, err := factory.NewLogsDecoder(strings.NewReader(input))
+		require.NoError(t, err)
+
+		logs, err := decoder.DecodeLogs()
+		require.NoError(t, err)
+		assert.Equal(t, expected, logs)
+
+		_, err = decoder.DecodeLogs()
+		assert.ErrorIs(t, err, io.EOF)
+	})
+
+	t.Run("offset equals bytes read", func(t *testing.T) {
+		factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{logs: plog.NewLogs()})
+		input := "12345678"
+		decoder, err := factory.NewLogsDecoder(strings.NewReader(input))
+		require.NoError(t, err)
+
+		assert.Equal(t, int64(0), decoder.Offset())
+
+		_, err = decoder.DecodeLogs()
+		require.NoError(t, err)
+		assert.Equal(t, int64(len(input)), decoder.Offset())
+	})
+
+	t.Run("with offset option skips bytes", func(t *testing.T) {
+		input := "XXXXXreal data"
+		var received []byte
+		captureUnmarshaler := &capturingLogsUnmarshaler{received: &received}
+		factory := NewLogsUnmarshalerDecoderFactory(captureUnmarshaler)
+
+		decoder, err := factory.NewLogsDecoder(strings.NewReader(input), encoding.WithOffset(5))
+		require.NoError(t, err)
+
+		_, err = decoder.DecodeLogs()
+		require.NoError(t, err)
+		assert.Equal(t, []byte("real data"), received)
+		assert.Equal(t, int64(len(input)), decoder.Offset())
+	})
+
+	t.Run("unmarshal error propagates", func(t *testing.T) {
+		factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{err: assert.AnError})
+		decoder, err := factory.NewLogsDecoder(strings.NewReader("data"))
+		require.NoError(t, err)
+
+		_, err = decoder.DecodeLogs()
+		require.ErrorIs(t, err, assert.AnError)
+	})
+}
+
+func TestMetricsUnmarshalerDecoderFactory(t *testing.T) {
+	t.Run("decode returns result then EOF", func(t *testing.T) {
+		expected := pmetric.NewMetrics()
+		expected.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("test")
+
+		factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{metrics: expected})
+		input := "some metric data"
+		decoder, err := factory.NewMetricsDecoder(strings.NewReader(input))
+		require.NoError(t, err)
+
+		metrics, err := decoder.DecodeMetrics()
+		require.NoError(t, err)
+		assert.Equal(t, expected, metrics)
+
+		_, err = decoder.DecodeMetrics()
+		assert.ErrorIs(t, err, io.EOF)
+	})
+
+	t.Run("offset equals bytes read", func(t *testing.T) {
+		factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{metrics: pmetric.NewMetrics()})
+		input := "12345678"
+		decoder, err := factory.NewMetricsDecoder(strings.NewReader(input))
+		require.NoError(t, err)
+
+		assert.Equal(t, int64(0), decoder.Offset())
+
+		_, err = decoder.DecodeMetrics()
+		require.NoError(t, err)
+		assert.Equal(t, int64(len(input)), decoder.Offset())
+	})
+
+	t.Run("with offset option skips bytes", func(t *testing.T) {
+		input := "XXXXXreal data"
+		var received []byte
+		captureUnmarshaler := &capturingMetricsUnmarshaler{received: &received}
+		factory := NewMetricsUnmarshalerDecoderFactory(captureUnmarshaler)
+
+		decoder, err := factory.NewMetricsDecoder(strings.NewReader(input), encoding.WithOffset(5))
+		require.NoError(t, err)
+		// Before the first decode the decoder reports the offset it was asked to resume from.
+		assert.Equal(t, int64(5), decoder.Offset())
+
+		_, err = decoder.DecodeMetrics()
+		require.NoError(t, err)
+		assert.Equal(t, []byte("real data"), received)
+		assert.Equal(t, int64(len(input)), decoder.Offset())
+	})
+
+	t.Run("unmarshal error propagates", func(t *testing.T) {
+		factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{err: assert.AnError})
+		decoder, err := factory.NewMetricsDecoder(strings.NewReader("data"))
+		require.NoError(t, err)
+
+		_, err = decoder.DecodeMetrics()
+		require.ErrorIs(t, err, assert.AnError)
+	})
+}