Skip to content

Commit

Permalink
Add DefaultRetryer to SNS topics
Browse files Browse the repository at this point in the history
  • Loading branch information
elmarcoh committed Apr 13, 2018
1 parent ae8441a commit b848bee
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 38 deletions.
33 changes: 33 additions & 0 deletions retryer/retryer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package retryer

import (
"time"

"github.com/aws/aws-sdk-go/aws/request"
)

// DefaultRetryer implements an AWS `request.Retryer` that has a custom delay
// for credential errors (403 statuscode).
// This is needed in order to wait for credentials to be valid for SQS requests
// due to AWS "eventually consistent" credentials:
// https://docs.aws.amazon.com/IAM/latest/UserGuide/troubleshoot_general.html
type DefaultRetryer struct {
request.Retryer
Delay time.Duration
}

// RetryRules returns the delay for the next request to be made
func (r DefaultRetryer) RetryRules(req *request.Request) time.Duration {
if req.HTTPResponse.StatusCode == 403 {
return r.Delay
}
return r.Retryer.RetryRules(req)
}

// ShouldRetry determines if the passed request should be retried
func (r DefaultRetryer) ShouldRetry(req *request.Request) bool {
if req.HTTPResponse.StatusCode == 403 {
return true
}
return r.Retryer.ShouldRetry(req)
}
17 changes: 11 additions & 6 deletions sns/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,16 @@ import (
"os"
"strings"
"sync"
"time"

"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/client"
"github.com/aws/aws-sdk-go/aws/credentials"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/sns"
"github.com/aws/aws-sdk-go/service/sns/snsiface"
b64 "github.com/zerofox-oss/go-aws-msg/base64"
"github.com/zerofox-oss/go-aws-msg/retryer"
"github.com/zerofox-oss/go-msg"
)

Expand All @@ -40,6 +43,10 @@ func NewTopic(topicARN string) (msg.Topic, error) {
conf := &aws.Config{
Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
Region: aws.String("us-west-2"),
Retryer: retryer.DefaultRetryer{
Retryer: client.DefaultRetryer{NumMaxRetries: 7},
Delay: 2 * time.Second,
},
}

// You may override AWS_REGION, SNS_ENDPOINT
Expand Down Expand Up @@ -124,7 +131,7 @@ func (w *MessageWriter) Close() error {
w.mux.Lock()
defer w.mux.Unlock()

if w.closed == true {
if w.closed {
return msg.ErrClosedMessageWriter
}
w.closed = true
Expand All @@ -137,10 +144,8 @@ func (w *MessageWriter) Close() error {
}

log.Printf("[TRACE] writing to sns: %v", snsPublishParams)
if _, err := w.snsClient.PublishWithContext(w.ctx, snsPublishParams); err != nil {
return err
}
return nil
_, err := w.snsClient.PublishWithContext(w.ctx, snsPublishParams)
return err
}

// Write writes data to the MessageWriter's internal buffer for aggregation
Expand All @@ -152,7 +157,7 @@ func (w *MessageWriter) Write(p []byte) (int, error) {
w.mux.Lock()
defer w.mux.Unlock()

if w.closed == true {
if w.closed {
return 0, msg.ErrClosedMessageWriter
}
return w.buf.Write(p)
Expand Down
39 changes: 7 additions & 32 deletions sqs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"os"
"time"

"github.com/zerofox-oss/go-aws-msg/retryer"
"github.com/zerofox-oss/go-msg"

"github.com/aws/aws-sdk-go/aws"
Expand Down Expand Up @@ -149,32 +150,6 @@ func (s *Server) Shutdown(ctx context.Context) error {
}
}

// DefaultRetryer implements an AWS `request.Retryer` that has a custom delay
// for credential errors (403 statuscode).
// This is needed in order to wait for credentials to be valid for SQS requests
// due to AWS "eventually consistent" credentials:
// https://docs.aws.amazon.com/IAM/latest/UserGuide/troubleshoot_general.html
type DefaultRetryer struct {
request.Retryer
delay time.Duration
}

// RetryRules returns the delay for the next request to be made
func (r DefaultRetryer) RetryRules(req *request.Request) time.Duration {
if req.HTTPResponse.StatusCode == 403 {
return r.delay
}
return r.Retryer.RetryRules(req)
}

// ShouldRetry determines if the passed request should be retried
func (r DefaultRetryer) ShouldRetry(req *request.Request) bool {
if req.HTTPResponse.StatusCode == 403 {
return true
}
return r.Retryer.ShouldRetry(req)
}

// Option is the signature that modifies a `Server` to set some configuration
type Option func(*Server) error

Expand All @@ -200,6 +175,10 @@ func NewServer(queueURL string, cl int, retryTimeout int64, opts ...Option) (msg
conf := &aws.Config{
Credentials: credentials.NewCredentials(&credentials.EnvProvider{}),
Region: aws.String("us-west-2"),
Retryer: retryer.DefaultRetryer{
Retryer: client.DefaultRetryer{NumMaxRetries: 7},
Delay: 2 * time.Second,
},
}

// http://docs.aws.amazon.com/sdk-for-go/api/aws/client/#Config
Expand All @@ -210,10 +189,6 @@ func NewServer(queueURL string, cl int, retryTimeout int64, opts ...Option) (msg
if url := os.Getenv("SQS_ENDPOINT"); url != "" {
conf.Endpoint = aws.String(url)
}
conf.Retryer = DefaultRetryer{
Retryer: client.DefaultRetryer{NumMaxRetries: 7},
delay: 2 * time.Second,
}

// Create an SQS Client with creds from the Environment
svc := sqs.New(sess, conf)
Expand Down Expand Up @@ -273,9 +248,9 @@ func WithRetries(delay time.Duration, max int) Option {
if err != nil {
return err
}
c.Retryer = DefaultRetryer{
c.Retryer = retryer.DefaultRetryer{
Retryer: client.DefaultRetryer{NumMaxRetries: max},
delay: delay,
Delay: delay,
}
s.Svc = sqs.New(s.session, c)
return nil
Expand Down

0 comments on commit b848bee

Please sign in to comment.