diff --git a/.chloggen/s3-receiver-delete-obj.yaml b/.chloggen/s3-receiver-delete-obj.yaml new file mode 100644 index 0000000000000..ca46deca0fa2b --- /dev/null +++ b/.chloggen/s3-receiver-delete-obj.yaml @@ -0,0 +1,28 @@ +# Use this changelog template to create an entry for release notes. + +# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix' +change_type: enhancement + +# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog) +component: receiver/awss3 + +# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`). +note: "add `tag_object_after_ingestion` flag to the s3 receiver so objects that have been processed can be identified" + +# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists. +issues: [46078] + +# (Optional) One or more lines of additional information to render under the primary note. +# These lines will be padded with 2 spaces and then inserted directly into the document. +# Use pipe (|) for multiline entries. +subtext: | + They will be tagged with `otel-collector:status=ingested`. Operators can use that tag to define bucket lifecycle policies. + +# If your change doesn't affect end users or the exported elements of any package, +# you should instead start your pull request title with [chore] or use the "Skip Changelog" label. +# Optional: The change log or logs in which this entry should be included. +# e.g. '[user]' or '[user, api]' +# Include 'user' if the change is relevant to end users. +# Include 'api' if there is a change to a library API. +# Default: '[user]' +change_logs: [] diff --git a/receiver/awss3receiver/README.md b/receiver/awss3receiver/README.md index 5599b7a878181..1843a4e116e4f 100644 --- a/receiver/awss3receiver/README.md +++ b/receiver/awss3receiver/README.md @@ -51,6 +51,7 @@ The following exporter configuration parameters are supported. | `suffix` | Key suffix to match against. | | Required | | `notifications:` | | | | | `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | | +| `tag_object_after_ingestion` | If enabled the receiver will attempt to tag the object after successfully ingesting it. | false | Optional | There are two modes of operation: @@ -64,7 +65,7 @@ The receiver can subscribe to an SQS queue that receives S3 event notifications: ```yaml sqs: # Required: The ARN of the SQS queue that receives S3 bucket notifications - queue_url: "https:https://sqs.us-east-1.amazonaws.com/123456789012/test-queue" + queue_url: "https:https://sqs.us-east-1.amazonaws.com/123456789012/test-queue" # Required: The AWS region of the SQS queue region: "us-east-1" ``` @@ -73,7 +74,7 @@ sqs: Time-based configuration (`starttime`/`endtime`) and SQS configuration cannot be used together. ### Time format for `starttime` and `endtime` -The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data. +The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data. The time format is either RFC3339,`YYYY-MM-DD HH:MM` or simply `YYYY-MM-DD`, in which case the time is assumed to be `00:00`. ### Encodings @@ -81,7 +82,7 @@ By default, the receiver understands the following encodings: - otlp_json (OpenTelemetry Protocol format represented as json) with a suffix of `.json` - otlp_proto (OpenTelemetry Protocol format represented as Protocol Buffers) with a suffix of `.binpb` -The `encodings` options allows you to specify Encoding Extensions to use to decode keys with matching suffixes. +The `encodings` options allows you to specify Encoding Extensions to use to decode keys with matching suffixes. ### Example Configuration @@ -93,7 +94,7 @@ extension: encoding: utf8 marshaling_separator: "\n" unmarshaling_separator: "\r?\n" - + receivers: awss3: starttime: "2024-01-01 01:00" @@ -128,17 +129,17 @@ service: traces: receivers: [awss3/traces] exporters: [otlp_grpc] - + traces/sqs: receivers: [awss3/sqs_traces] exporters: [otlp_grpc] ``` ## Notifications -The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of +The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of "org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus". -The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the -record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the +The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the +record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the following attributes: | Attribute | Description | @@ -150,6 +151,10 @@ following attributes: | `failure_message` | Error message if `ingest_status` is "failed". | The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time. -If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with +If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with the time of the data being ingested when the failure occurred. If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent. + +## Object Lifecycle Management +If the `tag_object_after_ingestion` is enabled the receiver will make a best-effort attempt to tag objects with `otel-collector:status = ingested` after they are processed by the pipeline. This requires an additional `s3:PutObjectTagging` permission. +This tag can then be used with a lifecycle policy to expire ingested objects or transition them to cheaper storage classes. \ No newline at end of file diff --git a/receiver/awss3receiver/config.go b/receiver/awss3receiver/config.go index 3194481dd9395..d342cbc387288 100644 --- a/receiver/awss3receiver/config.go +++ b/receiver/awss3receiver/config.go @@ -26,6 +26,7 @@ type S3DownloaderConfig struct { Endpoint string `mapstructure:"endpoint"` EndpointPartitionID string `mapstructure:"endpoint_partition_id"` S3ForcePathStyle bool `mapstructure:"s3_force_path_style"` + TagObjectAfterIngestion bool `mapstructure:"tag_object_after_ingestion"` } // SQSConfig holds SQS queue configuration for receiving object change notifications. diff --git a/receiver/awss3receiver/config.schema.yaml b/receiver/awss3receiver/config.schema.yaml index c3417000d078f..d7cb8722c87ae 100644 --- a/receiver/awss3receiver/config.schema.yaml +++ b/receiver/awss3receiver/config.schema.yaml @@ -40,6 +40,8 @@ $defs: type: string s3_prefix: type: string + tag_object_after_ingestion: + type: boolean sqs_config: description: SQSConfig holds SQS queue configuration for receiving object change notifications. type: object diff --git a/receiver/awss3receiver/config_test.go b/receiver/awss3receiver/config_test.go index 9b302c1124e4a..9e51ee145a051 100644 --- a/receiver/awss3receiver/config_test.go +++ b/receiver/awss3receiver/config_test.go @@ -36,6 +36,7 @@ func TestConfig_Validate_Valid(t *testing.T) { Endpoint: "", EndpointPartitionID: "aws", S3ForcePathStyle: false, + TagObjectAfterIngestion: true, }, StartTime: "2024-01-01", EndTime: "2024-01-01", @@ -57,6 +58,7 @@ func TestConfig_Validate_Valid(t *testing.T) { Endpoint: "", EndpointPartitionID: "aws", S3ForcePathStyle: false, + TagObjectAfterIngestion: true, }, SQS: &SQSConfig{ QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue", @@ -93,6 +95,7 @@ func TestLoadConfig(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", FilePrefixIncludeTelemetryType: true, EndpointPartitionID: "aws", + TagObjectAfterIngestion: false, }, StartTime: "2024-01-31 15:00", EndTime: "2024-02-03", @@ -109,6 +112,7 @@ func TestLoadConfig(t *testing.T) { FilePrefix: "otel", FilePrefixIncludeTelemetryType: false, EndpointPartitionID: "aws", + TagObjectAfterIngestion: false, }, StartTime: "2024-01-31 15:00", EndTime: "2024-02-03", @@ -136,6 +140,7 @@ func TestLoadConfig(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", FilePrefixIncludeTelemetryType: true, EndpointPartitionID: "aws", + TagObjectAfterIngestion: true, }, StartTime: "2024-01-31T15:00:00Z", EndTime: "2024-02-03T00:00:00Z", @@ -150,6 +155,7 @@ func TestLoadConfig(t *testing.T) { S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M", FilePrefixIncludeTelemetryType: true, EndpointPartitionID: "aws", + TagObjectAfterIngestion: false, }, SQS: &SQSConfig{ QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue", diff --git a/receiver/awss3receiver/s3intf.go b/receiver/awss3receiver/s3intf.go index 9a8cf9c9dc1f0..8fab559c581af 100644 --- a/receiver/awss3receiver/s3intf.go +++ b/receiver/awss3receiver/s3intf.go @@ -11,6 +11,12 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/config" "github.com/aws/aws-sdk-go-v2/service/s3" + "github.com/aws/aws-sdk-go-v2/service/s3/types" +) + +const ( + ingestedTag = "otel-collector:status" + ingestedStatus = "ingested" ) type ListObjectsV2Pager interface { @@ -22,15 +28,16 @@ type ListObjectsAPI interface { NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager } -type GetObjectAPI interface { +type SingleObjectAPI interface { GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) } type s3ListObjectsAPIImpl struct { client *s3.Client } -func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) { +func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, SingleObjectAPI, error) { optionsFuncs := make([]func(*config.LoadOptions) error, 0) if cfg.Region != "" { optionsFuncs = append(optionsFuncs, config.WithRegion(cfg.Region)) @@ -52,7 +59,6 @@ func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, G }) } client := s3.NewFromConfig(awsCfg, s3OptionFuncs...) - return &s3ListObjectsAPIImpl{client: client}, client, nil } @@ -61,7 +67,7 @@ func (api *s3ListObjectsAPIImpl) NewListObjectsV2Paginator(params *s3.ListObject } // retrieveS3Object retrieves S3 object content for a given bucket and key -func retrieveS3Object(ctx context.Context, client GetObjectAPI, bucket, key string) ([]byte, error) { +func retrieveS3Object(ctx context.Context, client SingleObjectAPI, bucket, key string) ([]byte, error) { params := s3.GetObjectInput{ Bucket: &bucket, Key: &key, @@ -77,3 +83,21 @@ func retrieveS3Object(ctx context.Context, client GetObjectAPI, bucket, key stri } return contents, nil } + +// tagS3Object tags an S3 object for a given bucket and key +func tagS3Object(ctx context.Context, client SingleObjectAPI, bucket, key string) error { + params := s3.PutObjectTaggingInput{ + Bucket: &bucket, + Key: &key, + Tagging: &types.Tagging{ + TagSet: []types.Tag{ + { + Key: aws.String(ingestedTag), + Value: aws.String(ingestedStatus), + }, + }, + }, + } + _, err := client.PutObjectTagging(ctx, ¶ms) + return err +} diff --git a/receiver/awss3receiver/s3reader.go b/receiver/awss3receiver/s3reader.go index 2e869eafbec5c..2ca977aea23b6 100644 --- a/receiver/awss3receiver/s3reader.go +++ b/receiver/awss3receiver/s3reader.go @@ -18,7 +18,7 @@ type s3TimeBasedReader struct { logger *zap.Logger listObjectsClient ListObjectsAPI - getObjectClient GetObjectAPI + singleObjectClient SingleObjectAPI s3Bucket string s3Prefix string s3PartitionFormat string @@ -28,10 +28,11 @@ type s3TimeBasedReader struct { startTime time.Time endTime time.Time notifier statusNotifier + tagObjectAfterIngestion bool } func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *zap.Logger, cfg *Config) (*s3TimeBasedReader, error) { - listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader) + listObjectsClient, singleObjectClient, err := newS3Client(ctx, cfg.S3Downloader) if err != nil { return nil, err } @@ -57,7 +58,7 @@ func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger * return &s3TimeBasedReader{ logger: logger, listObjectsClient: listObjectsClient, - getObjectClient: getObjectClient, + singleObjectClient: singleObjectClient, s3Bucket: cfg.S3Downloader.S3Bucket, s3Prefix: cfg.S3Downloader.S3Prefix, filePrefix: cfg.S3Downloader.FilePrefix, @@ -67,6 +68,7 @@ func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger * startTime: startTime, endTime: endTime, notifier: notifier, + tagObjectAfterIngestion: cfg.S3Downloader.TagObjectAfterIngestion, }, nil } @@ -144,13 +146,28 @@ func (s3Reader *s3TimeBasedReader) readTelemetryForTime(ctx context.Context, t t s3Reader.logger.Info("No telemetry found for time", zap.String("prefix", prefix), zap.Time("time", t)) } else { for _, obj := range page.Contents { - data, err := retrieveS3Object(ctx, s3Reader.getObjectClient, s3Reader.s3Bucket, *obj.Key) + data, err := retrieveS3Object(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key) if err != nil { return err } + s3Reader.logger.Debug("Retrieved telemetry", zap.String("key", *obj.Key)) - if err := dataCallback(ctx, *obj.Key, data); err != nil { - return err + if callbackErr := dataCallback(ctx, *obj.Key, data); callbackErr != nil { + return callbackErr + } + + if s3Reader.tagObjectAfterIngestion { + if err = tagS3Object(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key); err != nil { + s3Reader.logger.Warn("Failed to tag S3 object", + zap.String("bucket", s3Reader.s3Bucket), + zap.String("key", *obj.Key), + zap.Error(err)) + // Don't return error as the object was processed successfully + } else { + s3Reader.logger.Debug("Tagged S3 object", + zap.String("bucket", s3Reader.s3Bucket), + zap.String("key", *obj.Key)) + } } } } diff --git a/receiver/awss3receiver/s3reader_test.go b/receiver/awss3receiver/s3reader_test.go index ad163de2db3a2..8d0910c9a14a7 100644 --- a/receiver/awss3receiver/s3reader_test.go +++ b/receiver/awss3receiver/s3reader_test.go @@ -183,10 +183,24 @@ func Test_s3Reader_getObjectPrefixForTime(t *testing.T) { } } -type mockGetObjectAPI func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) +type mockSingleObjectAPI struct { + getObjectFunc func(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) + putObjectTaggingFunc func(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) +} -func (m mockGetObjectAPI) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - return m(ctx, params, optFns...) +func (m *mockSingleObjectAPI) GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + if m.getObjectFunc != nil { + return m.getObjectFunc(ctx, params, optFns...) + } + return nil, errors.New("GetObject not mocked") +} + +func (m *mockSingleObjectAPI) PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) { + if m.putObjectTaggingFunc != nil { + return m.putObjectTaggingFunc(ctx, params, optFns...) + } + // Default to success if no mock function provided + return &s3.PutObjectTaggingOutput{}, nil } type mockListObjectsAPI func(params *s3.ListObjectsV2Input) ListObjectsV2Pager @@ -246,14 +260,16 @@ func Test_readTelemetryForTime(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - require.Contains(t, []string{testKey1, testKey2}, *params.Key) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Contains(t, []string{testKey1, testKey2}, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3PartitionFormat: s3PartitionFormatDefault, @@ -299,12 +315,14 @@ func Test_readTelemetryForTime_GetObjectError(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - require.Equal(t, testKey, *params.Key) - return nil, testError - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return nil, testError + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3PartitionFormat: s3PartitionFormatDefault, @@ -334,14 +352,16 @@ func Test_readTelemetryForTime_ListObjectsNoResults(t *testing.T) { return &mockListObjectsV2Pager{} }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - require.Equal(t, testKey, *params.Key) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3PartitionFormat: s3PartitionFormatDefault, @@ -383,14 +403,16 @@ func Test_readTelemetryForTime_NextPageError(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - require.Equal(t, testKey, *params.Key) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3PartitionFormat: s3PartitionFormatDefault, @@ -444,13 +466,15 @@ func Test_readAll(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3Prefix: "", @@ -494,13 +518,15 @@ func Test_readAll_StatusMessages(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3Prefix: "", @@ -566,13 +592,15 @@ func Test_readAll_ContextDone(t *testing.T) { }, } }), - getObjectClient: mockGetObjectAPI(func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { - t.Helper() - require.Equal(t, "bucket", *params.Bucket) - return &s3.GetObjectOutput{ - Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), - }, nil - }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + }, logger: zap.NewNop(), s3Bucket: "bucket", s3Prefix: "", @@ -613,6 +641,122 @@ func Test_readAll_ContextDone(t *testing.T) { }, notifier.messages) } +func Test_readTelemetryForTime_WithTag(t *testing.T) { + testKey := "year=2023/month=01/day=02/hour=03/minute=04/traces_test" + taggedKeys := make([]string, 0) + + reader := &s3TimeBasedReader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2023/month=01/day=02/hour=03/minute=04/traces_", *params.Prefix) + + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + {Key: &testKey}, + }, + }, + }, + } + }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + putObjectTaggingFunc: func(_ context.Context, params *s3.PutObjectTaggingInput, _ ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + taggedKeys = append(taggedKeys, *params.Key) + return &s3.PutObjectTaggingOutput{}, nil + }, + }, + logger: zap.NewNop(), + s3Bucket: "bucket", + s3PartitionFormat: s3PartitionFormatDefault, + S3PartitionTimeLocation: time.UTC, + filePrefix: "", + filePrefixIncludeTelemetryType: true, + tagObjectAfterIngestion: true, // Enable tagging + } + + testTime, err := time.Parse(time.RFC3339, "2023-01-02T03:04:05Z") + require.NoError(t, err) + + dataCallbackKeys := make([]string, 0) + err = reader.readTelemetryForTime(t.Context(), testTime, "traces", func(_ context.Context, key string, _ []byte) error { + t.Helper() + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.NoError(t, err) + require.Equal(t, []string{testKey}, dataCallbackKeys) + require.Equal(t, []string{testKey}, taggedKeys, "Object should be tagged after successful ingestion") +} + +func Test_readTelemetryForTime_TagFailure(t *testing.T) { + testKey := "year=2023/month=01/day=02/hour=03/minute=04/traces_test" + + reader := &s3TimeBasedReader{ + listObjectsClient: mockListObjectsAPI(func(params *s3.ListObjectsV2Input) ListObjectsV2Pager { + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, "year=2023/month=01/day=02/hour=03/minute=04/traces_", *params.Prefix) + + return &mockListObjectsV2Pager{ + Pages: []*s3.ListObjectsV2Output{ + { + Contents: []types.Object{ + {Key: &testKey}, + }, + }, + }, + } + }), + singleObjectClient: &mockSingleObjectAPI{ + getObjectFunc: func(_ context.Context, params *s3.GetObjectInput, _ ...func(*s3.Options)) (*s3.GetObjectOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return &s3.GetObjectOutput{ + Body: io.NopCloser(bytes.NewReader([]byte("this is the body of the object"))), + }, nil + }, + putObjectTaggingFunc: func(_ context.Context, params *s3.PutObjectTaggingInput, _ ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) { + t.Helper() + require.Equal(t, "bucket", *params.Bucket) + require.Equal(t, testKey, *params.Key) + return nil, errors.New("tagging failed") + }, + }, + logger: zap.NewNop(), + s3Bucket: "bucket", + s3PartitionFormat: s3PartitionFormatDefault, + S3PartitionTimeLocation: time.UTC, + filePrefix: "", + filePrefixIncludeTelemetryType: true, + tagObjectAfterIngestion: true, // Enable tagging + } + + testTime, err := time.Parse(time.RFC3339, "2023-01-02T03:04:05Z") + require.NoError(t, err) + + dataCallbackKeys := make([]string, 0) + // Should not return error even if tagging fails + err = reader.readTelemetryForTime(t.Context(), testTime, "traces", func(_ context.Context, key string, _ []byte) error { + t.Helper() + dataCallbackKeys = append(dataCallbackKeys, key) + return nil + }) + require.NoError(t, err, "Should not fail when tagging fails") + require.Equal(t, []string{testKey}, dataCallbackKeys, "Data should still be processed") +} + func Test_determineTimestep(t *testing.T) { tests := []struct { name string diff --git a/receiver/awss3receiver/s3sqsreader.go b/receiver/awss3receiver/s3sqsreader.go index 76805b9a525d1..d5c50c641f278 100644 --- a/receiver/awss3receiver/s3sqsreader.go +++ b/receiver/awss3receiver/s3sqsreader.go @@ -13,6 +13,7 @@ import ( "time" "github.com/aws/aws-sdk-go-v2/aws" + "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sqs" "go.uber.org/zap" ) @@ -57,14 +58,15 @@ type snsMessage struct { // s3SQSNotificationReader listens for SNS notifications about new S3 objects type s3SQSNotificationReader struct { - logger *zap.Logger - s3Client GetObjectAPI - sqsClient sqsClient - queueURL string - s3Bucket string - s3Prefix string - maxNumberOfMessages int32 - waitTimeSeconds int32 + logger *zap.Logger + s3Client SingleObjectAPI + sqsClient sqsClient + queueURL string + s3Bucket string + s3Prefix string + maxNumberOfMessages int32 + waitTimeSeconds int32 + tagObjectAfterIngestion bool } func newS3SQSReader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3SQSNotificationReader, error) { @@ -77,7 +79,7 @@ func newS3SQSReader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3SQ return nil, fmt.Errorf("failed to create SQS client: %w", err) } - _, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader) + _, singleObjectClient, err := newS3Client(ctx, cfg.S3Downloader) if err != nil { return nil, err } @@ -94,14 +96,15 @@ func newS3SQSReader(ctx context.Context, logger *zap.Logger, cfg *Config) (*s3SQ } return &s3SQSNotificationReader{ - logger: logger, - s3Client: getObjectClient, - sqsClient: sqsAPIClient, - queueURL: cfg.SQS.QueueURL, - s3Bucket: cfg.S3Downloader.S3Bucket, - s3Prefix: cfg.S3Downloader.S3Prefix, - maxNumberOfMessages: maxMessages, - waitTimeSeconds: waitTime, + logger: logger, + s3Client: singleObjectClient, + sqsClient: sqsAPIClient, + queueURL: cfg.SQS.QueueURL, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + maxNumberOfMessages: maxMessages, + waitTimeSeconds: waitTime, + tagObjectAfterIngestion: cfg.S3Downloader.TagObjectAfterIngestion, }, nil } @@ -204,11 +207,16 @@ func (r *s3SQSNotificationReader) readAll(ctx context.Context, _ string, callbac var content []byte content, err = retrieveS3Object(ctx, r.s3Client, bucket, decodedKey) if err != nil { - r.logger.Error("Failed to get S3 object", + r.logger.Warn("Failed to get S3 object", zap.String("bucket", bucket), zap.String("key", decodedKey), zap.Error(err)) - allRecordsSucceeded = false + + var noSuchKey *types.NoSuchKey + if !errors.As(err, &noSuchKey) { + // Swallow no such key errors as nothing more can be done + allRecordsSucceeded = false + } continue } @@ -220,6 +228,21 @@ func (r *s3SQSNotificationReader) readAll(ctx context.Context, _ string, callbac allRecordsSucceeded = false continue } + + if r.tagObjectAfterIngestion { + err = tagS3Object(ctx, r.s3Client, bucket, decodedKey) + if err != nil { + r.logger.Warn("Failed to tag S3 object", + zap.String("bucket", bucket), + zap.String("key", decodedKey), + zap.Error(err)) + // Don't mark as failed as the object was processed successfully + } else { + r.logger.Debug("Tagged S3 object", + zap.String("bucket", bucket), + zap.String("key", decodedKey)) + } + } } // Only delete the message if all records were successfully processed. diff --git a/receiver/awss3receiver/s3sqsreader_test.go b/receiver/awss3receiver/s3sqsreader_test.go index c63649c90600f..9a33402555ccd 100644 --- a/receiver/awss3receiver/s3sqsreader_test.go +++ b/receiver/awss3receiver/s3sqsreader_test.go @@ -14,6 +14,7 @@ import ( "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/service/s3" + s3types "github.com/aws/aws-sdk-go-v2/service/s3/types" "github.com/aws/aws-sdk-go-v2/service/sqs" "github.com/aws/aws-sdk-go-v2/service/sqs/types" "github.com/stretchr/testify/assert" @@ -37,12 +38,19 @@ func (m *mockS3ClientSQS) GetObject(ctx context.Context, params *s3.GetObjectInp if !ok { return nil, errors.New("unexpected type for mock GetObject content") } - return &s3.GetObjectOutput{ Body: io.NopCloser(strings.NewReader(string(content))), }, args.Error(1) } +func (m *mockS3ClientSQS) PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, _ ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error) { + args := m.Called(ctx, params) + if args.Get(0) == nil { + return nil, args.Error(1) + } + return args.Get(0).(*s3.PutObjectTaggingOutput), args.Error(1) +} + type mockSQSClient struct { mock.Mock } @@ -408,14 +416,15 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { // Create reader with mocks reader := &s3SQSNotificationReader{ - logger: logger, - s3Client: mockS3, - sqsClient: mockSQS, - queueURL: cfg.SQS.QueueURL, - s3Bucket: cfg.S3Downloader.S3Bucket, - s3Prefix: cfg.S3Downloader.S3Prefix, - maxNumberOfMessages: 10, - waitTimeSeconds: 20, + logger: logger, + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: cfg.SQS.QueueURL, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, } // Mock error during receive messages @@ -432,20 +441,21 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { assert.Equal(t, context.Canceled, err) }) - t.Run("does not delete message on S3 retrieval error", func(t *testing.T) { + t.Run("does not delete message or tag object on S3 retrieval error", func(t *testing.T) { mockS3 := new(mockS3ClientSQS) mockSQS := new(mockSQSClient) // Create reader with mocks reader := &s3SQSNotificationReader{ - logger: logger, - s3Client: mockS3, - sqsClient: mockSQS, - queueURL: cfg.SQS.QueueURL, - s3Bucket: cfg.S3Downloader.S3Bucket, - s3Prefix: cfg.S3Downloader.S3Prefix, - maxNumberOfMessages: 10, - waitTimeSeconds: 20, + logger: logger, + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: cfg.SQS.QueueURL, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, } // Create S3 event notification @@ -519,21 +529,23 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { mockS3.AssertExpectations(t) mockSQS.AssertExpectations(t) mockSQS.AssertNotCalled(t, "DeleteMessage", mock.Anything, mock.Anything) + mockS3.AssertNotCalled(t, "PutObjectTagging", mock.Anything, mock.Anything) }) - t.Run("does not delete message on partial failure", func(t *testing.T) { + t.Run("does not delete message or tag object on partial failure", func(t *testing.T) { mockS3 := new(mockS3ClientSQS) mockSQS := new(mockSQSClient) reader := &s3SQSNotificationReader{ - logger: logger, - s3Client: mockS3, - sqsClient: mockSQS, - queueURL: cfg.SQS.QueueURL, - s3Bucket: cfg.S3Downloader.S3Bucket, - s3Prefix: cfg.S3Downloader.S3Prefix, - maxNumberOfMessages: 10, - waitTimeSeconds: 20, + logger: logger, + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: cfg.SQS.QueueURL, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, } // Create S3 event notification with THREE objects: @@ -597,6 +609,9 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { Bucket: aws.String("test-bucket"), Key: aws.String("success-key"), }).Return([]byte("success-content"), nil) + mockS3.On("PutObjectTagging", mock.Anything, mock.MatchedBy(func(input *s3.PutObjectTaggingInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "success-key" + })).Return(&s3.PutObjectTaggingOutput{}, nil) // Second S3 object FAILS to retrieve. mockS3.On("GetObject", mock.Anything, &s3.GetObjectInput{ @@ -638,21 +653,30 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { mockSQS.AssertExpectations(t) // Verify DeleteMessage was never called - message should remain for retry. mockSQS.AssertNotCalled(t, "DeleteMessage", mock.Anything, mock.Anything) + mockS3.AssertNotCalled(t, "PutObjectTagging", mock.Anything, &s3.PutObjectTaggingInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String("s3-failure-key"), + }) + mockS3.AssertNotCalled(t, "PutObjectTagging", mock.Anything, &s3.PutObjectTaggingInput{ + Bucket: aws.String("test-bucket"), + Key: aws.String("callback-failure-key"), + }) }) - t.Run("does not delete message on callback error", func(t *testing.T) { + t.Run("does not delete message or tag object on callback error", func(t *testing.T) { mockS3 := new(mockS3ClientSQS) mockSQS := new(mockSQSClient) reader := &s3SQSNotificationReader{ - logger: logger, - s3Client: mockS3, - sqsClient: mockSQS, - queueURL: cfg.SQS.QueueURL, - s3Bucket: cfg.S3Downloader.S3Bucket, - s3Prefix: cfg.S3Downloader.S3Prefix, - maxNumberOfMessages: 10, - waitTimeSeconds: 20, + logger: logger, + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: cfg.SQS.QueueURL, + s3Bucket: cfg.S3Downloader.S3Bucket, + s3Prefix: cfg.S3Downloader.S3Prefix, + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, } s3Event := s3EventNotification{ @@ -716,6 +740,84 @@ func TestS3SQSReader_ReadAllErrorHandling(t *testing.T) { mockSQS.AssertExpectations(t) // Verify DeleteMessage was never called - message should remain for retry. mockSQS.AssertNotCalled(t, "DeleteMessage", mock.Anything, mock.Anything) + mockS3.AssertNotCalled(t, "PutObjectTagging", mock.Anything, mock.Anything) + }) + t.Run("deletes message if object is not found", func(t *testing.T) { + mockSQS := new(mockSQSClient) + mockS3 := new(mockS3ClientSQS) + + s3Event := s3EventNotification{ + Records: []s3EventRecord{ + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "test-key"}, + }, + }, + }, + } + + s3EventJSON, err := json.Marshal(s3Event) + require.NoError(t, err) + + receiptHandle := "test-receipt-handle" + + // S3 returns object not found + mockS3.On("GetObject", mock.Anything, mock.MatchedBy(func(input *s3.GetObjectInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "test-key" + })).Return([]byte(""), &s3types.NoSuchKey{Message: aws.String("The specified key does not exist.")}) + + mockSQS.On("ReceiveMessage", mock.Anything, mock.MatchedBy(func(input *sqs.ReceiveMessageInput) bool { + return *input.QueueUrl == "test-queue-url" + })).Return(&sqs.ReceiveMessageOutput{ + Messages: []types.Message{ + { + Body: aws.String(string(s3EventJSON)), + ReceiptHandle: &receiptHandle, + }, + }, + }, nil).Once() + + mockSQS.On("ReceiveMessage", mock.Anything, mock.Anything).Return( + (*sqs.ReceiveMessageOutput)(nil), context.DeadlineExceeded, + ) + + // Message should be deleted + mockSQS.On("DeleteMessage", mock.Anything, mock.MatchedBy(func(input *sqs.DeleteMessageInput) bool { + return *input.QueueUrl == "test-queue-url" && *input.ReceiptHandle == "test-receipt-handle" + })).Return(&sqs.DeleteMessageOutput{}, nil) + + reader := &s3SQSNotificationReader{ + logger: zap.NewNop(), + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: "test-queue-url", + s3Bucket: "test-bucket", + s3Prefix: "", + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, // Enable deletion + } + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + + callbackCalled := false + err = reader.readAll(ctx, "test-telemetry", func(_ context.Context, _ string, _ []byte) error { + callbackCalled = true + return nil + }) + + // Context deadline exceeded is expected + assert.Equal(t, context.DeadlineExceeded, err) + + mockSQS.AssertExpectations(t) + mockS3.AssertExpectations(t) + + // Callback never called because there is no object to process + assert.False(t, callbackCalled, "Callback should have been called") }) } @@ -878,6 +980,219 @@ func TestS3SQSReader_ReadAllWithPrefix(t *testing.T) { assert.Equal(t, []byte("second-matching-content"), processedKeys["logs/matched-key-2"]) } +func TestS3SQSReader_Tag(t *testing.T) { + testCases := []struct { + name string + s3Records []s3EventRecord + setupMocks func(*mockS3ClientSQS, *mockSQSClient, map[string]bool) + expectedProcessedKeys map[string][]byte + expectedTaggedKeys []string + }{ + { + name: "single object", + s3Records: []s3EventRecord{ + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "test-key"}, + }, + }, + }, + setupMocks: func(mockS3 *mockS3ClientSQS, mockSQS *mockSQSClient, taggedKeys map[string]bool) { + mockS3.On("GetObject", mock.Anything, mock.MatchedBy(func(input *s3.GetObjectInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "test-key" + })).Return([]byte("test-content"), nil) + + mockS3.On("PutObjectTagging", mock.Anything, mock.MatchedBy(func(input *s3.PutObjectTaggingInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "test-key" + })).Run(func(args mock.Arguments) { + input := args.Get(1).(*s3.PutObjectTaggingInput) + taggedKeys[*input.Key] = true + }).Return(&s3.PutObjectTaggingOutput{}, nil) + + // Mock successful message deletion + mockSQS.On("DeleteMessage", mock.Anything, mock.MatchedBy(func(input *sqs.DeleteMessageInput) bool { + return *input.QueueUrl == "test-queue-url" && *input.ReceiptHandle == "test-receipt-handle" + })).Return(&sqs.DeleteMessageOutput{}, nil) + }, + expectedProcessedKeys: map[string][]byte{ + "test-key": []byte("test-content"), + }, + expectedTaggedKeys: []string{"test-key"}, + }, + { + name: "tag error", + s3Records: []s3EventRecord{ + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "test-key"}, + }, + }, + }, + setupMocks: func(mockS3 *mockS3ClientSQS, mockSQS *mockSQSClient, _ map[string]bool) { + mockS3.On("GetObject", mock.Anything, mock.MatchedBy(func(input *s3.GetObjectInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "test-key" + })).Return([]byte("test-content"), nil) + + // Mock tag failure + mockS3.On("PutObjectTagging", mock.Anything, mock.MatchedBy(func(input *s3.PutObjectTaggingInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == "test-key" + })).Return((*s3.PutObjectTaggingOutput)(nil), errors.New("access denied")) + + // Mock successful message deletion (should still delete message even if S3 tagging fails) + mockSQS.On("DeleteMessage", mock.Anything, mock.MatchedBy(func(input *sqs.DeleteMessageInput) bool { + return *input.QueueUrl == "test-queue-url" && *input.ReceiptHandle == "test-receipt-handle" + })).Return(&sqs.DeleteMessageOutput{}, nil) + }, + expectedProcessedKeys: map[string][]byte{ + "test-key": []byte("test-content"), + }, + expectedTaggedKeys: []string{}, + }, + { + name: "multiple objects", + s3Records: []s3EventRecord{ + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "key1"}, + }, + }, + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "key2"}, + }, + }, + { + EventSource: "aws:s3", + EventName: "ObjectCreated:Put", + S3: s3Data{ + Bucket: s3BucketData{Name: "test-bucket"}, + Object: s3ObjectData{Key: "key3"}, + }, + }, + }, + setupMocks: func(mockS3 *mockS3ClientSQS, mockSQS *mockSQSClient, taggedKeys map[string]bool) { + // Mock GetObject and DeleteObject for multiple keys + keys := []string{"key1", "key2", "key3"} + for _, key := range keys { + keyCopy := key // Capture key in closure + mockS3.On("GetObject", mock.Anything, mock.MatchedBy(func(input *s3.GetObjectInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == keyCopy + })).Return([]byte("content-"+keyCopy), nil) + + // Mock successful tagging for each key + mockS3.On("PutObjectTagging", mock.Anything, mock.MatchedBy(func(input *s3.PutObjectTaggingInput) bool { + return *input.Bucket == "test-bucket" && *input.Key == keyCopy + })).Run(func(args mock.Arguments) { + input := args.Get(1).(*s3.PutObjectTaggingInput) + taggedKeys[*input.Key] = true + }).Return(&s3.PutObjectTaggingOutput{}, nil) + } + + // Mock successful message deletion + mockSQS.On("DeleteMessage", mock.Anything, mock.MatchedBy(func(input *sqs.DeleteMessageInput) bool { + return *input.QueueUrl == "test-queue-url" && *input.ReceiptHandle == "test-receipt-handle" + })).Return(&sqs.DeleteMessageOutput{}, nil) + }, + expectedProcessedKeys: map[string][]byte{ + "key1": []byte("content-key1"), + "key2": []byte("content-key2"), + "key3": []byte("content-key3"), + }, + expectedTaggedKeys: []string{"key1", "key2", "key3"}, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + // Create mock clients + mockSQS := new(mockSQSClient) + mockS3 := new(mockS3ClientSQS) + + // Track tagged objects + taggedKeys := make(map[string]bool) + + // Create S3 event notification + s3Event := s3EventNotification{ + Records: tc.s3Records, + } + + s3EventJSON, err := json.Marshal(s3Event) + require.NoError(t, err) + + receiptHandle := "test-receipt-handle" + + // Mock SQS ReceiveMessage to return S3 event, then context timeout + mockSQS.On("ReceiveMessage", mock.Anything, mock.MatchedBy(func(input *sqs.ReceiveMessageInput) bool { + return *input.QueueUrl == "test-queue-url" + })).Return(&sqs.ReceiveMessageOutput{ + Messages: []types.Message{ + { + Body: aws.String(string(s3EventJSON)), + ReceiptHandle: &receiptHandle, + }, + }, + }, nil).Once() + + // Second call returns timeout to end the loop + mockSQS.On("ReceiveMessage", mock.Anything, mock.Anything).Return( + (*sqs.ReceiveMessageOutput)(nil), context.DeadlineExceeded, + ) + + // Setup test-specific mocks + tc.setupMocks(mockS3, mockSQS, taggedKeys) + + reader := &s3SQSNotificationReader{ + logger: zap.NewNop(), + s3Client: mockS3, + sqsClient: mockSQS, + queueURL: "test-queue-url", + s3Bucket: "test-bucket", + s3Prefix: "", + maxNumberOfMessages: 10, + waitTimeSeconds: 20, + tagObjectAfterIngestion: true, // Enable deletion + } + + ctx, cancel := context.WithTimeout(t.Context(), 500*time.Millisecond) + defer cancel() + + processedKeys := make(map[string][]byte) + err = reader.readAll(ctx, "test-telemetry", func(_ context.Context, key string, content []byte) error { + processedKeys[key] = content + return nil + }) + + // Context deadline exceeded is expected + assert.Equal(t, context.DeadlineExceeded, err) + + // Verify processed objects + assert.Equal(t, tc.expectedProcessedKeys, processedKeys) + + // Verify tagged objects if needed + assert.Len(t, taggedKeys, len(tc.expectedTaggedKeys)) + for _, key := range tc.expectedTaggedKeys { + assert.True(t, taggedKeys[key], "Object %s should be tagged", key) + } + + // Verify all mock expectations were met + mockSQS.AssertExpectations(t) + mockS3.AssertExpectations(t) + }) + } +} + func TestS3SQSReader_ReadAllDirectS3TestEvent(t *testing.T) { logger := zap.NewNop() cfg := &Config{ diff --git a/receiver/awss3receiver/testdata/config.yaml b/receiver/awss3receiver/testdata/config.yaml index 9e7e3e0222f30..04a1b96f0f1a6 100644 --- a/receiver/awss3receiver/testdata/config.yaml +++ b/receiver/awss3receiver/testdata/config.yaml @@ -17,6 +17,7 @@ awss3/3: s3_partition_timezone: "Asia/Tokyo" file_prefix: "otel" file_prefix_include_telemetry_type: false + tag_object_after_ingestion: false starttime: "2024-01-31 15:00" endtime: "2024-02-03" encodings: @@ -29,6 +30,7 @@ awss3/3: awss3/4: s3downloader: s3_bucket: abucket + tag_object_after_ingestion: true starttime: "2024-01-31T15:00:00Z" endtime: "2024-02-03T00:00:00Z" awss3/5: