Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion CHANGELOG.next.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ https://github.com/elastic/beats/compare/v8.2.0\...main[Check the HEAD diff]
==== Bugfixes

*Affecting all Beats*

- Fix in AWS related services initialisation relying on custom endpoint resolver. {issue}32888[32888] {pull}32921[32921]

*Auditbeat*

Expand Down
3 changes: 1 addition & 2 deletions x-pack/filebeat/docs/inputs/input-aws-s3.asciidoc
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ Listing of the S3 bucket will be polled according the time interval defined by
The `aws-s3` input can also poll 3rd party S3 compatible services such as the self hosted Minio.
Using non-AWS S3 compatible buckets requires the use of `access_key_id` and `secret_access_key` for authentication.
To specify the S3 bucket name, use the `non_aws_bucket_name` config and the `endpoint` must be set to replace the default API endpoint.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>`, that will be used as the API endpoint of the service, or a single domain.
If a domain is provided, the full endpoint URI will be constructed with the region name in the standard form of `https://s3.<region>.<domain>` supported by AWS and several 3rd party providers.
`endpoint` should be a full URI in the form of `https(s)://<s3 endpoint>` in the case of `non_aws_bucket_name`, that will be used as the API endpoint of the service.
No `endpoint` is needed if using the native AWS S3 service hosted at `amazonaws.com`.
Please see <<aws-credentials-config,Configuration parameters>> for alternate AWS domains that require a different endpoint.

Expand Down
8 changes: 5 additions & 3 deletions x-pack/filebeat/input/awscloudwatch/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,9 +120,11 @@ func (in *cloudwatchInput) Run(inputContext v2.Context, pipeline beat.Pipeline)
}
defer client.Close()

logsServiceName := awscommon.CreateServiceName("logs", in.config.AWSConfig.FIPSEnabled, in.config.RegionName)
cwConfig := awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, logsServiceName, in.config.RegionName, in.awsConfig)
svc := cloudwatchlogs.NewFromConfig(cwConfig)
svc := cloudwatchlogs.NewFromConfig(in.awsConfig, func(o *cloudwatchlogs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

logGroupNames, err := getLogGroupNames(svc, in.config.LogGroupNamePrefix, in.config.LogGroupName)
if err != nil {
Expand Down
44 changes: 34 additions & 10 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,27 +155,31 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsReader, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
sqsServiceName := awscommon.CreateServiceName("sqs", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)

sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, sqsServiceName, in.awsConfig.Region, in.awsConfig)),
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
longPollWaitTime: in.config.SQSWaitTime,
}

s3API := &awsS3API{
client: s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig)),
client: s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),
}

log := ctx.Logger.With("queue_url", in.config.QueueURL)
log.Infof("AWS api_timeout is set to %v.", in.config.APITimeout)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Infof("AWS SQS visibility_timeout is set to %v.", in.config.VisibilityTimeout)
log.Infof("AWS SQS max_number_of_messages is set to %v.", in.config.MaxNumberOfMessages)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand All @@ -195,8 +199,15 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, client beat.Client) (*sqsRe
return sqsReader, nil
}

type nonAWSBucketResolver struct {
endpoint string
}

func (n nonAWSBucketResolver) ResolveEndpoint(region string, options s3.EndpointResolverOptions) (awssdk.Endpoint, error) {
return awssdk.Endpoint{URL: n.endpoint, SigningRegion: region, HostnameImmutable: true, Source: awssdk.EndpointSourceCustom}, nil
}

