-
Notifications
You must be signed in to change notification settings - Fork 5k
[8.14] Fix handling of custom Endpoint when using S3 + SQS #39709
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
0aa6f22
846de8c
c66e2d9
282619b
5346107
702fe8a
c3c3806
6416b68
46b89ec
b642c47
1f8e8d9
4956d10
12a35f4
685503a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -69,20 +69,38 @@ type s3Input struct { | |
|
|
||
| func newInput(config config, store beater.StateStore) (*s3Input, error) { | ||
| awsConfig, err := awscommon.InitializeAWSConfig(config.AWSConfig) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) | ||
| } | ||
|
|
||
| if config.AWSConfig.Endpoint != "" { | ||
| // Add a custom endpointResolver to the awsConfig so that all the requests are routed to this endpoint | ||
| awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { | ||
| return awssdk.Endpoint{ | ||
| PartitionID: "aws", | ||
| URL: config.AWSConfig.Endpoint, | ||
| SigningRegion: awsConfig.Region, | ||
| }, nil | ||
| }) | ||
| // The awsConfig now contains the region from the credential profile or default region | ||
| // if the region is explicitly set in the config, then it wins | ||
| if config.RegionName != "" { | ||
| awsConfig.Region = config.RegionName | ||
| } | ||
|
|
||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to initialize AWS credentials: %w", err) | ||
| // A custom endpoint has been specified! | ||
| if config.AWSConfig.Endpoint != "" { | ||
|
|
||
| // Parse a URL for the host regardless of it missing the scheme | ||
| endpointUri, err := url.Parse(config.AWSConfig.Endpoint) | ||
| if err != nil { | ||
| return nil, fmt.Errorf("failed to parse endpoint: %w", err) | ||
| } | ||
|
|
||
| // For backwards compat: | ||
| // If the endpoint does not start with S3, we will use the endpoint resolver to make all SDK requests use the specified endpoint | ||
| // If the endpoint does start with S3, we will use the default resolver uses the endpoint field but can replace s3 with the desired service name like sqs | ||
| if !strings.HasPrefix(endpointUri.Hostname(), "s3") { | ||
| awsConfig.EndpointResolverWithOptions = awssdk.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (awssdk.Endpoint, error) { | ||
| return awssdk.Endpoint{ | ||
| PartitionID: "aws", | ||
| Source: awssdk.EndpointSourceCustom, | ||
| URL: config.AWSConfig.Endpoint, | ||
| SigningRegion: awsConfig.Region, | ||
| }, nil | ||
| }) | ||
| } | ||
| } | ||
|
|
||
| return &s3Input{ | ||
|
|
@@ -112,16 +130,23 @@ func (in *s3Input) Run(inputContext v2.Context, pipeline beat.Pipeline) error { | |
| defer cancelInputCtx() | ||
|
|
||
| if in.config.QueueURL != "" { | ||
| regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.RegionName) | ||
| if err != nil && in.config.RegionName == "" { | ||
| return fmt.Errorf("failed to get AWS region from queue_url: %w", err) | ||
| regionName, err := getRegionFromQueueURL(in.config.QueueURL, in.config.AWSConfig.Endpoint, in.config.AWSConfig.DefaultRegion) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's uncommon to return an error and a value that the caller should use. Typically these are mutually exclusive. You either get an error OR you get values that you should use. I suggest trying to a do a small bit of refactoring to keep with those conventions.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @faec has refactored basically all of this plugin on main including undoing this but it's too different to backport. I made a series of integration tests which cover all the various combinations of settings but I'm worried refactoring this might not be worth it given it's all going away soon |
||
|
|
||
| // If we can't get a region from anywhere, error out | ||
| if err != nil && regionName == "" && in.config.RegionName == "" { | ||
| return fmt.Errorf("region not specified and failed to get AWS region from queue_url: %w", err) | ||
| } | ||
| var warn regionMismatchError | ||
| if errors.As(err, &warn) { | ||
| // Warn of mismatch, but go ahead with configured region name. | ||
| inputContext.Logger.Warnf("%v: using %q", err, regionName) | ||
| } | ||
| in.awsConfig.Region = regionName | ||
|
|
||
| // Ensure we don't overwrite region when getRegionFromURL fails | ||
| // Ensure we don't overwrite a user-specified region with a parsed region. | ||
| if regionName != "" && in.config.RegionName == "" { | ||
strawgate marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| in.awsConfig.Region = regionName | ||
| } | ||
|
|
||
| // Create SQS receiver and S3 notification processor. | ||
| receiver, err := in.createSQSReceiver(inputContext, pipeline) | ||
|
|
@@ -186,7 +211,11 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s | |
| if in.config.AWSConfig.FIPSEnabled { | ||
| o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled | ||
| } | ||
| if in.config.AWSConfig.Endpoint != "" { | ||
| o.EndpointResolver = sqs.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) | ||
| } | ||
| }), | ||
|
|
||
| queueURL: in.config.QueueURL, | ||
| apiTimeout: in.config.APITimeout, | ||
| visibilityTimeout: in.config.VisibilityTimeout, | ||
|
|
@@ -198,6 +227,9 @@ func (in *s3Input) createSQSReceiver(ctx v2.Context, pipeline beat.Pipeline) (*s | |
| if in.config.AWSConfig.FIPSEnabled { | ||
| o.EndpointOptions.UseFIPSEndpoint = awssdk.FIPSEndpointStateEnabled | ||
| } | ||
| if in.config.AWSConfig.Endpoint != "" { | ||
| o.EndpointResolver = s3.EndpointResolverFromURL(in.config.AWSConfig.Endpoint) | ||
| } | ||
| o.UsePathStyle = in.config.PathStyle | ||
| }), | ||
| } | ||
|
|
@@ -322,17 +354,45 @@ var errBadQueueURL = errors.New("QueueURL is not in format: https://sqs.{REGION_ | |
|
|
||
| func getRegionFromQueueURL(queueURL string, endpoint, defaultRegion string) (region string, err error) { | ||
| // get region from queueURL | ||
| // Example for custom domain queue: https://sqs.us-east-1.abc.xyz/12345678912/test-s3-logs | ||
| // Example for sqs queue: https://sqs.us-east-1.amazonaws.com/12345678912/test-s3-logs | ||
| // Example for vpce: https://vpce-test.sqs.us-east-1.vpce.amazonaws.com/12345678912/sqs-queue | ||
| u, err := url.Parse(queueURL) | ||
| if err != nil { | ||
| return "", fmt.Errorf(queueURL + " is not a valid URL") | ||
| } | ||
|
|
||
| e, err := url.Parse(endpoint) | ||
| if err != nil { | ||
| return "", fmt.Errorf(endpoint + " is not a valid URL") | ||
| } | ||
|
|
||
| if (u.Scheme == "https" || u.Scheme == "http") && u.Host != "" { | ||
| queueHostSplit := strings.SplitN(u.Host, ".", 3) | ||
| endpointSplit := strings.SplitN(e.Host, ".", 3) | ||
| // check for sqs queue url | ||
|
|
||
| // Parse a user-provided custom endpoint | ||
| if endpoint != "" && queueHostSplit[0] == "sqs" && len(queueHostSplit) == 3 && len(endpointSplit) == 3 { | ||
| // Check if everything after the second dot in the queue url matches everything after the second dot in the endpoint | ||
| endpointMatchesQueueUrl := strings.SplitN(u.Hostname(), ".", 3)[2] == strings.SplitN(e.Hostname(), ".", 3)[2] | ||
strawgate marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| if !endpointMatchesQueueUrl { | ||
| // We couldn't resolve the URL | ||
| // We cannot infer the region by matching the endpoint and queue url, return the default region with a region mismatch warning | ||
| return defaultRegion, regionMismatchError{queueURLRegion: queueHostSplit[1], defaultRegion: endpointSplit[1]} | ||
| } | ||
|
|
||
| region = queueHostSplit[1] | ||
| if defaultRegion != "" && region != defaultRegion { | ||
| return region, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} | ||
| } | ||
| return region, nil | ||
| } | ||
|
|
||
| // Parse a standard SQS url | ||
| if len(queueHostSplit) == 3 && queueHostSplit[0] == "sqs" { | ||
| if queueHostSplit[2] == endpoint || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { | ||
| // handle endpoint with no scheme, handle endpoint with scheme | ||
| if queueHostSplit[2] == endpoint || queueHostSplit[2] == e.Host || (endpoint == "" && strings.HasPrefix(queueHostSplit[2], "amazonaws.")) { | ||
| region = queueHostSplit[1] | ||
| if defaultRegion != "" && region != defaultRegion { | ||
| return defaultRegion, regionMismatchError{queueURLRegion: region, defaultRegion: defaultRegion} | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.