Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 42 additions & 5 deletions x-pack/filebeat/input/awss3/input.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,15 +71,44 @@ func newInput(config config, store beater.StateStore) (*s3Input, error) {
awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig)

if config.AWSConfig.Endpoint != "" {
// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
endpointUri, err := url.Parse(config.AWSConfig.Endpoint)

if err != nil {
// Log the error and continue with the default endpoint
fmt.Printf("Failed to parse the endpoint: %v", err)
}

// For backwards compat, if the endpoint does not start with S3, we will use the endpoint resolver to all SDK requests through this endpoint
if !strings.HasPrefix(endpointUri.Hostname(), "s3") {
// Get the resolver from the endpoint url
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {
return awssdk.Endpoint{
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
HostnameImmutable: true,
}, nil
})
}
}

// Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint
awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) {

// If the service is S3 and the endpoint is set, return the custom endpoint AS-IS to the S3 service
if service == s3.ServiceID && config.AWSConfig.Endpoint != "" {
return awssdk.Endpoint{
PartitionID: "aws",
Source: awssdk.EndpointSourceCustom,
URL: config.AWSConfig.Endpoint,
SigningRegion: awsConfig.Region,
}, nil
})
}
}

// If the service is not S3, return an EndpointNotFoundError to let the SDK use the default endpoint resolver
return awssdk.Endpoint{}, &awssdk.EndpointNotFoundError{}
})

if err != nil {
return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err)
Expand Down Expand Up @@ -136,7 +165,11 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
// Warn of mismatch, but go ahead with configured region name.
inputContext.Logger.Warnf("%v: using %q", err, regionName)
}
in.awsConfig.Region = regionName

// If we got a region from the queue URL, use it to override the region in the AWS config.
if regionName == "" {
in.awsConfig.Region = in.config.RegionName
}

// Create SQS receiver and S3 notification processor.
receiver, err := in.createSQSReceiver(inputContext, pipeline)
Expand Down Expand Up @@ -185,12 +218,16 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error {
}

func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*sqsReader, error) {

// parse the Endpoint as a uri and extract the domain

sqsAPI := &awsSQSAPI{
client: sqs.NewFromConfig(in.awsConfig, func(o *sqs.Options) {
if in.config.AWSConfig.FIPSEnabled {
o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled
}
}),

queueURL: in.config.QueueURL,
apiTimeout: in.config.APITimeout,
visibilityTimeout: in.config.VisibilityTimeout,
Expand Down