Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Sns publish batch #314

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
46 commits
Select commit Hold shift + click to select a range
95ade17
Implement SNS:PublishBatch
goddenrich Aug 12, 2024
97c1a04
Fixes
goddenrich Aug 12, 2024
a24ff2e
Make some edits re feedback
goddenrich Aug 12, 2024
eb7a8af
Add PublishBatch to servertest
goddenrich Aug 12, 2024
37d5c64
fix assert lenght test
goddenrich Aug 12, 2024
2fbb393
support aws json
goddenrich Aug 12, 2024
1759c4d
review comments
goddenrich Aug 28, 2024
4b9e312
use message creator interface
goddenrich Aug 28, 2024
e4c58c9
add publish batch smoke test
goddenrich Aug 28, 2024
93af1b3
Merge branch 'feature-aws-json' of github.com:Admiral-Piett/goaws int…
goddenrich Aug 28, 2024
07b9b9b
fix send message
goddenrich Aug 28, 2024
a3fed24
add id (required) to publish entries
goddenrich Aug 29, 2024
faf53c4
fix smoke tests
goddenrich Aug 29, 2024
05cd951
Merge remote-tracking branch 'upstream/feature-aws-json' into SNS-Pub…
goddenrich Aug 29, 2024
0e7bfeb
fix publish batch unit tests
goddenrich Aug 29, 2024
d3188c6
Merge remote-tracking branch 'upstream/feature-aws-json' into SNS-Pub…
goddenrich Sep 2, 2024
93f700c
add sample test
kojisaiki Dec 27, 2023
681ddcd
super roughly passed
kojisaiki Dec 27, 2023
1dcdb36
update attribute format on aws-json protocol
kojisaiki Dec 27, 2023
8c19119
Try routing refactoring
kojisaiki Jan 3, 2024
a86ab6c
Refactor for CreateQueue for V1 JSON support
dhumphreys01 Jan 17, 2024
da44977
Add ListQueuesV1 for JSON support
dhumphreys01 Apr 24, 2024
8ec7d7e
Add GetQueueAttributesV1 for JSON support
dhumphreys01 May 1, 2024
edaa9b7
Migrate SendMessage (+15 squashed commits)
kojisaikiAtSony Mar 14, 2024
8f356e7
SQS JSON API - ReceiveMessage, ChangeMessageVisibility, DeleteMessage
andrew-womeldorf May 23, 2024
b3d5762
Add SetQueueAttributesV1 for JSON support
dhumphreys01 May 17, 2024
fcdd208
Add PurgeQueueV1 for JSON support
dhumphreys01 Jun 3, 2024
a65cc66
Add GetQueueUrlV1 for JSON support
Jun 3, 2024
20c9e6c
update
kojisaikiAtSony Jun 10, 2024
1fd4ded
Add DeleteQueueV1 for JSON support
dhumphreys01 Jun 18, 2024
740700f
Add SNS SubscribeV1 for new pattern support
dhumphreys01 Jun 25, 2024
00ecbda
Add CreateTopicV1 for JSON support
kojisaikiAtSony Jul 4, 2024
2374d82
Add UnsubscribeV1 for JSON support
dhumphreys01 Jul 10, 2024
e5f9bdb
Add SendMessageBatchV1 for JSON support
Jul 11, 2024
5a19808
added Delete Message Batch V1
Jul 9, 2024
233c2ed
Add PublishV1 for JSON support
dhumphreys01 Jul 15, 2024
150a02b
add sns listtopics support
NRatSony Jul 9, 2024
4d51d27
add delete topic v1 support
NRatSony Jul 9, 2024
5555f5e
fixed subsribe logic
Jul 30, 2024
bacb1d3
add sns list subcriptions support
NRatSony Aug 6, 2024
f06f3d7
added get subscription attributes
Aug 7, 2024
ff7a55e
add list subscriptions by topic v1
Aug 14, 2024
8a8707a
add set subscription attributes v1
kojisaikiAtSony Sep 3, 2024
1a058d0
added confirm subscription
Sep 4, 2024
adb72c7
Merge remote-tracking branch 'upstream/feature-aws-json' into SNS-Pub…
goddenrich Sep 25, 2024
f12b05a
Merge remote-tracking branch 'upstream/master' into SNS-PublishBatch
goddenrich Sep 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ Here is a list of the APIs:
- [x] Subscribe (raw)
- [x] ListSubscriptions
- [x] Publish
- [x] PublishBatch
- [x] DeleteTopic
- [x] Subscribe
- [x] Unsubscribe
Expand Down
112 changes: 58 additions & 54 deletions app/gosns/publish.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,27 +38,27 @@ func PublishV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
topicName := arnSegments[len(arnSegments)-1]

