Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions .chloggen/s3-receiver-delete-obj.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
# Use this changelog template to create an entry for release notes.

# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
change_type: enhancement

# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
component: receiver/awss3

# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
note: "add `tag_object_after_ingestion` flag to the s3 receiver so objects that have been processed can be identified"

# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
issues: [46078]

# (Optional) One or more lines of additional information to render under the primary note.
# These lines will be padded with 2 spaces and then inserted directly into the document.
# Use pipe (|) for multiline entries.
subtext: |
They will be tagged with `otel-collector:status=ingested`. Operators can use that tag to define bucket lifecycle policies.

# If your change doesn't affect end users or the exported elements of any package,
# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
# Optional: The change log or logs in which this entry should be included.
# e.g. '[user]' or '[user, api]'
# Include 'user' if the change is relevant to end users.
# Include 'api' if there is a change to a library API.
# Default: '[user]'
change_logs: []
23 changes: 14 additions & 9 deletions receiver/awss3receiver/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ The following exporter configuration parameters are supported.
| `suffix` | Key suffix to match against. | | Required |
| `notifications:` | | | |
| `opampextension` | Name of the OpAMP Extension to use to send ingest progress notifications. | | |
| `tag_object_after_ingestion` | If enabled the receiver will attempt to tag the object after successfully ingesting it. | false | Optional |

There are two modes of operation:

Expand All @@ -64,7 +65,7 @@ The receiver can subscribe to an SQS queue that receives S3 event notifications:
```yaml
sqs:
# Required: The ARN of the SQS queue that receives S3 bucket notifications
queue_url: "https:https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
queue_url: "https:https://sqs.us-east-1.amazonaws.com/123456789012/test-queue"
# Required: The AWS region of the SQS queue
region: "us-east-1"
```
Expand All @@ -73,15 +74,15 @@ sqs:
Time-based configuration (`starttime`/`endtime`) and SQS configuration cannot be used together.

### Time format for `starttime` and `endtime`
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
The `starttime` and `endtime` fields are used to specify the time range for which to retrieve data.
The time format is either RFC3339,`YYYY-MM-DD HH:MM` or simply `YYYY-MM-DD`, in which case the time is assumed to be `00:00`.

### Encodings
By default, the receiver understands the following encodings:
- otlp_json (OpenTelemetry Protocol format represented as json) with a suffix of `.json`
- otlp_proto (OpenTelemetry Protocol format represented as Protocol Buffers) with a suffix of `.binpb`

The `encodings` options allows you to specify Encoding Extensions to use to decode keys with matching suffixes.
The `encodings` options allows you to specify Encoding Extensions to use to decode keys with matching suffixes.


### Example Configuration
Expand All @@ -93,7 +94,7 @@ extension:
encoding: utf8
marshaling_separator: "\n"
unmarshaling_separator: "\r?\n"

receivers:
awss3:
starttime: "2024-01-01 01:00"
Expand Down Expand Up @@ -128,17 +129,17 @@ service:
traces:
receivers: [awss3/traces]
exporters: [otlp_grpc]

