Skip to content

[8.16](backport #41495) x-pack/filebeat/input/awss3: support for Access Point ARN #41688

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ https://github.com/elastic/beats/compare/v8.8.1\...main[Check the HEAD diff]
- Fix request trace filename handling in http_endpoint input. {pull}39410[39410]
- Upgrade github.com/hashicorp/go-retryablehttp to mitigate CVE-2024-6104 {pull}40036[40036]
- Fix the "No such input type exist: 'salesforce'" error on the Windows/AIX platform. {pull}41664[41664]
- Add support for Access Points in the `aws-s3` input. {pull}41495[41495]

*Heartbeat*

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
11 changes: 8 additions & 3 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ configuring multiline options.
[float]
==== `queue_url`

URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn` and `non_aws_bucket_name` are not set).
URL of the AWS SQS queue that messages will be received from. (Required when `bucket_arn`, `access_point_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `region`
Expand Down Expand Up @@ -472,7 +472,12 @@ value is `20s`.
[float]
==== `bucket_arn`

ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url` and `non_aws_bucket_name` are not set).
ARN of the AWS S3 bucket that will be polled for list operation. (Required when `queue_url`, `access_point_arn, and `non_aws_bucket_name` are not set).

[float]
==== `access_point_arn`

ARN of the AWS S3 Access Point that will be polled for list operation. (Required when `queue_url`, `bucket_arn`, and `non_aws_bucket_name` are not set).

