-
-
Notifications
You must be signed in to change notification settings - Fork 154
/
Copy pathexporter.go
83 lines (71 loc) · 2.03 KB
/
exporter.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package sqsexporter
import (
"context"
"encoding/json"
"fmt"
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
"github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/thomaspoignant/go-feature-flag/exporter"
"log"
"sync"
)
// Exporter publishes feature events to an AWS SQS queue.
type Exporter struct {
	// QueueURL is the URL of the SQS queue that receives the events.
	// It is mandatory: Export returns an error when it is empty.
	QueueURL string

	// AwsConfig is the AWS SDK configuration used to build the SQS client.
	// When it is nil, Export loads the SDK default configuration instead.
	AwsConfig *aws.Config

	// init guards the one-time creation of sqsService.
	init sync.Once

	// sqsService is the client used to send messages. It is created from
	// AwsConfig on first use unless it has already been set.
	sqsService SQSSendMessageAPI
}
// Export sends one SQS message per feature event to the queue identified by
// QueueURL.
//
// The exporter finishes its own setup lazily: when AwsConfig is nil the
// default AWS configuration is loaded using ctx, and the SQS client is built
// from that configuration unless a client has already been set.
//
// Each event is JSON-encoded and used as the message body, and every message
// carries an "emitter" string attribute. Export stops at the first event that
// cannot be encoded or sent and returns that error; messages sent before the
// failure stay sent. The logger argument is not used.
func (f *Exporter) Export(ctx context.Context, _ *log.Logger, featureEvents []exporter.FeatureEvent) error {
	// Check the mandatory setting first so that a misconfigured exporter fails
	// before any AWS configuration is loaded.
	if f.QueueURL == "" {
		return fmt.Errorf("impossible to init SQS exporter: QueueURL is a mandatory parameter")
	}

	if f.AwsConfig == nil {
		cfg, err := config.LoadDefaultConfig(ctx)
		if err != nil {
			// %w keeps the underlying error reachable through errors.Is/As.
			return fmt.Errorf("impossible to init SQS exporter: %w", err)
		}
		f.AwsConfig = &cfg
	}

	// The nil check sits inside Do so that reading sqsService is ordered with
	// the write made by the one-time initialisation. A client that was set
	// beforehand is kept as is.
	f.init.Do(func() {
		if f.sqsService == nil {
			f.sqsService = sqs.NewFromConfig(*f.AwsConfig)
		}
	})

	for _, event := range featureEvents {
		messageBody, err := json.Marshal(event)
		if err != nil {
			return err
		}

		_, err = f.sqsService.SendMessage(ctx, &sqs.SendMessageInput{
			MessageBody: aws.String(string(messageBody)),
			QueueUrl:    aws.String(f.QueueURL),
			MessageAttributes: map[string]types.MessageAttributeValue{
				"emitter": {
					DataType:    aws.String("String"),
					StringValue: aws.String("GO Feature Flag"),
				},
			},
		})
		if err != nil {
			return err
		}
	}
	return nil
}
// IsBulk always reports false for the SQS exporter.
// NOTE(review): presumably this tells the caller to hand events over as they
// are produced instead of batching them; confirm against the exporter contract.
func (f *Exporter) IsBulk() bool {
	const bulk = false
	return bulk
}
// SQSSendMessageAPI is the subset of the SQS client that the exporter relies
// on: the SendMessage operation. Depending on this interface rather than on
// the concrete client lets tests substitute a mocked service.
type SQSSendMessageAPI interface {
	SendMessage(
		ctx context.Context,
		params *sqs.SendMessageInput,
		optFns ...func(*sqs.Options),
	) (*sqs.SendMessageOutput, error)
}