_, ok = app.SyncTopics.Topics[topicName]
if ok {
log.WithFields(log.Fields{
"topic": topicName,
"topicArn": requestBody.TopicArn,
"subject": requestBody.Subject,
}).Debug("Publish to Topic")
for _, subscription := range app.SyncTopics.Topics[topicName].Subscriptions {
switch app.Protocol(subscription.Protocol) {
case app.ProtocolSQS:
err := publishSQS(subscription, topicName, requestBody)
if err != nil {
utils.CreateErrorResponseV1(err.Error(), false)
}
case app.ProtocolHTTP:
fallthrough
case app.ProtocolHTTPS:
publishHTTP(subscription, requestBody)
if !ok {
return utils.CreateErrorResponseV1("TopicNotFound", false)
}

log.WithFields(log.Fields{
"topic": topicName,
"topicArn": requestBody.TopicArn,
"subject": requestBody.Subject,
}).Debug("Publish to Topic")
for _, subscription := range app.SyncTopics.Topics[topicName].Subscriptions {
switch app.Protocol(subscription.Protocol) {
case app.ProtocolSQS:
err := publishSQS(subscription, topicName, requestBody)
if err != nil {
utils.CreateErrorResponseV1(err.Error(), false)
}
case app.ProtocolHTTP:
fallthrough
case app.ProtocolHTTPS:
publishHTTP(subscription, requestBody.TopicArn, requestBody)
}
} else {
return utils.CreateErrorResponseV1("TopicNotFound", false)
}

//Create the response
Expand All @@ -74,9 +74,15 @@ func PublishV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
return http.StatusOK, respStruct
}

func publishSQS(subscription *app.Subscription, topicName string, requestBody *models.PublishRequest) error {
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(requestBody.MessageAttributes)
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(messageAttributes) {
type MessageCreator interface {
GetMessageAttributes() map[string]app.MessageAttributeValue
GetMessage() string
GetSubject() string
GetMessageStructure() string
}

func publishSQS(subscription *app.Subscription, topicName string, messager MessageCreator) error {
if subscription.FilterPolicy != nil && !subscription.FilterPolicy.IsSatisfiedBy(messager.GetMessageAttributes()) {
return nil
}

Expand All @@ -86,49 +92,47 @@ func publishSQS(subscription *app.Subscription, topicName string, requestBody *m
arnSegments := strings.Split(queueName, ":")
queueName = arnSegments[len(arnSegments)-1]

if _, ok := app.SyncQueues.Queues[queueName]; ok {
msg := app.Message{}

if subscription.Raw == false {
m, err := createMessageBody(subscription, requestBody.Message, requestBody.Subject, requestBody.MessageStructure, messageAttributes)
if err != nil {
return err
}

msg.MessageBody = m
if _, ok := app.SyncQueues.Queues[queueName]; !ok {
log.Infof("Queue %s does not exist, message discarded\n", queueName)
return nil
}
msg := app.Message{}
if subscription.Raw == false {
m, err := createMessageBody(subscription, messager.GetMessage(), messager.GetSubject(), messager.GetMessageStructure(), messager.GetMessageAttributes())
if err != nil {
return err
}
msg.MessageBody = m
} else {
msg.MessageAttributes = messager.GetMessageAttributes()
msg.MD5OfMessageAttributes = common.HashAttributes(msg.MessageAttributes)
m, err := extractMessageFromJSON(messager.GetMessage(), subscription.Protocol)
if err == nil {
msg.MessageBody = []byte(m)
} else {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
m, err := extractMessageFromJSON(requestBody.Message, subscription.Protocol)
if err == nil {
msg.MessageBody = []byte(m)
} else {
msg.MessageBody = []byte(requestBody.Message)
}
msg.MessageBody = []byte(messager.GetMessage())
}
}
msg.MD5OfMessageBody = common.GetMD5Hash(messager.GetMessage())
msg.Uuid = uuid.NewString()

msg.MD5OfMessageBody = common.GetMD5Hash(requestBody.Message)
msg.Uuid, _ = common.NewUUID()
app.SyncQueues.Lock()
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
app.SyncQueues.Unlock()
app.SyncQueues.Lock()
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
app.SyncQueues.Unlock()

log.Infof("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody)
} else {
log.Infof("%s: Queue %s does not exist, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName)
}
log.Infof("Topic: %s(%s), Message: %s\n", topicName, queueName, msg.MessageBody)
return nil
}

func publishHTTP(subs *app.Subscription, requestBody *models.PublishRequest) {
messageAttributes := utils.ConvertToOldMessageAttributeValueStructure(requestBody.MessageAttributes)
func publishHTTP(subs *app.Subscription, topicArn string, messager MessageCreator) {
messageAttributes := messager.GetMessageAttributes()
id := uuid.NewString()
msg := app.SNSMessage{
Type: "Notification",
MessageId: id,
TopicArn: requestBody.TopicArn,
Subject: requestBody.Subject,
Message: requestBody.Message,
Message: messager.GetMessage(),
TopicArn: topicArn,
Subject: messager.GetSubject(),
Timestamp: time.Now().UTC().Format(time.RFC3339),
SignatureVersion: "1",
SigningCertURL: fmt.Sprintf("http://%s:%s/SimpleNotificationService/%s.pem", app.CurrentEnvironment.Host, app.CurrentEnvironment.Port, id),
Expand Down
106 changes: 106 additions & 0 deletions app/gosns/publish_batch.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package gosns

import (
"net/http"
"strings"

"github.com/Admiral-Piett/goaws/app"
"github.com/Admiral-Piett/goaws/app/interfaces"
"github.com/Admiral-Piett/goaws/app/models"
"github.com/Admiral-Piett/goaws/app/utils"
"github.com/google/uuid"
log "github.com/sirupsen/logrus"
)

func PublishBatchV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
requestBody := models.NewPublishBatchRequest()
ok := utils.REQUEST_TRANSFORMER(requestBody, req, false)
if !ok {
log.Error("Invalid Request - PublishBatchV1")
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}

if requestBody.TopicArn == "" {
return utils.CreateErrorResponseV1("InvalidParameterValue", false)
}

if len(requestBody.PublishBatchRequestEntries.Member) == 0 {
return utils.CreateErrorResponseV1("EmptyBatchRequest", false)
}
if len(requestBody.PublishBatchRequestEntries.Member) > 10 {
return utils.CreateErrorResponseV1("TooManyEntriesInBatchRequest", false)
}

topicArn := requestBody.TopicArn
arnSegments := strings.Split(topicArn, ":")
topicName := arnSegments[len(arnSegments)-1]
topic, ok := app.SyncTopics.Topics[topicName]
if !ok {
return utils.CreateErrorResponseV1("TopicNotFound", false)
}

seen := make(map[string]bool)

for _, entry := range requestBody.PublishBatchRequestEntries.Member {
if entry == nil { // we use gorilla schema to parse value params. Indexing on the aws client starts at 1 but gorilla schema starts at 0 so we may have a nil entry at the start of the slice
continue
}
if entry.ID == "" {
// This is a required field for the PublishBatchRequestEntry entity but doesn't seem required in the request.
// If it's not present in the request then assume we should generate one.
entry.ID = uuid.NewString()
}
if seen[entry.ID] {
return utils.CreateErrorResponseV1("BatchEntryIdsNotDistinct", false)
}
seen[entry.ID] = true
}

successfulEntries := []models.PublishBatchResultEntry{}
failedEntries := []models.BatchResultErrorEntry{}
for _, entry := range requestBody.PublishBatchRequestEntries.Member {
if entry == nil { // we use gorilla schema to parse value params. Indexing on the aws client starts at 1 but gorilla schema starts at 0 so we may have a nil entry at the start of the slice
continue
}
// we now know all the entry.IDs are unique and non-blank
for _, sub := range topic.Subscriptions {
switch app.Protocol(sub.Protocol) {
case app.ProtocolSQS:
if err := publishSQS(sub, topicName, entry); err != nil {
er := models.SnsErrors[err.Error()]
failedEntries = append(failedEntries, models.BatchResultErrorEntry{
Code: er.Code,
Id: entry.ID,
Message: er.Message,
SenderFault: true,
})
} else {
msgId := uuid.NewString()
successfulEntries = append(successfulEntries, models.PublishBatchResultEntry{
Id: entry.ID,
MessageId: msgId,
})
}
case app.ProtocolHTTP:
fallthrough
case app.ProtocolHTTPS:
publishHTTP(sub, topicArn, entry)
msgId := uuid.NewString()
successfulEntries = append(successfulEntries, models.PublishBatchResultEntry{
Id: entry.ID,
MessageId: msgId,
})
}
}
}

respStruct := models.PublishBatchResponse{
Xmlns: models.BASE_XMLNS,
Result: models.PublishBatchResult{
Successful: models.PublishBatchSuccessful{SuccessEntries: successfulEntries},
Failed: models.PublishBatchFailed{ErrorEntries: failedEntries},
},
Metadata: app.ResponseMetadata{RequestId: uuid.NewString()},
}
return http.StatusOK, respStruct
}
Loading
Loading