func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, client beat.Client, persistentStore *statestore.Store, states *states) (*s3Poller, error) {
s3ServiceName := awscommon.CreateServiceName("s3", in.config.AWSConfig.FIPSEnabled, in.awsConfig.Region)
var bucketName string
var bucketID string
if in.config.NonAWSBucketName != "" {
Expand All @@ -207,7 +218,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
bucketID = in.config.BucketARN
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client := s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
regionName, err := getRegionForBucket(cancelCtx, s3Client, bucketName)
Expand All @@ -220,7 +238,14 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
in.awsConfig.Region = regionName

if regionName != originalAwsConfigRegion {
s3Client = s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(in.config.AWSConfig.Endpoint, s3ServiceName, in.awsConfig.Region, in.awsConfig), func(o *s3.Options) {
s3Client = s3.NewFromConfig(in.awsConfig, func(o *s3.Options) {
if in.config.NonAWSBucketName != "" {
o.EndpointResolver = nonAWSBucketResolver{endpoint: in.config.AWSConfig.Endpoint}
}

if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
o.UsePathStyle = in.config.PathStyle
})
}
Expand All @@ -234,7 +259,6 @@ func (in *s3Input) createS3Lister(ctx v2.Context, cancelCtx context.Context, cli
log.Infof("bucket_list_interval is set to %v.", in.config.BucketListInterval)
log.Infof("bucket_list_prefix is set to %v.", in.config.BucketListPrefix)
log.Infof("AWS region is set to %v.", in.awsConfig.Region)
log.Debugf("AWS S3 service name is %v.", s3ServiceName)

metricRegistry := monitoring.GetNamespace("dataset").GetRegistry()
metrics := newInputMetrics(metricRegistry, ctx.ID)
Expand Down
4 changes: 2 additions & 2 deletions x-pack/filebeat/input/awss3/input_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -398,7 +398,7 @@ func TestGetRegionForBucketARN(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

regionName, err := getRegionForBucket(context.Background(), s3Client, getBucketNameFromARN(tfConfig.BucketName))
assert.NoError(t, err)
Expand Down Expand Up @@ -428,7 +428,7 @@ func TestPaginatorListPrefix(t *testing.T) {
t.Fatal(err)
}

s3Client := s3.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint("", "s3", "", cfg))
s3Client := s3.NewFromConfig(cfg)

s3API := &awsS3API{
client: s3Client,
Expand Down
18 changes: 12 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/ec2/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ package ec2
import (
"fmt"

awssdk "github.com/aws/aws-sdk-go-v2/aws"

"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/gofrs/uuid"

Expand Down Expand Up @@ -63,9 +65,11 @@ func AutodiscoverBuilder(
if config.Regions == nil {
// set default region to make initial aws api call
awsCfg.Region = "us-west-1"
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -81,9 +85,11 @@ func AutodiscoverBuilder(
logp.Error(fmt.Errorf("error loading AWS config for aws_ec2 autodiscover provider: %w", err))
}
awsCfg.Region = region
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, region, awsCfg)))
clients = append(clients, ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
19 changes: 13 additions & 6 deletions x-pack/libbeat/autodiscover/providers/aws/elb/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package elb

import (
awssdk "github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/service/ec2"
"github.com/aws/aws-sdk-go-v2/service/elasticloadbalancingv2"
"github.com/gofrs/uuid"
Expand Down Expand Up @@ -64,9 +65,12 @@ func AutodiscoverBuilder(

// Construct MetricSet with a full regions list if there is no region specified.
if config.Regions == nil {
ec2ServiceName := awscommon.CreateServiceName("ec2", config.AWSConfig.FIPSEnabled, awsCfg.Region)
svcEC2 := ec2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, ec2ServiceName, awsCfg.Region, awsCfg))
svcEC2 := ec2.NewFromConfig(awsCfg, func(o *ec2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

})

completeRegionsList, err := awsauto.GetRegions(svcEC2)
if err != nil {
Expand All @@ -88,9 +92,12 @@ func AutodiscoverBuilder(
logp.Err("error loading AWS config for aws_elb autodiscover provider: %s", err)
}
awsCfg.Region = region
elbServiceName := awscommon.CreateServiceName("elasticloadbalancing", config.AWSConfig.FIPSEnabled, region)
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awscommon.EnrichAWSConfigWithEndpoint(
config.AWSConfig.Endpoint, elbServiceName, region, awsCfg)))
clients = append(clients, elasticloadbalancingv2.NewFromConfig(awsCfg, func(o *elasticloadbalancingv2.Options) {
if config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}

}))
}

return internalBuilder(uuid, bus, config, newAPIFetcher(clients), keystore)
Expand Down
38 changes: 0 additions & 38 deletions x-pack/libbeat/common/aws/credentials.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"fmt"
"net/http"
"net/url"
"strings"

"github.com/aws/aws-sdk-go-v2/service/sts"

Expand Down Expand Up @@ -164,40 +163,3 @@ func addStaticCredentialsProviderToAwsConfig(beatsConfig ConfigAWS, awsConfig *a

awsConfig.Credentials = staticCredentialsProvider
}

// EnrichAWSConfigWithEndpoint function enabled endpoint resolver for AWS service clients when endpoint is given in config.
func EnrichAWSConfigWithEndpoint(endpoint string, serviceName string, regionName string, beatsConfig awssdk.Config) awssdk.Config {
var eurl string
if endpoint != "" {
parsedEndpoint, _ := url.Parse(endpoint)

// Beats uses the provided endpoint if the scheme is present or...
if parsedEndpoint.Scheme != "" {
eurl = endpoint
} else {
// ...build one by using the scheme, service and region names.
if regionName == "" {
eurl = "https://" + serviceName + "." + endpoint
} else {
eurl = "https://" + serviceName + "." + regionName + "." + endpoint
}
}

beatsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(
func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{URL: eurl}, nil
})
}
return beatsConfig
}

// CreateServiceName based on Service name, Region and FIPS. Returns service name if Fips is not enabled.
func CreateServiceName(serviceName string, fipsEnabled bool, region string) string {
if fipsEnabled {
_, found := OptionalGovCloudFIPS[serviceName]
if !strings.HasPrefix(region, "us-gov-") || found {
return serviceName + "-fips"
}
}
return serviceName
}
Loading