Skip to content

Commit

Permalink
Add PublishV1 for JSON support
Browse files Browse the repository at this point in the history
  • Loading branch information
dhumphreys01 authored and Admiral-Piett committed Jul 16, 2024
1 parent 89b7e55 commit 1a9d01b
Show file tree
Hide file tree
Showing 20 changed files with 1,341 additions and 678 deletions.
2 changes: 0 additions & 2 deletions .github/README.md
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# GoAws
[![Build Status](https://travis-ci.org/p4tin/goaws.svg?branch=master)](https://travis-ci.org/p4tin/goaws)

You are always welcome to [tweet the creator in chief](https://twitter.com/gocodecloud) or [buy him a coffee](https://www.paypal.me/p4tin)

Written in Go this is a clone of the AWS SQS/SNS systems. This system is designed to emulate SQS and SNS in a local environment so developers can test their interfaces without having to connect to the AWS Cloud and possibly incurring the expense, or even worse actually write to production topics/queues by mistake. If you see any problems or would like to see a new feature, please open an issue here in github. As well, I will logon to Gitter so we can discuss your deployment issues or the weather.


Expand Down
15 changes: 13 additions & 2 deletions app/conf/mock-data/mock-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,21 @@ BaseUnitTests:
- Name: unit-queue2
RedrivePolicy: '{"maxReceiveCount": 100, "deadLetterTargetArn":"arn:aws:sqs:us-east-1:100010001000:other-queue1"}'
- Name: other-queue1
- Name: subscribed-queue2
- Name: subscribed-queue1
- Name: subscribed-queue3
Topics:
- Name: unit-topic1
Subscriptions:
- QueueName: subscribed-queue2
- QueueName: subscribed-queue1
Raw: true
- Name: unit-topic2
- Name: unit-topic3
Subscriptions:
- QueueName: subscribed-queue3
Raw: false
- Name: unit-topic-http
Subscriptions:
- Protocol: http
EndPoint: http://over.ride.me/for/tests
TopicArn: arn:aws:sqs:region:accountID:unit-topic-http
Raw: true
202 changes: 2 additions & 200 deletions app/gosns/gosns.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,137 +329,8 @@ func DeleteTopic(w http.ResponseWriter, req *http.Request) {

}

// aws --endpoint-url http://localhost:47194 sns publish --topic-arn arn:aws:sns:yopa-local:000000000000:test1 --message "This is a test"
func Publish(w http.ResponseWriter, req *http.Request) {
content := req.FormValue("ContentType")
topicArn := req.FormValue("TopicArn")
subject := req.FormValue("Subject")
messageBody := req.FormValue("Message")
messageStructure := req.FormValue("MessageStructure")
messageAttributes := getMessageAttributesFromRequest(req)

arnSegments := strings.Split(topicArn, ":")
topicName := arnSegments[len(arnSegments)-1]

_, ok := app.SyncTopics.Topics[topicName]
if ok {
log.WithFields(log.Fields{
"topic": topicName,
"topicArn": topicArn,
"subject": subject,
}).Debug("Publish to Topic")
for _, subs := range app.SyncTopics.Topics[topicName].Subscriptions {
switch app.Protocol(subs.Protocol) {
case app.ProtocolSQS:
publishSQS(w, req, subs, messageBody, messageAttributes, subject, topicArn, topicName, messageStructure)
case app.ProtocolHTTP:
fallthrough
case app.ProtocolHTTPS:
publishHTTP(subs, messageBody, messageAttributes, subject, topicArn)
}
}
} else {
createErrorResponse(w, req, "TopicNotFound")
return
}

//Create the response
msgId, _ := common.NewUUID()
uuid, _ := common.NewUUID()
respStruct := app.PublishResponse{Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/", Result: app.PublishResult{MessageId: msgId}, Metadata: app.ResponseMetadata{RequestId: uuid}}
SendResponseBack(w, req, respStruct, content)
}

func publishSQS(w http.ResponseWriter, req *http.Request,
subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue,
subject string, topicArn string, topicName string, messageStructure string) {
if subs.FilterPolicy != nil && !subs.FilterPolicy.IsSatisfiedBy(messageAttributes) {
return
}

endPoint := subs.EndPoint
uriSegments := strings.Split(endPoint, "/")
queueName := uriSegments[len(uriSegments)-1]
arnSegments := strings.Split(queueName, ":")
queueName = arnSegments[len(arnSegments)-1]

if _, ok := app.SyncQueues.Queues[queueName]; ok {
msg := app.Message{}

if subs.Raw == false {
m, err := CreateMessageBody(subs, messageBody, subject, messageStructure, messageAttributes)
if err != nil {
createErrorResponse(w, req, err.Error())
return
}

msg.MessageBody = m
} else {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
m, err := extractMessageFromJSON(messageBody, subs.Protocol)
if err == nil {
msg.MessageBody = []byte(m)
} else {
msg.MessageBody = []byte(messageBody)
}
}

msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
app.SyncQueues.Lock()
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
app.SyncQueues.Unlock()

log.Infof("%s: Topic: %s(%s), Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), topicName, queueName, msg.MessageBody)
} else {
log.Infof("%s: Queue %s does not exist, message discarded\n", time.Now().Format("2006-01-02 15:04:05"), queueName)
}
}

func publishHTTP(subs *app.Subscription, messageBody string, messageAttributes map[string]app.MessageAttributeValue,
subject string, topicArn string) {
id, _ := common.NewUUID()
msg := app.SNSMessage{
Type: "Notification",
MessageId: id,
TopicArn: topicArn,
Subject: subject,
Message: messageBody,
Timestamp: time.Now().UTC().Format(time.RFC3339),
SignatureVersion: "1",
SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + id + ".pem",
UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn,
MessageAttributes: formatAttributes(messageAttributes),
}

signature, err := signMessage(PrivateKEY, &msg)
if err != nil {
log.Error(err)
} else {
msg.Signature = signature
}
err = callEndpoint(subs.EndPoint, subs.SubscriptionArn, msg, subs.Raw)
if err != nil {
log.WithFields(log.Fields{
"EndPoint": subs.EndPoint,
"ARN": subs.SubscriptionArn,
"error": err.Error(),
}).Error("Error calling endpoint")
}
}

func formatAttributes(values map[string]app.MessageAttributeValue) map[string]app.MsgAttr {
attr := make(map[string]app.MsgAttr)
for k, v := range values {
attr[k] = app.MsgAttr{
Type: v.DataType,
Value: v.Value,
}
}
return attr
}

// NOTE: The use case for this is to use GoAWS to call some external system with the message payload. Essentially
// it is a localized subscription to some non-AWS endpoint.
func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool) error {
log.WithFields(log.Fields{
"sns": msg,
Expand Down Expand Up @@ -524,75 +395,6 @@ func callEndpoint(endpoint string, subArn string, msg app.SNSMessage, raw bool)
return nil
}

func getMessageAttributesFromRequest(req *http.Request) map[string]app.MessageAttributeValue {
attributes := make(map[string]app.MessageAttributeValue)

for i := 1; true; i++ {
name := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Name", i))
if name == "" {
break
}

dataType := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.DataType", i))
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

// StringListValue and BinaryListValue is currently not implemented
for _, valueKey := range [...]string{"StringValue", "BinaryValue"} {
value := req.FormValue(fmt.Sprintf("MessageAttributes.entry.%d.Value.%s", i, valueKey))
if value != "" {
attributes[name] = app.MessageAttributeValue{Name: name, DataType: dataType, Value: value, ValueKey: valueKey}
}
}

if _, ok := attributes[name]; !ok {
log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
}
}

return attributes
}

func CreateMessageBody(subs *app.Subscription, msg string, subject string, messageStructure string,
messageAttributes map[string]app.MessageAttributeValue) ([]byte, error) {

msgId, _ := common.NewUUID()

message := app.SNSMessage{
Type: "Notification",
MessageId: msgId,
TopicArn: subs.TopicArn,
Subject: subject,
Timestamp: time.Now().UTC().Format(time.RFC3339),
SignatureVersion: "1",
SigningCertURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/SimpleNotificationService/" + msgId + ".pem",
UnsubscribeURL: "http://" + app.CurrentEnvironment.Host + ":" + app.CurrentEnvironment.Port + "/?Action=Unsubscribe&SubscriptionArn=" + subs.SubscriptionArn,
MessageAttributes: formatAttributes(messageAttributes),
}

if app.MessageStructure(messageStructure) == app.MessageStructureJSON {
m, err := extractMessageFromJSON(msg, subs.Protocol)
if err != nil {
return nil, err
}
message.Message = m
} else {
message.Message = msg
}

signature, err := signMessage(PrivateKEY, &message)
if err != nil {
log.Error(err)
} else {
message.Signature = signature
}

byteMsg, _ := json.Marshal(message)
return byteMsg, nil
}

func extractMessageFromJSON(msg string, protocol string) (string, error) {
var msgWithProtocols map[string]string
if err := json.Unmarshal([]byte(msg), &msgWithProtocols); err != nil {
Expand Down
14 changes: 8 additions & 6 deletions app/gosns/gosns_create_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ const (
messageAttributesKey = "MessageAttributes"
)

// TODO - Admiral-Piett - merge these with `publish_test.go`

// When simple message string is passed,
// it must be used for all subscribers (no matter the protocol)
func TestCreateMessageBody_NonJson(t *testing.T) {
Expand All @@ -27,7 +29,7 @@ func TestCreateMessageBody_NonJson(t *testing.T) {
Raw: false,
}

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue))
snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, make(map[string]app.MessageAttributeValue))
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -69,7 +71,7 @@ func TestCreateMessageBody_OnlyDefaultValueInJson(t *testing.T) {
message := `{"default": "default message text", "http": "HTTP message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -112,7 +114,7 @@ func TestCreateMessageBody_OnlySqsValueInJson(t *testing.T) {
message := `{"sqs": "message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err == nil {
t.Fatalf(`error expected but instead SNS message was returned: %s`, snsMessage)
}
Expand All @@ -130,7 +132,7 @@ func TestCreateMessageBody_BothDefaultAndSqsValuesInJson(t *testing.T) {
message := `{"default": "default message text", "sqs": "sqs message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureJSON, nil)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureJSON, nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -173,7 +175,7 @@ func TestCreateMessageBody_NonJsonContainingJson(t *testing.T) {
message := `{"default": "default message text", "sqs": "sqs message text"}`
subject := "subject"

snsMessage, err := CreateMessageBody(subs, message, subject, "", nil)
snsMessage, err := createMessageBody(subs, message, subject, "", nil)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down Expand Up @@ -219,7 +221,7 @@ func TestCreateMessageBody_WithMessageAttributes(t *testing.T) {
attributes := map[string]app.MessageAttributeValue{
stringMessageAttributeValue.DataType: stringMessageAttributeValue,
}
snsMessage, err := CreateMessageBody(subs, message, subject, messageStructureEmpty, attributes)
snsMessage, err := createMessageBody(subs, message, subject, messageStructureEmpty, attributes)
if err != nil {
t.Fatalf(`error creating SNS message: %s`, err)
}
Expand Down
Loading

0 comments on commit 1a9d01b

Please sign in to comment.