Skip to content

Commit

Permalink
Implementing a new config QueueNames to sqs Input. (#210)
Browse files Browse the repository at this point in the history
* Implementing a new config QueueNames to sqs Input.
Queue prefixes could lead to undesired behaviour, this new option makes the use strict to specific queues

* Mock the GetQueueUrl method from sqs client

* Fix the unit tests for the new QueueNames parameter

* Add changes to Changelog.md
  • Loading branch information
danimaribeiro authored Dec 12, 2022
1 parent 59ea843 commit 7888e89
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 9 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Add file size-based rotation for `FileWriter` output (`RotateSize` option) [#203](https://github.com/AdRoll/baker/pull/203)
- Add `DiscardEmptyFiles` option to the `FileWriter` output [#204](https://github.com/AdRoll/baker/pull/204)
- Add `URLEscape` and `URLParam` filters [#206](https://github.com/AdRoll/baker/pull/206)
- Add `QueueNames` parameter to SQS Input [#210](https://github.com/AdRoll/baker/pull/210)

### Changed

Expand Down
38 changes: 30 additions & 8 deletions input/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ Supported formats (MessageFormat):
type SQSConfig struct {
AwsRegion string `help:"AWS region to connect to" default:"us-west-2"`
Bucket string `help:"S3 Bucket to use if paths do not have one" default:""`
QueuePrefixes []string `help:"Prefixes of the names of the SQS queues to monitor" required:"true"`
QueuePrefixes []string `help:"Prefixes of the names of the SQS queues to monitor"`
QueueNames []string `help:"Names of the SQS queues to monitor"`
MessageFormat string `help:"SQS message format. See help string for supported formats" default:"sns"`
MessageExpression string `help:"The expression to extract an S3 path from arbitrary message formats"`
FilePathFilter string `help:"If provided, will only use S3 files with the given path."`
Expand Down Expand Up @@ -124,6 +125,10 @@ func NewSQS(cfg baker.InputParams) (baker.Input, error) {
return nil, fmt.Errorf("SQS: %v", err)
}

if len(dcfg.QueuePrefixes) == 0 && len(dcfg.QueueNames) == 0 {
return nil, fmt.Errorf("SQS: QueuePrefixes or QueueNames must be set")
}

sqs := &SQS{
s3Input: s3Input,
Cfg: dcfg,
Expand Down Expand Up @@ -246,7 +251,19 @@ func (s *SQS) Run(inch chan<- *baker.Data) error {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

var wg sync.WaitGroup
queueUrls := []string{}

for _, queueName := range s.Cfg.QueueNames {

resp, err := s.svc.GetQueueUrlWithContext(ctx, &sqs.GetQueueUrlInput{
QueueName: aws.String(queueName),
})
if err != nil {
return err
}
queueUrls = append(queueUrls, *resp.QueueUrl)
}

for _, prefix := range s.Cfg.QueuePrefixes {

resp, err := s.svc.ListQueuesWithContext(ctx, &sqs.ListQueuesInput{
Expand All @@ -257,15 +274,20 @@ func (s *SQS) Run(inch chan<- *baker.Data) error {
}

for _, url := range resp.QueueUrls {
wg.Add(1)
go func(url string) {
defer wg.Done()

s.pollQueue(ctx, url)
}(*url)
queueUrls = append(queueUrls, *url)
}
}

var wg sync.WaitGroup
for _, url := range queueUrls {
wg.Add(1)
go func(url string) {
defer wg.Done()

s.pollQueue(ctx, url)
}(url)
}

// The correct order of operation to cleanly stop the whole pipeline is the
// following:
// - first we close the 'done' channel, this in turns cancel polling via
Expand Down
57 changes: 56 additions & 1 deletion input/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,19 @@ import (
func TestSQSParseMessage(t *testing.T) {
tests := []struct {
name string
queues []string
format, message, expression string
wantPath string
wantConfigErr, wantParseErr bool
}{
{
queues: []string{"some-queue"},
format: "plain",
message: "s3://some-bucket/with/stuff/inside",
wantPath: "s3://some-bucket/with/stuff/inside",
},
{
queues: []string{"some-queue"},
format: "sns",
message: `
{
Expand All @@ -47,6 +50,7 @@ func TestSQSParseMessage(t *testing.T) {
wantPath: "s3://another-bucket/path/to/file",
},
{
queues: []string{"some-queue"},
format: "json",
expression: "Foo.Bar",
message: `
Expand All @@ -59,6 +63,7 @@ func TestSQSParseMessage(t *testing.T) {
wantPath: "s3://another-bucket/path/to/file",
},
{
queues: []string{"some-queue"},
format: "s3::ObjectCreated",
expression: "Records[*].join('/',['s3:/', s3.bucket.name, s3.object.key]) | [0]",
message: `
Expand Down Expand Up @@ -103,6 +108,7 @@ func TestSQSParseMessage(t *testing.T) {
wantPath: "s3://mybucket/path/to/a/csv/file/in/a/bucket/file.csv.log.zst",
},
{
queues: []string{"some-queue"},
format: "json",
expression: "Records[*].join('/',['s3:/', s3.bucket.name, s3.object.key]) | [0]",
message: `
Expand Down Expand Up @@ -170,9 +176,17 @@ func TestSQSParseMessage(t *testing.T) {
wantPath: "whatever",
wantConfigErr: true,
},
{
queues: []string{},
format: "plain",
message: "s3://some-bucket/with/stuff/inside",
wantPath: "s3://some-bucket/with/stuff/inside",
wantConfigErr: true,
},

// parse errors
{
queues: []string{"some-queue"},
name: "invalid json payload",
format: "json",
expression: "Foo.Bar",
Expand All @@ -186,6 +200,7 @@ func TestSQSParseMessage(t *testing.T) {
wantParseErr: true,
},
{
queues: []string{"some-queue"},
name: "field not found",
format: "json",
expression: "Foo.Bar",
Expand All @@ -197,6 +212,7 @@ func TestSQSParseMessage(t *testing.T) {
wantParseErr: true,
},
{
queues: []string{"some-queue"},
name: "field of wrong type",
format: "json",
expression: "Foo.Bar",
Expand All @@ -220,6 +236,7 @@ func TestSQSParseMessage(t *testing.T) {
in, err := NewSQS(baker.InputParams{
ComponentParams: baker.ComponentParams{
DecodedConfig: &SQSConfig{
QueueNames: tt.queues,
MessageFormat: string(tt.format),
MessageExpression: tt.expression,
},
Expand Down Expand Up @@ -252,6 +269,7 @@ type sqsIntegrationTestCase struct {

// SQS input configuration
queuePrefixes []string // QueuePrefixes configuration parameter
queueNames []string // QueueNames configuration parameter
bucket string // Bucket configuration parameter

// SQS service configuration
Expand Down Expand Up @@ -312,6 +330,21 @@ func TestSQS(t *testing.T) {
"bucket-a,path/to,2.zst",
},
},
{
name: "use provided bucket",
queueNames: []string{"queue-a"},
bucket: "bucket-a",
messages: map[string][]sqs.Message{
"queue-a": {
sqsMessage("path/to/file/1.zst"),
sqsMessage("path/to/file/2.zst"),
},
},
wantRecords: []string{
"bucket-a,path/to,1.zst",
"bucket-a,path/to,2.zst",
},
},
}

for _, tt := range tests {
Expand All @@ -334,6 +367,7 @@ Name="sqs"
Bucket=%q
MessageFormat="plain"
QueuePrefixes=[%v]
QueueNames=[%v]
FilePathFilter=".*"
[output]
Expand All @@ -352,8 +386,12 @@ fields=["bucket", "path", "filename"]
for _, pref := range tc.queuePrefixes {
prefixes = append(prefixes, `"`+pref+`"`)
}
queues := []string{}
for _, name := range tc.queueNames {
queues = append(queues, `"`+name+`"`)
}

r := strings.NewReader(fmt.Sprintf(toml, tc.bucket, strings.Join(prefixes, ",")))
r := strings.NewReader(fmt.Sprintf(toml, tc.bucket, strings.Join(prefixes, ","), strings.Join(queues, ",")))
cfg, err := baker.NewConfigFromToml(r, comp)
if err != nil {
t.Fatal(err)
Expand Down Expand Up @@ -426,6 +464,23 @@ func (c *mockSQSClient) ListQueuesWithContext(ctx aws.Context, input *sqs.ListQu
return out, nil
}

func (c *mockSQSClient) GetQueueUrlWithContext(ctx aws.Context, input *sqs.GetQueueUrlInput, options ...request.Option) (*sqs.GetQueueUrlOutput, error) {
c.mu.Lock()
defer c.mu.Unlock()

queueName := ""
for name := range c.queues {
if name == *input.QueueName {
queueName = "https://sqs.us-west-2.amazonaws.com/123456789012/" + name
break
}
}

out := &sqs.GetQueueUrlOutput{QueueUrl: &queueName}
log.WithFields(log.Fields{"sqs": "GetQueueUrlWithContext", "input": *input, "out": *out}).Debug()
return out, nil
}

// ReceiveMessageWithContext sends the first message for the requested queue, if
// any, then removes it from the queue.
func (c *mockSQSClient) ReceiveMessageWithContext(ctx aws.Context, input *sqs.ReceiveMessageInput, options ...request.Option) (*sqs.ReceiveMessageOutput, error) {
Expand Down

0 comments on commit 7888e89

Please sign in to comment.