diff --git a/.chloggen/awsfirehosereceiver-cwlogs-opt.yaml b/.chloggen/awsfirehosereceiver-cwlogs-opt.yaml new file mode 100644 index 0000000000000..bbc35baff3a87 --- /dev/null +++ b/.chloggen/awsfirehosereceiver-cwlogs-opt.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. filelogreceiver) +component: awsfirehosereceiver + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Optimise awsfirereceiver's cwlogs record type unmarshaler + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [37111] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [user] diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go index 2ebca77861dd2..ac83d7c1c846a 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression/compression.go @@ -28,21 +28,3 @@ func Zip(data []byte) ([]byte, error) { return b.Bytes(), nil } - -// Unzip expects gzip-compressed input bytes and returns their uncompressed form. -func Unzip(data []byte) ([]byte, error) { - b := bytes.NewBuffer(data) - - r, err := gzip.NewReader(b) - if err != nil { - return nil, err - } - - var rv bytes.Buffer - _, err = rv.ReadFrom(r) - if err != nil { - return nil, err - } - - return rv.Bytes(), nil -} diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go index b3fa132166b53..12259359d09b5 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler.go @@ -5,19 +5,19 @@ package cwlog // import "github.com/open-telemetry/opentelemetry-collector-contr import ( "bytes" + "compress/gzip" "encoding/json" "errors" + "io" "go.opentelemetry.io/collector/pdata/plog" "go.uber.org/zap" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/compression" ) const ( - TypeStr = "cwlogs" - recordDelimiter = "\n" + TypeStr = "cwlogs" ) var errInvalidRecords = errors.New("record format invalid") @@ -41,7 +41,7 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { md := plog.NewLogs() builders := make(map[resourceAttributes]*resourceLogsBuilder) for recordIndex, compressedRecord := range records { - record, err := compression.Unzip(compressedRecord) + reader, err := gzip.NewReader(bytes.NewReader(compressedRecord)) if err != nil { u.logger.Error("Failed to unzip record", zap.Error(err), @@ -50,39 +50,40 @@ func (u Unmarshaler) Unmarshal(records [][]byte) (plog.Logs, error) { continue } // Multiple logs in each record separated by newline character - for datumIndex, datum := range bytes.Split(record, []byte(recordDelimiter)) { - if len(datum) > 0 { - var log cWLog - err := json.Unmarshal(datum, &log) - if err != nil { - u.logger.Error( - "Unable to unmarshal input", - zap.Error(err), - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue + decoder := json.NewDecoder(reader) + for datumIndex := 0; ; datumIndex++ { + var log cWLog + if err := decoder.Decode(&log); err != nil { + if errors.Is(err, io.EOF) { + break } - if !u.isValid(log) { - u.logger.Error( - "Invalid log", - zap.Int("datum_index", datumIndex), - zap.Int("record_index", recordIndex), - ) - continue - } - attrs := resourceAttributes{ - owner: log.Owner, - logGroup: log.LogGroup, - logStream: log.LogStream, - } - lb, ok := builders[attrs] - if !ok { - lb = newResourceLogsBuilder(md, attrs) - builders[attrs] = lb - } - lb.AddLog(log) + u.logger.Error( + "Unable to unmarshal input", + zap.Error(err), + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + if !u.isValid(log) { + u.logger.Error( + "Invalid log", + zap.Int("datum_index", datumIndex), + zap.Int("record_index", recordIndex), + ) + continue + } + attrs := resourceAttributes{ + owner: log.Owner, + logGroup: log.LogGroup, + logStream: log.LogStream, + } + lb, ok := builders[attrs] + if !ok { + lb = newResourceLogsBuilder(md, attrs) + builders[attrs] = lb } + lb.AddLog(log) } } diff --git a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go index 71b49295df609..3c5ab6a449639 100644 --- a/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go +++ b/receiver/awsfirehosereceiver/internal/unmarshaler/cwlog/unmarshaler_test.go @@ -103,3 +103,20 @@ func TestLogTimestamp(t *testing.T) { expectedTimestamp := "2024-09-05 13:47:15.523 +0000 UTC" require.Equal(t, expectedTimestamp, ilm.LogRecords().At(0).Timestamp().String()) } + +func BenchmarkUnmarshal(b *testing.B) { + unmarshaler := NewUnmarshaler(zap.NewNop()) + record, err := os.ReadFile(filepath.Join("testdata", "multiple_resources")) + require.NoError(b, err) + + compressedRecord, err := compression.Zip(record) + require.NoError(b, err) + records := [][]byte{compressedRecord} + + for i := 0; i < b.N; i++ { + _, err := unmarshaler.Unmarshal(records) + if err != nil { + b.Fatal(err) + } + } +}