[float]
==== `non_aws_bucket_name`
Expand All @@ -492,7 +497,7 @@ Prefix to apply for the list request to the S3 bucket. Default empty.
[float]
==== `number_of_workers`

Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` is set, otherwise (in the SQS case) defaults to 5.
Number of workers that will process the S3 or SQS objects listed. Required when `bucket_arn` or `access_point_arn` is set, otherwise (in the SQS case) defaults to 5.


[float]
Expand Down
3 changes: 3 additions & 0 deletions x-pack/filebeat/filebeat.reference.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2990,6 +2990,9 @@ filebeat.inputs:
# Bucket ARN used for polling AWS S3 buckets
#bucket_arn: arn:aws:s3:::test-s3-bucket

# Access Point ARN used for polling AWS S3 buckets
#access_point_arn: arn:aws:s3:us-east-1:123456789:accesspoint/my-accesspoint

# Bucket Name used for polling non-AWS S3 buckets
#non_aws_bucket_name: test-s3-bucket

Expand Down
37 changes: 29 additions & 8 deletions x-pack/filebeat/input/awss3/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
"errors"
"fmt"
"net/url"
"strings"
"time"

awssdk "github.com/aws/aws-sdk-go-v2/aws"
Expand All @@ -33,6 +34,7 @@
QueueURL string `config:"queue_url"`
RegionName string `config:"region"`
BucketARN string `config:"bucket_arn"`
AccessPointARN string `config:"access_point_arn"`
NonAWSBucketName string `config:"non_aws_bucket_name"`
BucketListInterval time.Duration `config:"bucket_list_interval"`
BucketListPrefix string `config:"bucket_list_prefix"`
Expand Down Expand Up @@ -61,28 +63,32 @@
}

func (c *config) Validate() error {
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.NonAWSBucketName != ""}
configs := []bool{c.QueueURL != "", c.BucketARN != "", c.AccessPointARN != "", c.NonAWSBucketName != ""}
enabled := []bool{}
for i := range configs {
if configs[i] {
enabled = append(enabled, configs[i])
}
}
if len(enabled) == 0 {
return errors.New("neither queue_url, bucket_arn nor non_aws_bucket_name were provided")
return errors.New("neither queue_url, bucket_arn, access_point_arn, nor non_aws_bucket_name were provided")
} else if len(enabled) > 1 {
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.NonAWSBucketName)
return fmt.Errorf("queue_url <%v>, bucket_arn <%v>, access_point_arn <%v>, non_aws_bucket_name <%v> "+
"cannot be set at the same time", c.QueueURL, c.BucketARN, c.AccessPointARN, c.NonAWSBucketName)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.BucketListInterval <= 0 {
return fmt.Errorf("bucket_list_interval <%v> must be greater than 0", c.BucketListInterval)
}

if (c.BucketARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
if (c.BucketARN != "" || c.AccessPointARN != "" || c.NonAWSBucketName != "") && c.NumberOfWorkers <= 0 {
return fmt.Errorf("number_of_workers <%v> must be greater than 0", c.NumberOfWorkers)
}

if c.AccessPointARN != "" && !isValidAccessPointARN(c.AccessPointARN) {
return fmt.Errorf("invalid format for access_point_arn <%v>", c.AccessPointARN)
}

if c.QueueURL != "" && (c.VisibilityTimeout <= 0 || c.VisibilityTimeout.Hours() > 12) {
return fmt.Errorf("visibility_timeout <%v> must be greater than 0 and "+
"less than or equal to 12h", c.VisibilityTimeout)
Expand Down Expand Up @@ -117,14 +123,15 @@
if c.BackupConfig.NonAWSBackupToBucketName != "" && c.NonAWSBucketName == "" {
return errors.New("backup to non-AWS bucket can only be used for non-AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" {
if c.BackupConfig.BackupToBucketArn != "" && c.BucketARN == "" && c.AccessPointARN == "" {
return errors.New("backup to AWS bucket can only be used for AWS sources")
}
if c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.NonAWSBackupToBucketName != "" {
return errors.New("backup_to_bucket_arn and non_aws_backup_to_bucket_name cannot be used together")
}
if c.BackupConfig.GetBucketName() != "" && c.QueueURL == "" {
if (c.BackupConfig.BackupToBucketArn != "" && c.BackupConfig.BackupToBucketArn == c.BucketARN) ||
if (c.BackupConfig.BackupToBucketArn != "" &&
(c.BackupConfig.BackupToBucketArn == c.BucketARN || c.BackupConfig.BackupToBucketArn == c.AccessPointARN)) ||
(c.BackupConfig.NonAWSBackupToBucketName != "" && c.BackupConfig.NonAWSBackupToBucketName == c.NonAWSBucketName) {
if c.BackupConfig.BackupToBucketPrefix == "" {
return errors.New("backup_to_bucket_prefix is a required property when source and backup bucket are the same")
Expand Down Expand Up @@ -233,6 +240,9 @@
if c.NonAWSBucketName != "" {
return c.NonAWSBucketName
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
if c.BucketARN != "" {
return getBucketNameFromARN(c.BucketARN)
}
Expand All @@ -246,6 +256,9 @@
if c.BucketARN != "" {
return c.BucketARN
}
if c.AccessPointARN != "" {
return c.AccessPointARN
}
return ""
}

Expand Down Expand Up @@ -282,7 +295,7 @@
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
if c.AWSConfig.Endpoint != "" {
o.EndpointResolver = sqs.EndpointResolverFromURL(c.AWSConfig.Endpoint)

Check failure on line 298 in x-pack/filebeat/input/awss3/config.go

View workflow job for this annotation

GitHub Actions / lint (linux)

SA1019: o.EndpointResolver is deprecated: Deprecated: EndpointResolver and WithEndpointResolver. Providing a value for this field will likely prevent you from using any endpoint-related service features released after the introduction of EndpointResolverV2 and BaseEndpoint. (staticcheck)
}
}

Expand All @@ -292,3 +305,11 @@
}
return []fileSelectorConfig{{ReaderConfig: c.ReaderConfig}}
}

// Helper function to detect if an ARN is an Access Point
func isValidAccessPointARN(arn string) bool {
parts := strings.Split(arn, ":")
return len(parts) >= 6 &&
strings.HasPrefix(parts[5], "accesspoint/") &&
len(strings.TrimPrefix(parts[5], "accesspoint/")) > 0
}
Loading
Loading