traces/sqs:
receivers: [awss3/sqs_traces]
exporters: [otlp_grpc]
```

## Notifications
The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of
The receiver can send notifications of ingest progress to an OpAmp server using the custom message capability of
"org.opentelemetry.collector.receiver.awss3" and message type "TimeBasedIngestStatus".
The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the
record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the
The format of the notifications is a ProtoBuf formatted OLTP logs message with a single Log Record. The `body` of the
record is set to `status` and the timestamp of the record is used to hold the ingest time. The record also has the
following attributes:

| Attribute | Description |
Expand All @@ -150,6 +151,10 @@ following attributes:
| `failure_message` | Error message if `ingest_status` is "failed". |

The "ingesting" status is sent at the beginning of the ingest process before data has been retrieved for the specified time.
If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with
If during the processing of the data an error occurs a status message with `ingest_status` set to "failed" status with
the time of the data being ingested when the failure occurred.
If the ingest process completes successfully a status message with `ingest_status` set to "completed" is sent.

## Object Lifecycle Management
If the `tag_object_after_ingestion` is enabled the receiver will make a best-effort attempt to tag objects with `otel-collector:status = ingested` after they are processed by the pipeline. This requires an additional `s3:PutObjectTagging` permission.
This tag can then be used with a lifecycle policy to expire ingested objects or transition them to cheaper storage classes.
1 change: 1 addition & 0 deletions receiver/awss3receiver/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type S3DownloaderConfig struct {
Endpoint string `mapstructure:"endpoint"`
EndpointPartitionID string `mapstructure:"endpoint_partition_id"`
S3ForcePathStyle bool `mapstructure:"s3_force_path_style"`
TagObjectAfterIngestion bool `mapstructure:"tag_object_after_ingestion"`
}

// SQSConfig holds SQS queue configuration for receiving object change notifications.
Expand Down
2 changes: 2 additions & 0 deletions receiver/awss3receiver/config.schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ $defs:
type: string
s3_prefix:
type: string
tag_object_after_ingestion:
type: boolean
sqs_config:
description: SQSConfig holds SQS queue configuration for receiving object change notifications.
type: object
Expand Down
6 changes: 6 additions & 0 deletions receiver/awss3receiver/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
Endpoint: "",
EndpointPartitionID: "aws",
S3ForcePathStyle: false,
TagObjectAfterIngestion: true,
},
StartTime: "2024-01-01",
EndTime: "2024-01-01",
Expand All @@ -57,6 +58,7 @@ func TestConfig_Validate_Valid(t *testing.T) {
Endpoint: "",
EndpointPartitionID: "aws",
S3ForcePathStyle: false,
TagObjectAfterIngestion: true,
},
SQS: &SQSConfig{
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",
Expand Down Expand Up @@ -93,6 +95,7 @@ func TestLoadConfig(t *testing.T) {
S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M",
FilePrefixIncludeTelemetryType: true,
EndpointPartitionID: "aws",
TagObjectAfterIngestion: false,
},
StartTime: "2024-01-31 15:00",
EndTime: "2024-02-03",
Expand All @@ -109,6 +112,7 @@ func TestLoadConfig(t *testing.T) {
FilePrefix: "otel",
FilePrefixIncludeTelemetryType: false,
EndpointPartitionID: "aws",
TagObjectAfterIngestion: false,
},
StartTime: "2024-01-31 15:00",
EndTime: "2024-02-03",
Expand Down Expand Up @@ -136,6 +140,7 @@ func TestLoadConfig(t *testing.T) {
S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M",
FilePrefixIncludeTelemetryType: true,
EndpointPartitionID: "aws",
TagObjectAfterIngestion: true,
},
StartTime: "2024-01-31T15:00:00Z",
EndTime: "2024-02-03T00:00:00Z",
Expand All @@ -150,6 +155,7 @@ func TestLoadConfig(t *testing.T) {
S3PartitionFormat: "year=%Y/month=%m/day=%d/hour=%H/minute=%M",
FilePrefixIncludeTelemetryType: true,
EndpointPartitionID: "aws",
TagObjectAfterIngestion: false,
},
SQS: &SQSConfig{
QueueURL: "https://sqs.us-east-1.amazonaws.com/123456789012/test-queue",
Expand Down
32 changes: 28 additions & 4 deletions receiver/awss3receiver/s3intf.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/aws/aws-sdk-go-v2/service/s3/types"
)

const (
ingestedTag = "otel-collector:status"
ingestedStatus = "ingested"
)

type ListObjectsV2Pager interface {
Expand All @@ -22,15 +28,16 @@ type ListObjectsAPI interface {
NewListObjectsV2Paginator(params *s3.ListObjectsV2Input) ListObjectsV2Pager
}

type GetObjectAPI interface {
type SingleObjectAPI interface {
GetObject(ctx context.Context, params *s3.GetObjectInput, optFns ...func(*s3.Options)) (*s3.GetObjectOutput, error)
PutObjectTagging(ctx context.Context, params *s3.PutObjectTaggingInput, optFns ...func(*s3.Options)) (*s3.PutObjectTaggingOutput, error)
}

type s3ListObjectsAPIImpl struct {
client *s3.Client
}

func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, GetObjectAPI, error) {
func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, SingleObjectAPI, error) {
optionsFuncs := make([]func(*config.LoadOptions) error, 0)
if cfg.Region != "" {
optionsFuncs = append(optionsFuncs, config.WithRegion(cfg.Region))
Expand All @@ -52,7 +59,6 @@ func newS3Client(ctx context.Context, cfg S3DownloaderConfig) (ListObjectsAPI, G
})
}
client := s3.NewFromConfig(awsCfg, s3OptionFuncs...)

return &s3ListObjectsAPIImpl{client: client}, client, nil
}

Expand All @@ -61,7 +67,7 @@ func (api *s3ListObjectsAPIImpl) NewListObjectsV2Paginator(params *s3.ListObject
}

// retrieveS3Object retrieves S3 object content for a given bucket and key
func retrieveS3Object(ctx context.Context, client GetObjectAPI, bucket, key string) ([]byte, error) {
func retrieveS3Object(ctx context.Context, client SingleObjectAPI, bucket, key string) ([]byte, error) {
params := s3.GetObjectInput{
Bucket: &bucket,
Key: &key,
Expand All @@ -77,3 +83,21 @@ func retrieveS3Object(ctx context.Context, client GetObjectAPI, bucket, key stri
}
return contents, nil
}

// tagS3Object tags an S3 object for a given bucket and key
func tagS3Object(ctx context.Context, client SingleObjectAPI, bucket, key string) error {
params := s3.PutObjectTaggingInput{
Bucket: &bucket,
Key: &key,
Tagging: &types.Tagging{
TagSet: []types.Tag{
{
Key: aws.String(ingestedTag),
Value: aws.String(ingestedStatus),
},
},
},
}
_, err := client.PutObjectTagging(ctx, &params)
return err
}
29 changes: 23 additions & 6 deletions receiver/awss3receiver/s3reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type s3TimeBasedReader struct {
logger *zap.Logger

listObjectsClient ListObjectsAPI
getObjectClient GetObjectAPI
singleObjectClient SingleObjectAPI
s3Bucket string
s3Prefix string
s3PartitionFormat string
Expand All @@ -28,10 +28,11 @@ type s3TimeBasedReader struct {
startTime time.Time
endTime time.Time
notifier statusNotifier
tagObjectAfterIngestion bool
}

func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *zap.Logger, cfg *Config) (*s3TimeBasedReader, error) {
listObjectsClient, getObjectClient, err := newS3Client(ctx, cfg.S3Downloader)
listObjectsClient, singleObjectClient, err := newS3Client(ctx, cfg.S3Downloader)
if err != nil {
return nil, err
}
Expand All @@ -57,7 +58,7 @@ func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *
return &s3TimeBasedReader{
logger: logger,
listObjectsClient: listObjectsClient,
getObjectClient: getObjectClient,
singleObjectClient: singleObjectClient,
s3Bucket: cfg.S3Downloader.S3Bucket,
s3Prefix: cfg.S3Downloader.S3Prefix,
filePrefix: cfg.S3Downloader.FilePrefix,
Expand All @@ -67,6 +68,7 @@ func newS3TimeBasedReader(ctx context.Context, notifier statusNotifier, logger *
startTime: startTime,
endTime: endTime,
notifier: notifier,
tagObjectAfterIngestion: cfg.S3Downloader.TagObjectAfterIngestion,
}, nil
}

Expand Down Expand Up @@ -144,13 +146,28 @@ func (s3Reader *s3TimeBasedReader) readTelemetryForTime(ctx context.Context, t t
s3Reader.logger.Info("No telemetry found for time", zap.String("prefix", prefix), zap.Time("time", t))
} else {
for _, obj := range page.Contents {
data, err := retrieveS3Object(ctx, s3Reader.getObjectClient, s3Reader.s3Bucket, *obj.Key)
data, err := retrieveS3Object(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key)
if err != nil {
return err
}

s3Reader.logger.Debug("Retrieved telemetry", zap.String("key", *obj.Key))
if err := dataCallback(ctx, *obj.Key, data); err != nil {
return err
if callbackErr := dataCallback(ctx, *obj.Key, data); callbackErr != nil {
return callbackErr
}

if s3Reader.tagObjectAfterIngestion {
if err = tagS3Object(ctx, s3Reader.singleObjectClient, s3Reader.s3Bucket, *obj.Key); err != nil {
s3Reader.logger.Warn("Failed to tag S3 object",
zap.String("bucket", s3Reader.s3Bucket),
zap.String("key", *obj.Key),
zap.Error(err))
// Don't return error as the object was processed successfully
} else {
s3Reader.logger.Debug("Tagged S3 object",
zap.String("bucket", s3Reader.s3Bucket),
zap.String("key", *obj.Key))
}
}
}
}
Expand Down
Loading
Loading