From ecf1ef99b3564bee6b36807cc9156197330da642 Mon Sep 17 00:00:00 2001 From: Andrew Wilkins Date: Mon, 9 Mar 2026 10:26:44 +0800 Subject: [PATCH] [pkg/xstreamencoding] add unmarshaler adapters Introduce adapters for plog.LogsUnmarshaler and pmetric.MetricsUnmarshaler which, on first call, read the entire stream into memory and invoke the unmarshaler. Subsequent calls return EOF. The offset is always the entire stream size in bytes, and if an offset is provided then that many bytes will discarded first. We also extract LogsDecoderFactory and MetricsDecoderFactory interfaces from LogsDecoderExtension and MetricsDecoderExtension in the encoding package. --- .../xstreamencoding-unmarshaler-adapter.yaml | 27 ++++ extension/encoding/encoding.go | 14 +- pkg/xstreamencoding/streaming.go | 108 +++++++++++++ pkg/xstreamencoding/streaming_test.go | 150 ++++++++++++++++++ 4 files changed, 297 insertions(+), 2 deletions(-) create mode 100644 .chloggen/xstreamencoding-unmarshaler-adapter.yaml diff --git a/.chloggen/xstreamencoding-unmarshaler-adapter.yaml b/.chloggen/xstreamencoding-unmarshaler-adapter.yaml new file mode 100644 index 0000000000000..29f9974bfba1f --- /dev/null +++ b/.chloggen/xstreamencoding-unmarshaler-adapter.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: pkg/xstreamencoding + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Add stream decoding adapters for unmarshaler interfaces + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46754] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [api] diff --git a/extension/encoding/encoding.go b/extension/encoding/encoding.go index acbd166e4c1a8..aefa30b45b388 100644 --- a/extension/encoding/encoding.go +++ b/extension/encoding/encoding.go @@ -41,10 +41,15 @@ type LogsDecoder interface { Offset() int64 } +// LogsDecoderFactory creates LogsDecoder instances for streaming log deserialization. +type LogsDecoderFactory interface { + NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error) +} + // LogsDecoderExtension is an extension that unmarshals logs from a stream. type LogsDecoderExtension interface { extension.Extension - NewLogsDecoder(reader io.Reader, options ...DecoderOption) (LogsDecoder, error) + LogsDecoderFactory } // MetricsMarshalerExtension is an extension that marshals metrics. @@ -70,10 +75,15 @@ type MetricsDecoder interface { Offset() int64 } +// MetricsDecoderFactory creates MetricsDecoder instances for streaming metric deserialization. +type MetricsDecoderFactory interface { + NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error) +} + // MetricsDecoderExtension is an extension that unmarshals metrics from a stream. type MetricsDecoderExtension interface { extension.Extension - NewMetricsDecoder(reader io.Reader, options ...DecoderOption) (MetricsDecoder, error) + MetricsDecoderFactory } // TracesMarshalerExtension is an extension that marshals traces. diff --git a/pkg/xstreamencoding/streaming.go b/pkg/xstreamencoding/streaming.go index ae8462ead01f9..ff5e967025676 100644 --- a/pkg/xstreamencoding/streaming.go +++ b/pkg/xstreamencoding/streaming.go @@ -208,3 +208,111 @@ func (a MetricsDecoderAdapter) DecodeMetrics() (pmetric.Metrics, error) { func (a MetricsDecoderAdapter) Offset() int64 { return a.offset() } + +// logsUnmarshalerDecoderFactory adapts a plog.Unmarshaler into an encoding.LogsDecoderFactory. +// It reads the entire remaining stream and delegates to the unmarshaler on the first decode call. +type logsUnmarshalerDecoderFactory struct { + unmarshaler plog.Unmarshaler +} + +// NewLogsUnmarshalerDecoderFactory returns an encoding.LogsDecoderFactory that reads the full +// stream into memory and delegates to the provided plog.Unmarshaler. +func NewLogsUnmarshalerDecoderFactory(unmarshaler plog.Unmarshaler) encoding.LogsDecoderFactory { + return &logsUnmarshalerDecoderFactory{unmarshaler: unmarshaler} +} + +func (f *logsUnmarshalerDecoderFactory) NewLogsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.LogsDecoder, error) { + return &logsUnmarshalerDecoder{ + unmarshaler: f.unmarshaler, + reader: reader, + opts: encoding.NewDecoderOptions(options...), + }, nil +} + +type logsUnmarshalerDecoder struct { + unmarshaler plog.Unmarshaler + reader io.Reader + opts encoding.DecoderOptions + offset int64 + done bool +} + +func (d *logsUnmarshalerDecoder) DecodeLogs() (plog.Logs, error) { + if d.done { + return plog.Logs{}, io.EOF + } + d.done = true + if d.opts.Offset > 0 { + if _, err := io.CopyN(io.Discard, d.reader, d.opts.Offset); err != nil { + return plog.Logs{}, fmt.Errorf("failed to discard offset %d: %w", d.opts.Offset, err) + } + } + buf, err := io.ReadAll(d.reader) + if err != nil { + return plog.Logs{}, fmt.Errorf("failed to read stream: %w", err) + } + d.offset = d.opts.Offset + int64(len(buf)) + logs, err := d.unmarshaler.UnmarshalLogs(buf) + if err != nil { + return plog.Logs{}, err + } + return logs, nil +} + +func (d *logsUnmarshalerDecoder) Offset() int64 { + return d.offset +} + +// metricsUnmarshalerDecoderFactory adapts a pmetric.Unmarshaler into an encoding.MetricsDecoderFactory. +// It reads the entire remaining stream and delegates to the unmarshaler on the first decode call. +type metricsUnmarshalerDecoderFactory struct { + unmarshaler pmetric.Unmarshaler +} + +// NewMetricsUnmarshalerDecoderFactory returns an encoding.MetricsDecoderFactory that reads the full +// stream into memory and delegates to the provided pmetric.Unmarshaler. +func NewMetricsUnmarshalerDecoderFactory(unmarshaler pmetric.Unmarshaler) encoding.MetricsDecoderFactory { + return &metricsUnmarshalerDecoderFactory{unmarshaler: unmarshaler} +} + +func (f *metricsUnmarshalerDecoderFactory) NewMetricsDecoder(reader io.Reader, options ...encoding.DecoderOption) (encoding.MetricsDecoder, error) { + return &metricsUnmarshalerDecoder{ + unmarshaler: f.unmarshaler, + reader: reader, + opts: encoding.NewDecoderOptions(options...), + }, nil +} + +type metricsUnmarshalerDecoder struct { + unmarshaler pmetric.Unmarshaler + reader io.Reader + opts encoding.DecoderOptions + offset int64 + done bool +} + +func (d *metricsUnmarshalerDecoder) DecodeMetrics() (pmetric.Metrics, error) { + if d.done { + return pmetric.Metrics{}, io.EOF + } + d.done = true + if d.opts.Offset > 0 { + if _, err := io.CopyN(io.Discard, d.reader, d.opts.Offset); err != nil { + return pmetric.Metrics{}, fmt.Errorf("failed to discard offset %d: %w", d.opts.Offset, err) + } + } + buf, err := io.ReadAll(d.reader) + if err != nil { + return pmetric.Metrics{}, fmt.Errorf("failed to read stream: %w", err) + } + d.offset = d.opts.Offset + int64(len(buf)) + metrics, err := d.unmarshaler.UnmarshalMetrics(buf) + if err != nil { + return pmetric.Metrics{}, err + } + return metrics, nil +} + +func (d *metricsUnmarshalerDecoder) Offset() int64 { + return d.offset +} diff --git a/pkg/xstreamencoding/streaming_test.go b/pkg/xstreamencoding/streaming_test.go index 5477cd8a21798..6caaf9a0a87a9 100644 --- a/pkg/xstreamencoding/streaming_test.go +++ b/pkg/xstreamencoding/streaming_test.go @@ -11,6 +11,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + "go.opentelemetry.io/collector/pdata/pmetric" "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" ) @@ -140,3 +142,151 @@ func TestStreamBatchHelper_ShouldFlush(t *testing.T) { helper.IncrementItems(5) assert.True(t, helper.ShouldFlush()) } + +type stubLogsUnmarshaler struct { + logs plog.Logs + err error +} + +func (s *stubLogsUnmarshaler) UnmarshalLogs(_ []byte) (plog.Logs, error) { + return s.logs, s.err +} + +type capturingLogsUnmarshaler struct { + received *[]byte +} + +func (s *capturingLogsUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) { + *s.received = data + return plog.NewLogs(), nil +} + +type stubMetricsUnmarshaler struct { + metrics pmetric.Metrics + err error +} + +func (s *stubMetricsUnmarshaler) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) { + return s.metrics, s.err +} + +type capturingMetricsUnmarshaler struct { + received *[]byte +} + +func (s *capturingMetricsUnmarshaler) UnmarshalMetrics(data []byte) (pmetric.Metrics, error) { + *s.received = data + return pmetric.NewMetrics(), nil +} + +func TestLogsUnmarshalerDecoderFactory(t *testing.T) { + t.Run("decode returns result then EOF", func(t *testing.T) { + expected := plog.NewLogs() + expected.ResourceLogs().AppendEmpty().ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("hello") + + factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{logs: expected}) + input := "some log data" + decoder, err := factory.NewLogsDecoder(strings.NewReader(input)) + require.NoError(t, err) + + logs, err := decoder.DecodeLogs() + require.NoError(t, err) + assert.Equal(t, expected, logs) + + _, err = decoder.DecodeLogs() + assert.ErrorIs(t, err, io.EOF) + }) + + t.Run("offset equals bytes read", func(t *testing.T) { + factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{logs: plog.NewLogs()}) + input := "12345678" + decoder, err := factory.NewLogsDecoder(strings.NewReader(input)) + require.NoError(t, err) + + assert.Equal(t, int64(0), decoder.Offset()) + + _, err = decoder.DecodeLogs() + require.NoError(t, err) + assert.Equal(t, int64(len(input)), decoder.Offset()) + }) + + t.Run("with offset option skips bytes", func(t *testing.T) { + input := "XXXXXreal data" + var received []byte + captureUnmarshaler := &capturingLogsUnmarshaler{received: &received} + factory := NewLogsUnmarshalerDecoderFactory(captureUnmarshaler) + + decoder, err := factory.NewLogsDecoder(strings.NewReader(input), encoding.WithOffset(5)) + require.NoError(t, err) + + _, err = decoder.DecodeLogs() + require.NoError(t, err) + assert.Equal(t, []byte("real data"), received) + assert.Equal(t, int64(len(input)), decoder.Offset()) + }) + + t.Run("unmarshal error propagates", func(t *testing.T) { + factory := NewLogsUnmarshalerDecoderFactory(&stubLogsUnmarshaler{err: assert.AnError}) + decoder, err := factory.NewLogsDecoder(strings.NewReader("data")) + require.NoError(t, err) + + _, err = decoder.DecodeLogs() + require.ErrorIs(t, err, assert.AnError) + }) +} + +func TestMetricsUnmarshalerDecoderFactory(t *testing.T) { + t.Run("decode returns result then EOF", func(t *testing.T) { + expected := pmetric.NewMetrics() + expected.ResourceMetrics().AppendEmpty().ScopeMetrics().AppendEmpty().Metrics().AppendEmpty().SetName("test") + + factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{metrics: expected}) + input := "some metric data" + decoder, err := factory.NewMetricsDecoder(strings.NewReader(input)) + require.NoError(t, err) + + metrics, err := decoder.DecodeMetrics() + require.NoError(t, err) + assert.Equal(t, expected, metrics) + + _, err = decoder.DecodeMetrics() + assert.ErrorIs(t, err, io.EOF) + }) + + t.Run("offset equals bytes read", func(t *testing.T) { + factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{metrics: pmetric.NewMetrics()}) + input := "12345678" + decoder, err := factory.NewMetricsDecoder(strings.NewReader(input)) + require.NoError(t, err) + + assert.Equal(t, int64(0), decoder.Offset()) + + _, err = decoder.DecodeMetrics() + require.NoError(t, err) + assert.Equal(t, int64(len(input)), decoder.Offset()) + }) + + t.Run("with offset option skips bytes", func(t *testing.T) { + input := "XXXXXreal data" + var received []byte + captureUnmarshaler := &capturingMetricsUnmarshaler{received: &received} + factory := NewMetricsUnmarshalerDecoderFactory(captureUnmarshaler) + + decoder, err := factory.NewMetricsDecoder(strings.NewReader(input), encoding.WithOffset(5)) + require.NoError(t, err) + + _, err = decoder.DecodeMetrics() + require.NoError(t, err) + assert.Equal(t, []byte("real data"), received) + assert.Equal(t, int64(len(input)), decoder.Offset()) + }) + + t.Run("unmarshal error propagates", func(t *testing.T) { + factory := NewMetricsUnmarshalerDecoderFactory(&stubMetricsUnmarshaler{err: assert.AnError}) + decoder, err := factory.NewMetricsDecoder(strings.NewReader("data")) + require.NoError(t, err) + + _, err = decoder.DecodeMetrics() + require.ErrorIs(t, err, assert.AnError) + }) +}