diff --git a/.chloggen/feat_aws-lambda-receiver-stream-support.yaml b/.chloggen/feat_aws-lambda-receiver-stream-support.yaml new file mode 100644 index 0000000000000..b43d9247ee24a --- /dev/null +++ b/.chloggen/feat_aws-lambda-receiver-stream-support.yaml @@ -0,0 +1,27 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/awslambda + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: Adopt encoding extension streaming for AWS Lambda receiver + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46608] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. 
+# Default: '[user]' +change_logs: [] diff --git a/receiver/awslambdareceiver/README.md b/receiver/awslambdareceiver/README.md index ba23045685ae2..c35d63d4ffba8 100644 --- a/receiver/awslambdareceiver/README.md +++ b/receiver/awslambdareceiver/README.md @@ -84,6 +84,10 @@ Consider following notes on default behaviors: - When `cloudwatch::encoding` is not specified, the receiver defaults to parsing CloudWatch Logs messages to OpenTelemetry log records. - For metrics, the default behavior is to decode using `awscloudwatchmetricstreams_encoding` extension. +> [!NOTE] +> The receiver supports end-to-end streaming by utilizing the streaming capabilities of encoding extensions. +> For extensions that do not support streaming, the xstreamencoding wrapper is used and the full payload is processed at once. + Given below are example configurations for various use cases. ### Example 1: VPC Flow Logs from S3 diff --git a/receiver/awslambdareceiver/benchmark_test.go b/receiver/awslambdareceiver/benchmark_test.go index eddaeab092dad..b07395b4c81ca 100644 --- a/receiver/awslambdareceiver/benchmark_test.go +++ b/receiver/awslambdareceiver/benchmark_test.go @@ -44,7 +44,7 @@ func BenchmarkHandleS3Notification(b *testing.B) { enrichS3Logs(logs, event) return consumer.ConsumeLogs(ctx, logs) } - handler := newS3Handler(service, zap.NewNop(), customLogUnmarshaler{}.UnmarshalLogs, logsConsumer) + handler := newS3LogsHandler(service, zap.NewNop(), &customLogUnmarshaler{}, logsConsumer) b.Run("HandleS3Event", func(b *testing.B) { b.ReportAllocs() diff --git a/receiver/awslambdareceiver/go.mod b/receiver/awslambdareceiver/go.mod index eedf9335b5f08..abe90f57554bf 100644 --- a/receiver/awslambdareceiver/go.mod +++ b/receiver/awslambdareceiver/go.mod @@ -7,8 +7,10 @@ require ( github.com/aws/aws-sdk-go-v2/config v1.32.11 github.com/aws/aws-sdk-go-v2/service/s3 v1.96.4 github.com/goccy/go-json v0.10.5 + github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding v0.147.0 
github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden v0.147.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.147.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding v0.147.0 github.com/stretchr/testify v1.11.1 go.opentelemetry.io/collector/component v1.53.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/component/componenttest v0.147.1-0.20260312104527-c74f90fe3922 @@ -17,6 +19,7 @@ require ( go.opentelemetry.io/collector/consumer v1.53.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/consumer/consumererror v0.147.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/consumer/consumertest v0.147.1-0.20260312104527-c74f90fe3922 + go.opentelemetry.io/collector/extension v1.53.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/pdata v1.53.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/receiver v1.53.1-0.20260312104527-c74f90fe3922 go.opentelemetry.io/collector/receiver/receivertest v0.147.1-0.20260312104527-c74f90fe3922 @@ -92,4 +95,8 @@ replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden => ../../pkg/golden +replace github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding => ../../pkg/xstreamencoding + +replace github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding => ../../extension/encoding + tool go.uber.org/mock/mockgen diff --git a/receiver/awslambdareceiver/go.sum b/receiver/awslambdareceiver/go.sum index 7610ec2e6592b..5d320af727348 100644 --- a/receiver/awslambdareceiver/go.sum +++ b/receiver/awslambdareceiver/go.sum @@ -113,6 +113,8 @@ go.opentelemetry.io/collector/consumer/consumertest v0.147.1-0.20260312104527-c7 go.opentelemetry.io/collector/consumer/consumertest v0.147.1-0.20260312104527-c74f90fe3922/go.mod h1:QWGFRmeYNbKaseDTNT3a2iGDmjl+DCZnLzMP7Rjj0JM= 
go.opentelemetry.io/collector/consumer/xconsumer v0.147.1-0.20260312104527-c74f90fe3922 h1:zvwakBvjvSGAAmjj8oiCWw6LHgWwr2U8SHnsKMTNrmY= go.opentelemetry.io/collector/consumer/xconsumer v0.147.1-0.20260312104527-c74f90fe3922/go.mod h1:mtwh1VsUoGjxwdmXEzjbswH7KAGByJNCIMHmhqwXeK0= +go.opentelemetry.io/collector/extension v1.53.1-0.20260312104527-c74f90fe3922 h1:tiT0YJzDXGSYmg35eSgHaJAIwqPYp9bH4QzFTl6WFkA= +go.opentelemetry.io/collector/extension v1.53.1-0.20260312104527-c74f90fe3922/go.mod h1:+xvUomF8thLpO+NRsA8m/9GkDChvoNRqaLqCLVCeMIo= go.opentelemetry.io/collector/featuregate v1.53.1-0.20260312104527-c74f90fe3922 h1:ZCfYa0H43+as7y6WeeUrIOJSwfuoSgogu+dmaeHxD5U= go.opentelemetry.io/collector/featuregate v1.53.1-0.20260312104527-c74f90fe3922/go.mod h1:PS7zY/zaCb28EqciePVwRHVhc3oKortTFXsi3I6ee4g= go.opentelemetry.io/collector/internal/componentalias v0.147.1-0.20260312104527-c74f90fe3922 h1:Mt5Ybbp2854fsvKvaic6n5BGXmi1EPvoXmE+ZjMfLKQ= diff --git a/receiver/awslambdareceiver/handler.go b/receiver/awslambdareceiver/handler.go index 4591de6b8d1eb..73af3867738f9 100644 --- a/receiver/awslambdareceiver/handler.go +++ b/receiver/awslambdareceiver/handler.go @@ -4,6 +4,7 @@ package awslambdareceiver // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver" import ( + "bufio" "bytes" "compress/gzip" "context" @@ -11,9 +12,8 @@ import ( "encoding/json" "errors" "fmt" + "io" "strings" - "time" - "unicode/utf8" "github.com/aws/aws-lambda-go/events" gojson "github.com/goccy/go-json" @@ -24,18 +24,23 @@ import ( conventions "go.opentelemetry.io/otel/semconv/v1.38.0" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" - "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal/metadata" ) +// s3StreamBatchSize defines the size of data chunks read from S3 object stream 
for processing. +const s3StreamBatchSize = 10_000_000 // 10MB chunks + +// readerBufferSize defines the buffer size for buffered readers. +const readerBufferSize = 128 * 1024 // 128KB buffer size + type emits interface { plog.Logs | pmetric.Metrics } type ( - unmarshalFunc[T emits] func([]byte) (T, error) s3EventConsumerFunc[T emits] func(context.Context, events.S3EventRecord, T) error - handlerRegistry map[eventType]func() lambdaEventHandler + handlerRegistry map[eventType]lambdaEventHandler ) type handlerProvider interface { @@ -43,11 +48,10 @@ type handlerProvider interface { } // handlerProvider is responsible for providing event handlers based on event types. -// It operates with a registry of handler factories and caches loadedHandlers for reuse. +// It looks up already-constructed handlers directly from the registry. type handlerProviderImpl struct { - registry handlerRegistry - loadedHandlers map[eventType]lambdaEventHandler - knownTypes []string + registry handlerRegistry + knownTypes []string } func newHandlerProvider(registry handlerRegistry) handlerProvider { @@ -57,24 +61,17 @@ func newHandlerProvider(registry handlerRegistry) handlerProvider { } return &handlerProviderImpl{ - loadedHandlers: map[eventType]lambdaEventHandler{}, - registry: registry, - knownTypes: types, + registry: registry, + knownTypes: types, } } func (h *handlerProviderImpl) getHandler(eventType eventType) (lambdaEventHandler, error) { - if loaded, exists := h.loadedHandlers[eventType]; exists { - return loaded, nil - } - - factory, exists := h.registry[eventType] + handler, exists := h.registry[eventType] if !exists { return nil, fmt.Errorf("no handler registered for event type %s, known types: '%s'", eventType, strings.Join(h.knownTypes, ",")) } - handler := factory() - h.loadedHandlers[eventType] = handler return handler, nil } @@ -85,32 +82,100 @@ type lambdaEventHandler interface { } // s3Handler is specialized in S3 object event handling -type s3Handler[T emits] 
struct { - s3Service internal.S3Service - logger *zap.Logger - s3Unmarshaler unmarshalFunc[T] - consumer s3EventConsumerFunc[T] +type s3Handler struct { + s3Service internal.S3Service + logger *zap.Logger + + decodeF func(ctx context.Context, reader io.Reader, event events.S3EventRecord) error } -func newS3Handler[T emits]( +func newS3LogsHandler( service internal.S3Service, baseLogger *zap.Logger, - unmarshal unmarshalFunc[T], - consumer s3EventConsumerFunc[T], -) *s3Handler[T] { - return &s3Handler[T]{ - s3Service: service, - logger: baseLogger.Named("s3"), - s3Unmarshaler: unmarshal, - consumer: consumer, + logsDecoder encoding.LogsDecoderFactory, + consumer s3EventConsumerFunc[plog.Logs], +) *s3Handler { + logDecodeF := func(ctx context.Context, reader io.Reader, event events.S3EventRecord) error { + var decoder encoding.LogsDecoder + // Bytes based batching and disable flush on items + decoder, err := logsDecoder.NewLogsDecoder(reader, encoding.WithFlushBytes(s3StreamBatchSize), encoding.WithFlushItems(0)) + if err != nil { + return fmt.Errorf("failed to derive the extension for S3 logs: %w", err) + } + + for { + var logs plog.Logs + logs, err = decoder.DecodeLogs() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return fmt.Errorf("failed to decode S3 logs: %w", err) + } + + enrichS3Logs(logs, event) + if err = consumer(ctx, event, logs); err != nil { + return checkConsumerErrorAndWrap(err) + } + } + + return nil + } + + return &s3Handler{ + s3Service: service, + logger: baseLogger.Named("s3"), + decodeF: logDecodeF, + } +} + +func newS3MetricsHandler( + service internal.S3Service, + baseLogger *zap.Logger, + metricsDecoder encoding.MetricsDecoderFactory, + consumer s3EventConsumerFunc[pmetric.Metrics], +) *s3Handler { + metricDecodeF := func(ctx context.Context, reader io.Reader, event events.S3EventRecord) error { + var decoder encoding.MetricsDecoder + // Bytes based batching and disable flush on items + decoder, err := 
metricsDecoder.NewMetricsDecoder(reader, encoding.WithFlushBytes(s3StreamBatchSize), encoding.WithFlushItems(0)) + if err != nil { + return fmt.Errorf("failed to derive the extension for S3 metrics: %w", err) + } + + for { + var metrics pmetric.Metrics + metrics, err = decoder.DecodeMetrics() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return fmt.Errorf("failed to decode S3 metrics: %w", err) + } + + if err := consumer(ctx, event, metrics); err != nil { + return checkConsumerErrorAndWrap(err) + } + } + + return nil + } + + return &s3Handler{ + s3Service: service, + logger: baseLogger.Named("s3"), + decodeF: metricDecodeF, } } -func (*s3Handler[T]) handlerType() eventType { +func (*s3Handler) handlerType() eventType { return s3Event } -func (s *s3Handler[T]) handle(ctx context.Context, event json.RawMessage) error { +func (s *s3Handler) handle(ctx context.Context, event json.RawMessage) error { + var err error parsedEvent, err := s.parseEvent(event) if err != nil { return fmt.Errorf("failed to parse the event: %w", err) @@ -127,24 +192,31 @@ func (s *s3Handler[T]) handle(ctx context.Context, event json.RawMessage) error return nil } - body, err := s.s3Service.ReadObject(ctx, parsedEvent.S3.Bucket.Name, parsedEvent.S3.Object.URLDecodedKey) + reader, err := s.s3Service.GetReader(ctx, parsedEvent.S3.Bucket.Name, parsedEvent.S3.Object.URLDecodedKey) if err != nil { return err } - data, err := s.s3Unmarshaler(body) + wrappedReader, err := gunzipIfNeeded(reader) if err != nil { - return fmt.Errorf("failed to unmarshal S3 data: %w", err) + return fmt.Errorf("failed to derive reader with wrapper: %w", err) } - if err := s.consumer(ctx, parsedEvent, data); err != nil { - return checkConsumerErrorAndWrap(err) + defer func() { + if gzReader, ok := wrappedReader.(*gzip.Reader); ok { + _ = gzReader.Close() + } + }() + + err = s.decodeF(ctx, wrappedReader, parsedEvent) + if err != nil { + return err } return nil } -func (*s3Handler[T]) parseEvent(raw 
json.RawMessage) (event events.S3EventRecord, err error) { +func (*s3Handler) parseEvent(raw json.RawMessage) (event events.S3EventRecord, err error) { var message events.S3Event if err := gojson.Unmarshal(raw, &message); err != nil { return events.S3EventRecord{}, fmt.Errorf("failed to unmarshal S3 event notification: %w", err) @@ -160,17 +232,17 @@ func (*s3Handler[T]) parseEvent(raw json.RawMessage) (event events.S3EventRecord // cwLogsSubscriptionHandler is specialized in CloudWatch log stream subscription filter events type cwLogsSubscriptionHandler struct { - unmarshal unmarshalFunc[plog.Logs] - consumer func(context.Context, plog.Logs) error + logsDecoder encoding.LogsDecoderFactory + consumer func(context.Context, plog.Logs) error } func newCWLogsSubscriptionHandler( - unmarshal unmarshalFunc[plog.Logs], + logsDecoder encoding.LogsDecoderFactory, consumer func(context.Context, plog.Logs) error, ) *cwLogsSubscriptionHandler { return &cwLogsSubscriptionHandler{ - unmarshal: unmarshal, - consumer: consumer, + logsDecoder: logsDecoder, + consumer: consumer, } } @@ -197,70 +269,45 @@ func (c *cwLogsSubscriptionHandler) handle(ctx context.Context, event json.RawMe defer reader.Close() - var decodedData bytes.Buffer - _, err = decodedData.ReadFrom(reader) + decoder, err := c.logsDecoder.NewLogsDecoder(reader, encoding.WithFlushBytes(s3StreamBatchSize)) if err != nil { - return fmt.Errorf("failed to read decompressed data from cloudwatch subscription event: %w", err) + return err } - data, err := c.unmarshal(decodedData.Bytes()) - if err != nil { - return fmt.Errorf("failed to unmarshal CloudWatch logs: %w", err) - } - if err := c.consumer(ctx, data); err != nil { - return checkConsumerErrorAndWrap(err) + for { + var logs plog.Logs + logs, err = decoder.DecodeLogs() + if err != nil { + if errors.Is(err, io.EOF) { + break + } + + return fmt.Errorf("failed to decode S3 logs: %w", err) + } + if err = c.consumer(ctx, logs); err != nil { + return 
checkConsumerErrorAndWrap(err) + } } return nil } -// cwLogsToPlogs implements unmarshalFunc for plog.Logs. -// This defines the built-in behavior for CloudWatch subscription filter events when no encoding extension is provided. -func cwLogsToPlogs(data []byte) (plog.Logs, error) { - var cwLog events.CloudwatchLogsData - err := gojson.Unmarshal(data, &cwLog) +// gunzipIfNeeded checks if the provided reader is a gzipped stream and returns a reader with gunzip wrapping if needed. +func gunzipIfNeeded(r io.Reader) (io.Reader, error) { + buf := bufio.NewReaderSize(r, readerBufferSize) + header, err := buf.Peek(2) if err != nil { - return plog.Logs{}, fmt.Errorf("failed to unmarshal data from cloudwatch logs event: %w", err) - } - - logs := plog.NewLogs() - rl := logs.ResourceLogs().AppendEmpty() - resourceAttrs := rl.Resource().Attributes() - resourceAttrs.PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAWS.Value.AsString()) - resourceAttrs.PutStr(string(conventions.CloudAccountIDKey), cwLog.Owner) - resourceAttrs.PutStr(string(conventions.AWSLogGroupNamesKey), cwLog.LogGroup) - resourceAttrs.PutStr(string(conventions.AWSLogStreamNamesKey), cwLog.LogStream) - - sl := rl.ScopeLogs().AppendEmpty() - sl.Scope().SetName(metadata.ScopeName) - - for _, event := range cwLog.LogEvents { - logRecord := sl.LogRecords().AppendEmpty() - // pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds - // but timestamp in cloudwatch logs are in milliseconds. - logRecord.SetTimestamp(pcommon.Timestamp(event.Timestamp * int64(time.Millisecond))) - logRecord.Body().SetStr(event.Message) + return nil, err } - - return logs, err -} - -// bytesToPlogs implements unmarshalFunc for plog.Logs. -// This defines the built-in behavior for S3 events when no encoding extension is provided. 
-func bytesToPlogs(data []byte) (plog.Logs, error) { - logs := plog.NewLogs() - rl := logs.ResourceLogs().AppendEmpty() - sl := rl.ScopeLogs().AppendEmpty() - sl.Scope().SetName(metadata.ScopeName) - - lr := sl.LogRecords().AppendEmpty() - if utf8.Valid(data) { - lr.Body().SetStr(string(data)) - } else { - lr.Body().SetEmptyBytes().FromRaw(data) + // gzip magic number: 0x1f 0x8b + if header[0] == 0x1f && header[1] == 0x8b { + gr, err := gzip.NewReader(buf) + if err != nil { + return nil, err + } + return gr, nil } - - return logs, nil + return buf, nil } func enrichS3Logs(logs plog.Logs, event events.S3EventRecord) { diff --git a/receiver/awslambdareceiver/handler_test.go b/receiver/awslambdareceiver/handler_test.go index 993bdabfb1d00..cfcee83b0f209 100644 --- a/receiver/awslambdareceiver/handler_test.go +++ b/receiver/awslambdareceiver/handler_test.go @@ -9,6 +9,7 @@ import ( "encoding/base64" "encoding/json" "errors" + "io" "os" "path/filepath" "testing" @@ -17,6 +18,7 @@ import ( "github.com/aws/aws-lambda-go/events" "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" "go.opentelemetry.io/collector/consumer/consumererror" "go.opentelemetry.io/collector/pdata/pcommon" @@ -24,8 +26,10 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" ) @@ -44,7 +48,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { name string s3Event events.S3Event s3MockContent s3Content - unmarshaler func(buf []byte) (plog.Logs, error) + extension encoding.LogsDecoderFactory eventConsumer 
consumer.Logs expectedErr string }{ @@ -70,7 +74,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { objectKey: "test-file.txt", data: []byte("Some log in S3 object"), }, - unmarshaler: customLogUnmarshaler{}.UnmarshalLogs, + extension: &customLogUnmarshaler{}, eventConsumer: &noOpLogsConsumer{}, }, { @@ -95,7 +99,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { objectKey: "Test-file(10x10)#1.txt", data: []byte("Some log in S3 object"), }, - unmarshaler: customLogUnmarshaler{}.UnmarshalLogs, + extension: &customLogUnmarshaler{}, eventConsumer: &noOpLogsConsumer{}, }, { @@ -121,7 +125,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { objectKey: "test-file.txt", data: []byte("Some log in S3 object"), }, - unmarshaler: bytesToPlogs, + extension: internal.NewDefaultS3LogsDecoder(), eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "s3_log_expected_string.yaml")}, }, { @@ -145,9 +149,9 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { s3MockContent: s3Content{ bucketName: "test-bucket", objectKey: "test-file.txt", - data: []byte("H4sIAAAAAAAAAwvOz01VyMlPV8jMUwg2VshPykpNLgEAo01BGxUAAAA="), + data: compressData(t, []byte("Logs in Gzip S3 object")), }, - unmarshaler: bytesToPlogs, + extension: internal.NewDefaultS3LogsDecoder(), eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "s3_log_expected_gzip.yaml")}, }, { @@ -155,6 +159,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { s3Event: events.S3Event{ Records: []events.S3EventRecord{}, }, + extension: &customLogUnmarshaler{}, eventConsumer: &noOpLogsConsumer{}, expectedErr: "s3 event notification should contain one record instead of 0", }, @@ -179,7 +184,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { objectKey: "test-file.txt", data: []byte("Some log in S3 object"), }, - unmarshaler: customLogUnmarshaler{error: 
errors.New("failed to unmarshal logs")}.UnmarshalLogs, + extension: &customLogUnmarshaler{error: errors.New("failed to unmarshal logs")}, eventConsumer: &noOpLogsConsumer{}, expectedErr: "failed to unmarshal logs", }, @@ -204,6 +209,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { objectKey: "test-file.txt", data: []byte{}, }, + extension: &customLogUnmarshaler{}, eventConsumer: &noOpLogsConsumer{}, }, } @@ -214,8 +220,8 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { t.Run(test.name, func(t *testing.T) { s3Service := internal.NewMockS3Service(ctr) s3Service.EXPECT(). - ReadObject(gomock.Any(), test.s3MockContent.bucketName, test.s3MockContent.objectKey). - Return(test.s3MockContent.data, nil). + GetReader(gomock.Any(), test.s3MockContent.bucketName, test.s3MockContent.objectKey). + Return(io.NopCloser(bytes.NewReader(test.s3MockContent.data)), nil). AnyTimes() // Wrap the consumer to match the new s3EventConsumerFunc signature @@ -224,7 +230,7 @@ func TestProcessLambdaEvent_S3LogNotification(t *testing.T) { return test.eventConsumer.ConsumeLogs(ctx, logs) } - handler := newS3Handler(s3Service, zap.NewNop(), test.unmarshaler, logsConsumer) + handler := newS3LogsHandler(s3Service, zap.NewNop(), test.extension, logsConsumer) var event json.RawMessage event, err := json.Marshal(test.s3Event) @@ -316,7 +322,7 @@ func TestS3HandlerParseEvent(t *testing.T) { enrichS3Logs(logs, event) return consumer.ConsumeLogs(ctx, logs) } - handler := newS3Handler(s3Service, zap.NewNop(), customLogUnmarshaler{}.UnmarshalLogs, logsConsumer) + handler := newS3LogsHandler(s3Service, zap.NewNop(), &customLogUnmarshaler{}, logsConsumer) for _, test := range tests { t.Run(test.name, func(t *testing.T) { @@ -339,37 +345,37 @@ func TestHandleCloudwatchLogEvent(t *testing.T) { t.Parallel() tests := []struct { - name string - eventData string - unmarshalerFunc func(buf []byte) (plog.Logs, error) - eventConsumer consumer.Logs - expectedErr string + name string + 
eventData string + extension encoding.LogsDecoderFactory + eventConsumer consumer.Logs + expectedErr string }{ { - name: "Valid CloudWatch log event with built-in unmarshaler and golden validation consumer", - eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")), - unmarshalerFunc: cwLogsToPlogs, - eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected_default.yaml")}, + name: "Valid CloudWatch log event with built-in unmarshaler and golden validation consumer", + eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")), + extension: internal.NewDefaultCWLogsDecoder(), + eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected_default.yaml")}, }, { - name: "Valid CloudWatch log event with custom unmarshaler and golden validation consumer", - eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")), - unmarshalerFunc: customLogUnmarshaler{}.UnmarshalLogs, - eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected_custom.yaml")}, + name: "Valid CloudWatch log event with custom unmarshaler and golden validation consumer", + eventData: loadCompressedData(t, filepath.Join(testDataDirectory, "cloudwatch_log.json")), + extension: &customLogUnmarshaler{}, + eventConsumer: &logConsumerWithGoldenValidation{logsExpectedPath: filepath.Join(testDataDirectory, "cloudwatch_log_expected_custom.yaml")}, }, { - name: "Invalid CloudWatch log event - invalid base64 data", - eventData: "#", - unmarshalerFunc: cwLogsToPlogs, - expectedErr: "failed to decode data from cloudwatch logs event", - eventConsumer: &noOpLogsConsumer{}, + name: "Invalid CloudWatch log event - invalid base64 data", + eventData: "#", + extension: internal.NewDefaultCWLogsDecoder(), + expectedErr: "failed to decode 
data from cloudwatch logs event", + eventConsumer: &noOpLogsConsumer{}, }, { - name: "Invalid CloudWatch log event - invalid json data", - eventData: "test", - unmarshalerFunc: cwLogsToPlogs, - expectedErr: "failed to decompress data from cloudwatch subscription event", - eventConsumer: &noOpLogsConsumer{}, + name: "Invalid CloudWatch log event - invalid json data", + eventData: "test", + extension: internal.NewDefaultCWLogsDecoder(), + expectedErr: "failed to decompress data from cloudwatch subscription event", + eventConsumer: &noOpLogsConsumer{}, }, } @@ -384,7 +390,8 @@ func TestHandleCloudwatchLogEvent(t *testing.T) { lambdaEvent, err := json.Marshal(cwEvent) require.NoError(t, err) - handler := newCWLogsSubscriptionHandler(test.unmarshalerFunc, test.eventConsumer.ConsumeLogs) + handler := newCWLogsSubscriptionHandler(test.extension, test.eventConsumer.ConsumeLogs) + err = handler.handle(t.Context(), lambdaEvent) if test.expectedErr != "" { require.ErrorContains(t, err, test.expectedErr) @@ -499,14 +506,16 @@ func TestConsumerErrorHandling(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { s3Service := internal.NewMockS3Service(ctr) - s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Return([]byte("object content"), nil).Times(1) + s3Service.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any()). + Return(io.NopCloser(bytes.NewReader([]byte("object content"))), nil). 
+ Times(1) // Consumer that returns the test error logsConsumer := func(_ context.Context, _ events.S3EventRecord, _ plog.Logs) error { return test.consumerErr } - handler := newS3Handler(s3Service, zap.NewNop(), customLogUnmarshaler{}.UnmarshalLogs, logsConsumer) + handler := newS3LogsHandler(s3Service, zap.NewNop(), &customLogUnmarshaler{}, logsConsumer) event, err := json.Marshal(mockEvent) require.NoError(t, err) @@ -561,16 +570,53 @@ type customLogUnmarshaler struct { error error } -func (customLogUnmarshaler) Capabilities() consumer.Capabilities { +func (*customLogUnmarshaler) Start(_ context.Context, _ component.Host) error { + return nil +} + +func (*customLogUnmarshaler) Shutdown(_ context.Context) error { + return nil +} + +func (*customLogUnmarshaler) Capabilities() consumer.Capabilities { return consumer.Capabilities{} } -func (m customLogUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) { +func (m *customLogUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) { if m.error != nil { return plog.Logs{}, m.error } // perform minimal unmarshaling for validations + return m.makeLog(data), nil +} + +func (m *customLogUnmarshaler) NewLogsDecoder(reader io.Reader, _ ...encoding.DecoderOption) (encoding.LogsDecoder, error) { + if m.error != nil { + return nil, m.error + } + + data, err := io.ReadAll(reader) + if err != nil { + return nil, err + } + + isEOF := false + return xstreamencoding.NewLogsDecoderAdapter( + func() (plog.Logs, error) { + if isEOF { + return plog.Logs{}, io.EOF + } + + isEOF = true + return m.makeLog(data), nil + }, func() int64 { + return 0 + }), + nil +} + +func (*customLogUnmarshaler) makeLog(data []byte) plog.Logs { logs := plog.NewLogs() rl := logs.ResourceLogs().AppendEmpty() @@ -584,7 +630,7 @@ func (m customLogUnmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) { } else { lr.Body().SetEmptyBytes().FromRaw(data) } - return logs, nil + return logs } type mockHandlerProvider struct { diff --git 
a/receiver/awslambdareceiver/internal/default_decoders.go b/receiver/awslambdareceiver/internal/default_decoders.go new file mode 100644 index 0000000000000..f45fc87dccfd9 --- /dev/null +++ b/receiver/awslambdareceiver/internal/default_decoders.go @@ -0,0 +1,78 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal // import "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" + +import ( + "fmt" + "time" + + "github.com/aws/aws-lambda-go/events" + gojson "github.com/goccy/go-json" + "go.opentelemetry.io/collector/pdata/pcommon" + "go.opentelemetry.io/collector/pdata/plog" + conventions "go.opentelemetry.io/otel/semconv/v1.38.0" + + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal/metadata" +) + +// NewDefaultS3LogsDecoder returns a defaultS3Unmarshaler wrapped as an encoding.LogsDecoderFactory. +func NewDefaultS3LogsDecoder() encoding.LogsDecoderFactory { + return xstreamencoding.NewLogsUnmarshalerDecoderFactory(&defaultS3Unmarshaler{}) +} + +// defaultS3Unmarshaler defines the default S3 logs decoder for AWS Lambda receiver. +type defaultS3Unmarshaler struct{} + +// UnmarshalLogs defines the built-in behavior for S3 events when no encoding extension is provided. +func (*defaultS3Unmarshaler) UnmarshalLogs(data []byte) (plog.Logs, error) { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName(metadata.ScopeName) + + lr := sl.LogRecords().AppendEmpty() + lr.Body().SetStr(string(data)) + + return logs, nil +} + +// NewDefaultCWLogsDecoder returns a defaultCWLogsDecoder wrapped as an encoding.LogsDecoderFactory. 
+func NewDefaultCWLogsDecoder() encoding.LogsDecoderFactory { + return xstreamencoding.NewLogsUnmarshalerDecoderFactory(&defaultCWLogsDecoder{}) +} + +// defaultCWLogsDecoder defines the default CloudWatch logs decoder for AWS Lambda receiver. +type defaultCWLogsDecoder struct{} + +// UnmarshalLogs defines the built-in behavior for CloudWatch logs events when no encoding extension is provided. +func (*defaultCWLogsDecoder) UnmarshalLogs(data []byte) (plog.Logs, error) { + var cwLog events.CloudwatchLogsData + err := gojson.Unmarshal(data, &cwLog) + if err != nil { + return plog.NewLogs(), fmt.Errorf("failed to unmarshal data from cloudwatch logs event: %w", err) + } + + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + resourceAttrs := rl.Resource().Attributes() + resourceAttrs.PutStr(string(conventions.CloudProviderKey), conventions.CloudProviderAWS.Value.AsString()) + resourceAttrs.PutStr(string(conventions.CloudAccountIDKey), cwLog.Owner) + resourceAttrs.PutStr(string(conventions.AWSLogGroupNamesKey), cwLog.LogGroup) + resourceAttrs.PutStr(string(conventions.AWSLogStreamNamesKey), cwLog.LogStream) + + sl := rl.ScopeLogs().AppendEmpty() + sl.Scope().SetName(metadata.ScopeName) + + for _, event := range cwLog.LogEvents { + logRecord := sl.LogRecords().AppendEmpty() + // pcommon.Timestamp is a time specified as UNIX Epoch time in nanoseconds + // but timestamp in cloudwatch logs are in milliseconds. 
+ logRecord.SetTimestamp(pcommon.Timestamp(event.Timestamp * int64(time.Millisecond))) + logRecord.Body().SetStr(event.Message) + } + + return logs, nil +} diff --git a/receiver/awslambdareceiver/internal/default_decoders_test.go b/receiver/awslambdareceiver/internal/default_decoders_test.go new file mode 100644 index 0000000000000..c06458df09b21 --- /dev/null +++ b/receiver/awslambdareceiver/internal/default_decoders_test.go @@ -0,0 +1,106 @@ +// Copyright The OpenTelemetry Authors +// SPDX-License-Identifier: Apache-2.0 + +package internal + +import ( + "bytes" + "io" + "path/filepath" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/golden" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest/plogtest" +) + +func TestDefaultS3LogsDecoder_NewLogsDecoder(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input func() io.Reader + expectedPath string + }{ + { + name: "plain text input", + input: func() io.Reader { + return bytes.NewReader([]byte("some text")) + }, + expectedPath: filepath.Join("testdata", "default_s3_decoder_expected.yaml"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + decoder := NewDefaultS3LogsDecoder() + logsDecoder, err := decoder.NewLogsDecoder(tt.input()) + require.NoError(t, err) + require.NotNil(t, logsDecoder) + + // First call should return logs + logs, err := logsDecoder.DecodeLogs() + require.NoError(t, err) + require.Equal(t, 1, logs.ResourceLogs().Len()) + + // Validate against expected golden file + // Uncomment below to update the expected files + // golden.WriteLogs(t, tt.expectedPath, logs) + expectedLogs, err := golden.ReadLogs(tt.expectedPath) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(expectedLogs, logs)) + + // Second call should return EOF + _, err = logsDecoder.DecodeLogs() + require.ErrorIs(t, err, io.EOF) + }) + } +} + +func 
TestDefaultCWLogsDecoder_NewLogsDecoder(t *testing.T) { + t.Parallel() + + tests := []struct { + name string + input func() io.Reader + expectedPath string + }{ + { + name: "plain text CloudWatch logs JSON", + input: func() io.Reader { + return bytes.NewReader([]byte(`{"owner":"111222333444","logGroup":"/test/log-group","logStream":"test-stream","messageType":"DATA_MESSAGE","logEvents":[{"id":"1","timestamp":1700000000000,"message":"some text"}]}`)) + }, + expectedPath: filepath.Join("testdata", "default_cw_decoder_expected.yaml"), + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + decoder := NewDefaultCWLogsDecoder() + logsDecoder, err := decoder.NewLogsDecoder(tt.input()) + require.NoError(t, err) + require.NotNil(t, logsDecoder) + + // First call should return logs + logs, err := logsDecoder.DecodeLogs() + require.NoError(t, err) + require.Equal(t, 1, logs.ResourceLogs().Len()) + + // Validate against expected golden file + // Uncomment below to update the expected files + // golden.WriteLogs(t, tt.expectedPath, logs) + expectedLogs, err := golden.ReadLogs(tt.expectedPath) + require.NoError(t, err) + require.NoError(t, plogtest.CompareLogs(expectedLogs, logs)) + + // Second call should return EOF + _, err = logsDecoder.DecodeLogs() + require.ErrorIs(t, err, io.EOF) + }) + } +} diff --git a/receiver/awslambdareceiver/internal/mock_s3_service.go b/receiver/awslambdareceiver/internal/mock_s3_service.go index 642cc67471aa4..ee0b993d30766 100644 --- a/receiver/awslambdareceiver/internal/mock_s3_service.go +++ b/receiver/awslambdareceiver/internal/mock_s3_service.go @@ -11,6 +11,7 @@ package internal import ( context "context" + io "io" reflect "reflect" s3 "github.com/aws/aws-sdk-go-v2/service/s3" @@ -139,6 +140,21 @@ func (mr *MockS3ServiceMockRecorder) DeleteObject(ctx, bucketName, objectKey any return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeleteObject", reflect.TypeOf((*MockS3Service)(nil).DeleteObject), ctx, 
bucketName, objectKey) } +// GetReader mocks base method. +func (m *MockS3Service) GetReader(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "GetReader", ctx, bucketName, objectKey) + ret0, _ := ret[0].(io.ReadCloser) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// GetReader indicates an expected call of GetReader. +func (mr *MockS3ServiceMockRecorder) GetReader(ctx, bucketName, objectKey any) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetReader", reflect.TypeOf((*MockS3Service)(nil).GetReader), ctx, bucketName, objectKey) +} + // ListObjects mocks base method. func (m *MockS3Service) ListObjects(ctx context.Context, bucketName, continuationToken, prefix string) (*s3.ListObjectsV2Output, error) { m.ctrl.T.Helper() diff --git a/receiver/awslambdareceiver/internal/s3.go b/receiver/awslambdareceiver/internal/s3.go index 22f34c63a836b..6e4acc13732d7 100644 --- a/receiver/awslambdareceiver/internal/s3.go +++ b/receiver/awslambdareceiver/internal/s3.go @@ -22,6 +22,7 @@ type s3API interface { // S3Service define services exposed for consumers type S3Service interface { + GetReader(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error) ReadObject(ctx context.Context, bucketName, objectKey string) ([]byte, error) ListObjects(ctx context.Context, bucketName, continuationToken, prefix string) (*s3.ListObjectsV2Output, error) DeleteObject(ctx context.Context, bucketName, objectKey string) error @@ -69,6 +70,16 @@ func (s *s3ServiceClient) ReadObject(ctx context.Context, bucketName, objectKey return body, nil } +func (s *s3ServiceClient) GetReader(ctx context.Context, bucketName, objectKey string) (io.ReadCloser, error) { + params := s3.GetObjectInput{Bucket: &bucketName, Key: &objectKey} + out, err := s.api.GetObject(ctx, &params) + if err != nil { + return nil, fmt.Errorf("unable to to obtain file object with key %s from
bucket %s: %w", objectKey, bucketName, err) + } + + return out.Body, nil +} + func (s *s3ServiceClient) ListObjects(ctx context.Context, bucketName, continuationToken, prefix string) (*s3.ListObjectsV2Output, error) { input := s3.ListObjectsV2Input{ Bucket: &bucketName, diff --git a/receiver/awslambdareceiver/internal/testdata/default_cw_decoder_expected.yaml b/receiver/awslambdareceiver/internal/testdata/default_cw_decoder_expected.yaml new file mode 100644 index 0000000000000..facaa8203a915 --- /dev/null +++ b/receiver/awslambdareceiver/internal/testdata/default_cw_decoder_expected.yaml @@ -0,0 +1,22 @@ +resourceLogs: + - resource: + attributes: + - key: cloud.provider + value: + stringValue: aws + - key: cloud.account.id + value: + stringValue: "111222333444" + - key: aws.log.group.names + value: + stringValue: /test/log-group + - key: aws.log.stream.names + value: + stringValue: test-stream + scopeLogs: + - logRecords: + - body: + stringValue: some text + timeUnixNano: "1700000000000000000" + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver diff --git a/receiver/awslambdareceiver/internal/testdata/default_s3_decoder_expected.yaml b/receiver/awslambdareceiver/internal/testdata/default_s3_decoder_expected.yaml new file mode 100644 index 0000000000000..a43897216bdc6 --- /dev/null +++ b/receiver/awslambdareceiver/internal/testdata/default_s3_decoder_expected.yaml @@ -0,0 +1,8 @@ +resourceLogs: + - resource: {} + scopeLogs: + - logRecords: + - body: + stringValue: some text + scope: + name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver diff --git a/receiver/awslambdareceiver/receiver.go b/receiver/awslambdareceiver/receiver.go index 675a9b2816daf..8b9f1ef5d0373 100644 --- a/receiver/awslambdareceiver/receiver.go +++ b/receiver/awslambdareceiver/receiver.go @@ -17,11 +17,14 @@ import ( "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/consumer" 
"go.opentelemetry.io/collector/consumer/consumererror" + "go.opentelemetry.io/collector/extension" "go.opentelemetry.io/collector/pdata/plog" "go.opentelemetry.io/collector/pdata/pmetric" "go.opentelemetry.io/collector/receiver" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" ) @@ -228,48 +231,43 @@ func newLogsHandler( s3Provider internal.S3Provider, ) (handlerProvider, error) { logger := set.Logger - var s3Unmarshaler unmarshalFunc[plog.Logs] = bytesToPlogs + + var err error + s3LogsDecoder := internal.NewDefaultS3LogsDecoder() if cfg.S3.Encoding != "" { logger.Info("Using configured S3 encoding for logs", zap.String("encoding", cfg.S3.Encoding)) - extension, err := loadEncodingExtension[plog.Unmarshaler](host, cfg.S3.Encoding, "logs") + + s3LogsDecoder, err = resolveLogsDecoder(host, cfg.S3.Encoding) if err != nil { return nil, err } + } + + s3Service, err := s3Provider.GetService(ctx) + if err != nil { + return nil, fmt.Errorf("unable to load the S3 service: %w", err) + } - s3Unmarshaler = extension.UnmarshalLogs + // Wrapper function that sets observed timestamp for S3 logs + logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error { + enrichS3Logs(logs, event) + return next.ConsumeLogs(ctx, logs) } - var cwUnmarshaler unmarshalFunc[plog.Logs] = cwLogsToPlogs + cwDecoder := internal.NewDefaultCWLogsDecoder() if cfg.CloudWatch.Encoding != "" { logger.Info("Using configured CloudWatch encoding for logs", zap.String("encoding", cfg.CloudWatch.Encoding)) - extension, err := loadEncodingExtension[plog.Unmarshaler](host, cfg.CloudWatch.Encoding, "logs") + + cwDecoder, err = resolveLogsDecoder(host, cfg.CloudWatch.Encoding) if err != nil { return nil, err } - - cwUnmarshaler = extension.UnmarshalLogs - } - - 
s3Service, err := s3Provider.GetService(ctx) - if err != nil { - return nil, fmt.Errorf("unable to load the S3 service: %w", err) } // Register handlers. Logs supports S3 and CloudWatch Logs subscription events. registry := make(handlerRegistry) - registry[s3Event] = func() lambdaEventHandler { - // Wrapper function that sets observed timestamp for S3 logs - logsConsumer := func(ctx context.Context, event events.S3EventRecord, logs plog.Logs) error { - enrichS3Logs(logs, event) - return next.ConsumeLogs(ctx, logs) - } - - return newS3Handler(s3Service, logger, s3Unmarshaler, logsConsumer) - } - - registry[cwEvent] = func() lambdaEventHandler { - return newCWLogsSubscriptionHandler(cwUnmarshaler, next.ConsumeLogs) - } + registry[s3Event] = newS3LogsHandler(s3Service, logger, s3LogsDecoder, logsConsumer) + registry[cwEvent] = newCWLogsSubscriptionHandler(cwDecoder, next.ConsumeLogs) return newHandlerProvider(registry), nil } @@ -290,27 +288,60 @@ func newMetricsHandler( extensionID = cfg.S3.Encoding } - encodingExtension, err := loadEncodingExtension[pmetric.Unmarshaler](host, extensionID, "metrics") + ext, err := loadEncodingExtension[extension.Extension](host, extensionID, "metrics") if err != nil { return nil, err } + var decoder encoding.MetricsDecoderFactory + decoder, ok := ext.(encoding.MetricsDecoderExtension) + if !ok { + // derive a decoder wrapper if extension is of encoding.MetricsUnmarshalerExtension type + metricsUnmarshaler, t := ext.(encoding.MetricsUnmarshalerExtension) + if !t { + return nil, errors.New("provided extension does not implement MetricsDecoder or MetricsUnmarshalerExtension interfaces") + } + + decoder = xstreamencoding.NewMetricsUnmarshalerDecoderFactory(metricsUnmarshaler) + } + s3Service, err := s3Provider.GetService(ctx) if err != nil { return nil, fmt.Errorf("unable to load the S3 service: %w", err) } + metricConsumer := func(ctx context.Context, _ events.S3EventRecord, metrics pmetric.Metrics) error { + return 
next.ConsumeMetrics(ctx, metrics) + } + // Register handlers. Metrics supports S3 events. registry := make(handlerRegistry) - registry[s3Event] = func() lambdaEventHandler { - metricConsumer := func(ctx context.Context, _ events.S3EventRecord, metrics pmetric.Metrics) error { - return next.ConsumeMetrics(ctx, metrics) + registry[s3Event] = newS3MetricsHandler(s3Service, set.Logger, decoder, metricConsumer) + + return newHandlerProvider(registry), nil +} + +func resolveLogsDecoder(host component.Host, encoderName string) (encoding.LogsDecoderFactory, error) { + var ext extension.Extension + ext, err := loadEncodingExtension[extension.Extension](host, encoderName, "logs") + if err != nil { + return nil, err + } + + var decoderFactory encoding.LogsDecoderFactory + var ok bool + decoderFactory, ok = ext.(encoding.LogsDecoderExtension) + if !ok { + // derive a decoder wrapper if extension is of encoding.LogsUnmarshalerExtension type + logsUnmarshaler, t := ext.(encoding.LogsUnmarshalerExtension) + if !t { + return nil, errors.New("provided extension does not implement LogsDecoder or LogsUnmarshalerExtension interfaces") } - return newS3Handler(s3Service, set.Logger, encodingExtension.UnmarshalMetrics, metricConsumer) + decoderFactory = xstreamencoding.NewLogsUnmarshalerDecoderFactory(logsUnmarshaler) } - return newHandlerProvider(registry), nil + return decoderFactory, nil } // loadEncodingExtension attempts to load an available extension for the given name. 
diff --git a/receiver/awslambdareceiver/receiver_test.go b/receiver/awslambdareceiver/receiver_test.go index 1d7e70de4423b..1c9a3863f8815 100644 --- a/receiver/awslambdareceiver/receiver_test.go +++ b/receiver/awslambdareceiver/receiver_test.go @@ -3,9 +3,11 @@ package awslambdareceiver import ( + "bytes" "context" "encoding/json" "errors" + "io" "testing" "github.com/aws/aws-lambda-go/events" @@ -18,6 +20,8 @@ import ( "go.uber.org/mock/gomock" "go.uber.org/zap" + "github.com/open-telemetry/opentelemetry-collector-contrib/extension/encoding" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/xstreamencoding" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal" "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver/internal/metadata" ) @@ -53,22 +57,15 @@ func TestCreateLogs(t *testing.T) { // Test data - mock S3 file content testData := []byte("version account-id interface-id srcaddr dstaddr srcport dstport protocol packets bytes start end action log-status\n2 627286350134 eni-0377aa710071c557e 172.31.31.124 140.82.121.6 52718 443 6 13 3777 1751375679 ENDTIME ACCEPT OK\n") - s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return(testData, nil) + s3Service.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any()). + Times(1). 
+ Return(io.NopCloser(bytes.NewReader(testData)), nil) // Register the extension with the same component ID as S3Encoding // This is required: the extension ID must match cfg.S3Encoding host := mockHost{GetFunc: func() map[component.ID]component.Component { return map[component.ID]component.Component{ - component.MustNewID(s3Encoding): &mockExtensionWithPLogUnmarshaler{ - Unmarshaler: unmarshalLogsFunc(func(data []byte) (plog.Logs, error) { - require.Equal(t, string(testData), string(data)) - logs := plog.NewLogs() - rl := logs.ResourceLogs().AppendEmpty() - sl := rl.ScopeLogs().AppendEmpty() - sl.LogRecords().AppendEmpty() - return logs, nil - }), - }, + component.MustNewID(s3Encoding): &mockExtensionWithPLogUnmarshaler{}, } }} @@ -122,23 +119,13 @@ func TestCreateMetrics(t *testing.T) { s3Service := internal.NewMockS3Service(goMock) s3Provider := internal.NewMockS3Provider(goMock) s3Provider.EXPECT().GetService(gomock.Any()).AnyTimes().Return(s3Service, nil) - s3Service.EXPECT().ReadObject(gomock.Any(), gomock.Any(), gomock.Any()).Times(1).Return( - []byte("dummy data"), nil, - ) + s3Service.EXPECT().GetReader(gomock.Any(), gomock.Any(), gomock.Any()). + Times(1). 
+ Return(io.NopCloser(bytes.NewReader([]byte("dummy data"))), nil) host := mockHost{GetFunc: func() map[component.ID]component.Component { return map[component.ID]component.Component{ - component.MustNewID(encoderName): &mockExtensionWithPMetricUnmarshaler{ - Unmarshaler: unmarshalMetricsFunc(func(data []byte) (pmetric.Metrics, error) { - require.Equal(t, "dummy data", string(data)) - metrics := pmetric.NewMetrics() - rm := metrics.ResourceMetrics().AppendEmpty() - ilm := rm.ScopeMetrics().AppendEmpty() - dp := ilm.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() - dp.SetIntValue(123) - return metrics, nil - }), - }, + component.MustNewID(encoderName): &mockExtensionWithPMetricUnmarshaler{}, } }} @@ -547,11 +534,73 @@ type mockExtensionWithPLogUnmarshaler struct { plog.Unmarshaler // Add the unmarshaler interface when needed. } +func (mockExtensionWithPLogUnmarshaler) UnmarshalLogs(_ []byte) (plog.Logs, error) { + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + sl.LogRecords().AppendEmpty() + return logs, nil +} + +func (mockExtensionWithPLogUnmarshaler) NewLogsDecoder(_ io.Reader, _ ...encoding.DecoderOption) (encoding.LogsDecoder, error) { + isEOF := false + return xstreamencoding.NewLogsDecoderAdapter( + func() (plog.Logs, error) { + if isEOF { + return plog.Logs{}, io.EOF + } + + logs := plog.NewLogs() + rl := logs.ResourceLogs().AppendEmpty() + sl := rl.ScopeLogs().AppendEmpty() + sl.LogRecords().AppendEmpty() + + isEOF = true + return logs, nil + }, + func() int64 { + return 0 + }), + nil +} + type mockExtensionWithPMetricUnmarshaler struct { mockExtension // Embed the base mock implementation. pmetric.Unmarshaler // Add the unmarshaler interface when needed. 
} +func (mockExtensionWithPMetricUnmarshaler) UnmarshalMetrics(_ []byte) (pmetric.Metrics, error) { + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + ilm := rm.ScopeMetrics().AppendEmpty() + dp := ilm.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetIntValue(123) + return metrics, nil +} + +func (mockExtensionWithPMetricUnmarshaler) NewMetricsDecoder(_ io.Reader, _ ...encoding.DecoderOption) (encoding.MetricsDecoder, error) { + isEOF := false + return xstreamencoding.NewMetricsDecoderAdapter( + func() (pmetric.Metrics, error) { + if isEOF { + return pmetric.Metrics{}, io.EOF + } + + metrics := pmetric.NewMetrics() + rm := metrics.ResourceMetrics().AppendEmpty() + ilm := rm.ScopeMetrics().AppendEmpty() + dp := ilm.Metrics().AppendEmpty().SetEmptyGauge().DataPoints().AppendEmpty() + dp.SetIntValue(123) + + isEOF = true + return metrics, nil + }, + func() int64 { + return 0 + }), + nil +} + type mockExtension struct{} func (*mockExtension) Start(_ context.Context, _ component.Host) error { @@ -572,15 +621,3 @@ type mockHost struct { func (m mockHost) GetExtensions() map[component.ID]component.Component { return m.GetFunc() } - -type unmarshalLogsFunc func([]byte) (plog.Logs, error) - -func (f unmarshalLogsFunc) UnmarshalLogs(data []byte) (plog.Logs, error) { - return f(data) -} - -type unmarshalMetricsFunc func([]byte) (pmetric.Metrics, error) - -func (f unmarshalMetricsFunc) UnmarshalMetrics(data []byte) (pmetric.Metrics, error) { - return f(data) -} diff --git a/receiver/awslambdareceiver/testdata/s3_log_expected_gzip.yaml b/receiver/awslambdareceiver/testdata/s3_log_expected_gzip.yaml index 133f398a8ab70..94d22f2002f4f 100644 --- a/receiver/awslambdareceiver/testdata/s3_log_expected_gzip.yaml +++ b/receiver/awslambdareceiver/testdata/s3_log_expected_gzip.yaml @@ -17,7 +17,7 @@ resourceLogs: - logRecords: - attributes: body: - stringValue: H4sIAAAAAAAAAwvOz01VyMlPV8jMUwg2VshPykpNLgEAo01BGxUAAAA= + 
stringValue: Logs in Gzip S3 object observedTimeUnixNano: "1764625361000000000" scope: name: github.com/open-telemetry/opentelemetry-collector-contrib/receiver/awslambdareceiver