Skip to content

Commit

Permalink
Fix MessageAttributes propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
kojisaikiAtSony committed May 7, 2024
1 parent 85b0f0b commit 4efb39a
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 52 deletions.
38 changes: 0 additions & 38 deletions app/gosqs/message_attributes.go
Original file line number Diff line number Diff line change
@@ -1,47 +1,9 @@
package gosqs

import (
"fmt"
"net/http"

"github.com/Admiral-Piett/goaws/app"
log "github.com/sirupsen/logrus"
)

func extractMessageAttributes(req *http.Request, prefix string) map[string]app.MessageAttributeValue {
attributes := make(map[string]app.MessageAttributeValue)
if prefix != "" {
prefix += "."
}

for i := 1; true; i++ {
name := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Name", prefix, i))
if name == "" {
break
}

dataType := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Value.DataType", prefix, i))
if dataType == "" {
log.Warnf("DataType of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
continue
}

// StringListValue and BinaryListValue is currently not implemented
for _, valueKey := range [...]string{"StringValue", "BinaryValue"} {
value := req.FormValue(fmt.Sprintf("%sMessageAttribute.%d.Value.%s", prefix, i, valueKey))
if value != "" {
attributes[name] = app.MessageAttributeValue{name, dataType, value, valueKey}
}
}

if _, ok := attributes[name]; !ok {
log.Warnf("StringValue or BinaryValue of MessageAttribute %s is missing, MD5 checksum will most probably be wrong!\n", name)
}
}

return attributes
}

func getMessageAttributeResult(a *app.MessageAttributeValue) *app.ResultMessageAttribute {
v := &app.ResultMessageAttributeValue{
DataType: a.DataType,
Expand Down
36 changes: 33 additions & 3 deletions app/gosqs/send_message.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
messageBody := requestBody.MessageBody
messageGroupID := requestBody.MessageGroupId
messageDeduplicationID := requestBody.MessageDeduplicationId
messageAttributes := extractMessageAttributes(req, "")
messageAttributes := requestBody.MessageAttributes

queueUrl := getQueueFromPath(requestBody.QueueUrl, req.URL.String())

Expand Down Expand Up @@ -60,8 +60,9 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) {
log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
oldStyleMessageAttributes := convertToOldMessageAttributeValueStructure(messageAttributes)
msg.MessageAttributes = oldStyleMessageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(oldStyleMessageAttributes)
}
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
Expand Down Expand Up @@ -101,3 +102,32 @@ func SendMessageV1(req *http.Request) (int, interfaces.AbstractResponseBody) {

return http.StatusOK, respStruct
}

// TODO:
// Refactor internal MessageAttribute model between SendMessage and ReceiveMessage
// from app.MessageAttributeValue(old) to models.MessageAttributeValue(new) and remove this temporary function.
func convertToOldMessageAttributeValueStructure(newValues map[string]models.MessageAttributeValue) map[string]app.MessageAttributeValue {
attributes := make(map[string]app.MessageAttributeValue)

for name, entry := range newValues {
// StringListValue and BinaryListValue is currently not implemented
// Please refer app/gosqs/message_attributes.go
value := ""
valueKey := ""
if entry.StringValue != "" {
value = entry.StringValue
valueKey = "StringValue"
} else if entry.BinaryValue != "" {
value = entry.BinaryValue
valueKey = "BinaryValue"
}
attributes[name] = app.MessageAttributeValue{
Name: name,
DataType: entry.DataType,
Value: value,
ValueKey: valueKey,
}
}

return attributes
}
22 changes: 11 additions & 11 deletions app/models/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,21 +157,21 @@ func (r *ListQueueRequest) SetAttributesFromForm(values url.Values) {

func NewSendMessageRequest() *SendMessageRequest {
return &SendMessageRequest{
MessageAttributes: make(map[string]MessageAttributes),
MessageSystemAttributes: make(map[string]MessageAttributes),
MessageAttributes: make(map[string]MessageAttributeValue),
MessageSystemAttributes: make(map[string]MessageAttributeValue),
}
}

type SendMessageRequest struct {
DelaySeconds int `json:"Del1aySeconds" schema:"DelaySeconds"`
MessageAttributes map[string]MessageAttributes `json:"MessageAttributes" schema:"MessageAttributes"`
MessageBody string `json:"MessageBody" schema:"MessageBody"`
MessageDeduplicationId string `json:"MessageDeduplicationId" schema:"MessageDeduplicationId"`
MessageGroupId string `json:"MessageGroupId" schema:"MessageGroupId"`
MessageSystemAttributes map[string]MessageAttributes `json:"MessageSystemAttributes" schema:"MessageSystemAttributes"`
QueueUrl string `json:"QueueUrl" schema:"QueueUrl"`
DelaySeconds int `json:"Del1aySeconds" schema:"DelaySeconds"`
MessageAttributes map[string]MessageAttributeValue `json:"MessageAttributes" schema:"MessageAttributes"`
MessageBody string `json:"MessageBody" schema:"MessageBody"`
MessageDeduplicationId string `json:"MessageDeduplicationId" schema:"MessageDeduplicationId"`
MessageGroupId string `json:"MessageGroupId" schema:"MessageGroupId"`
MessageSystemAttributes map[string]MessageAttributeValue `json:"MessageSystemAttributes" schema:"MessageSystemAttributes"`
QueueUrl string `json:"QueueUrl" schema:"QueueUrl"`
}
type MessageAttributes struct {
type MessageAttributeValue struct {
BinaryListValues []string `json:"BinaryListValues"` // goaws does not supported yet
BinaryValue string `json:"BinaryValue"`
DataType string `json:"DataType"`
Expand All @@ -197,7 +197,7 @@ func (r *SendMessageRequest) SetAttributesFromForm(values url.Values) {
stringValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.StringValue", i))
binaryValue := values.Get(fmt.Sprintf("MessageAttribute.%d.Value.BinaryValue", i))

r.MessageAttributes[name] = MessageAttributes{
r.MessageAttributes[name] = MessageAttributeValue{
DataType: dataType,
StringValue: stringValue,
BinaryValue: binaryValue,
Expand Down
95 changes: 95 additions & 0 deletions smoke_tests/sqs_send_message_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/service/sqs"
sqstypes "github.com/aws/aws-sdk-go-v2/service/sqs/types"
"github.com/gavv/httpexpect/v2"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -79,3 +80,97 @@ func Test_SendMessageV1_json_no_attributes(t *testing.T) {
}
}
}

func Test_SendMessageV1_json_with_attributes(t *testing.T) {
server := generateServer()
defer func() {
server.Close()
utils.ResetResources()
}()

sdkConfig, _ := config.LoadDefaultConfig(context.TODO())
sdkConfig.BaseEndpoint = aws.String(server.URL)
sqsClient := sqs.NewFromConfig(sdkConfig)
sdkResponse, _ := sqsClient.CreateQueue(context.TODO(), &sqs.CreateQueueInput{
QueueName: &af.QueueName,
})
targetQueueUrl := sdkResponse.QueueUrl

// Target test: send a message
targetMessageBody := "Test_SendMessageV1_json_with_attributes"
attr1_dataType := "String"
attr1_value := "attr1_value"
attr2_dataType := "String"
attr2_value := "attr1_value"
sendMessageOutput, _ := sqsClient.SendMessage(context.TODO(), &sqs.SendMessageInput{
QueueUrl: targetQueueUrl,
MessageBody: &targetMessageBody,
MessageAttributes: map[string]sqstypes.MessageAttributeValue{
"attr1": {
DataType: &attr1_dataType,
StringValue: &attr1_value,
},
"attr2": {
DataType: &attr2_dataType,
StringValue: &attr2_value,
},
},
})
assert.NotNil(t, sendMessageOutput.MessageId)

// Assert 1 message in the queue
getQueueAttributesBodyXML := struct {
Action string `xml:"Action"`
Version string `xml:"Version"`
Attribute1 string `xml:"AttributeName.1"`
QueueUrl string `xml:"QueueUrl"`
}{
Action: "GetQueueAttributes",
Version: "2012-11-05",
Attribute1: "All",
QueueUrl: *targetQueueUrl,
}
e := httpexpect.Default(t, server.URL)
r := e.POST("/").
WithForm(getQueueAttributesBodyXML).
Expect().
Status(http.StatusOK).
Body().Raw()
r2 := app.GetQueueAttributesResponse{}
xml.Unmarshal([]byte(r), &r2)
for _, attr := range r2.Result.Attrs {
if attr.Name == "ApproximateNumberOfMessages" {
assert.Equal(t, "1", attr.Value)
}
}

// Receive message and check attribute
receiveMessageBodyXML := struct {
Action string `xml:"Action"`
Version string `xml:"Version"`
Attribute1 string `xml:"AttributeName.1"`
QueueUrl string `xml:"QueueUrl"`
}{
Action: "ReceiveMessage",
Version: "2012-11-05",
QueueUrl: *targetQueueUrl,
}
r = e.POST("/").
WithForm(receiveMessageBodyXML).
Expect().
Status(http.StatusOK).
Body().Raw()
r3 := app.ReceiveMessageResponse{}
xml.Unmarshal([]byte(r), &r3)
message := r3.Result.Message[0]
assert.Equal(t, targetMessageBody, string(message.Body))
assert.Equal(t, 2, len(message.MessageAttributes))
}

func Test_SendMessageV1_json_MaximumMessageSize_TooBig(t *testing.T) {
// TODO
}

func Test_SendMessageV1_json_QueueNotExistant(t *testing.T) {
// TODO
}

0 comments on commit 4efb39a

Please sign in to comment.