diff --git a/x-pack/filebeat/input/awss3/input.go b/x-pack/filebeat/input/awss3/input.go index 0b33ae042f9a..f54db623a501 100644 --- a/x-pack/filebeat/input/awss3/input.go +++ b/x-pack/filebeat/input/awss3/input.go @@ -71,15 +71,44 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) { awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) if config.AWSConfig.Endpoint != "" { - // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint - awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + endpointUri, err := url.Parse(config.AWSConfig.Endpoint) + + if err != nil { + // Log the error and continue with the default endpoint + fmt.Printf("Failed to parse the endpoint: %v", err) + } + + // For backwards compat, if the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint + if !strings.HasPrefix(endpointUri.Hostname(), "s3") { + // Get the resolver from the endpoint url + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + return awssdk.Endpoint{ + PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, + URL: config.AWSConfig.Endpoint, + SigningRegion: awsConfig.Region, + HostnameImmutable: true, + }, nil + }) + } + } + + // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint + awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { + + // If the service is S3 and the endpoint is set, return the custom endpoint AS-IS to the S3 service + if service == s3.ServiceID && config.AWSConfig.Endpoint != "" { return awssdk.Endpoint{ PartitionID: "aws", + Source: awssdk.EndpointSourceCustom, URL: config.AWSConfig.Endpoint, SigningRegion: awsConfig.Region, }, nil - }) - } + } + + // If the service is not S3, return an EndpointNotFoundError to let the SDK use the default endpoint resolver + return awssdk.Endpoint{}, &awssdk.EndpointNotFoundError{} + }) if err != nil { return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) @@ -136,7 +165,11 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { // Warn of mismatch, but go ahead with configured region name. inputContext.Logger.Warnf("%v: using %q", err, regionName) } - in.awsConfig.Region = regionName + + // If we got a region from the queue URL, use it to override the region in the AWS config. + if regionName == "" { + in.awsConfig.Region = in.config.RegionName + } // Create SQS receiver and S3 notification processor. receiver, err := in.createSQSReceiver(inputContext, pipeline) @@ -185,12 +218,16 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { } func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) { + + // parse the Endpoint as a uri and extract the domain + sqsAPI := &awsSQSAPI{ client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) { if in.config.AWSConfig.FIPSEnabled { o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled } }), + queueURL: in.config.QueueURL, apiTimeout: in.config.APITimeout, visibilityTimeout: in.config.VisibilityTimeout,