Skip to content

Commit

Permalink
Migrate SQS SendMessage w/o smoke test
Browse files Browse the repository at this point in the history
  • Loading branch information
kojisaikiAtSony committed Apr 26, 2024
1 parent 558e44a commit f3062f3
Show file tree
Hide file tree
Showing 7 changed files with 258 additions and 163 deletions.
86 changes: 0 additions & 86 deletions app/gosqs/gosqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,92 +95,6 @@ func PeriodicTasks(d time.Duration, quit <-chan struct{}) {
}
}

func SendMessage(w http.ResponseWriter, req *http.Request) {
w.Header().Set("Content-Type", "application/xml")
req.ParseForm()
messageBody := req.FormValue("MessageBody")
messageGroupID := req.FormValue("MessageGroupId")
messageDeduplicationID := req.FormValue("MessageDeduplicationId")
messageAttributes := extractMessageAttributes(req, "")

queueUrl := getQueueFromPath(req.FormValue("QueueUrl"), req.URL.String())

queueName := ""
if queueUrl == "" {
vars := mux.Vars(req)
queueName = vars["queueName"]
} else {
uriSegments := strings.Split(queueUrl, "/")
queueName = uriSegments[len(uriSegments)-1]
}

if _, ok := app.SyncQueues.Queues[queueName]; !ok {
// Queue does not exist
createErrorResponse(w, req, "QueueNotFound")
return
}

if app.SyncQueues.Queues[queueName].MaximumMessageSize > 0 &&
len(messageBody) > app.SyncQueues.Queues[queueName].MaximumMessageSize {
// Message size is too big
createErrorResponse(w, req, "MessageTooBig")
return
}

delaySecs := app.SyncQueues.Queues[queueName].DelaySeconds
if mv := req.FormValue("DelaySeconds"); mv != "" {
delaySecs, _ = strconv.Atoi(mv)
}

log.Println("Putting Message in Queue:", queueName)
msg := app.Message{MessageBody: []byte(messageBody)}
if len(messageAttributes) > 0 {
msg.MessageAttributes = messageAttributes
msg.MD5OfMessageAttributes = common.HashAttributes(messageAttributes)
}
msg.MD5OfMessageBody = common.GetMD5Hash(messageBody)
msg.Uuid, _ = common.NewUUID()
msg.GroupID = messageGroupID
msg.DeduplicationID = messageDeduplicationID
msg.SentTime = time.Now()
msg.DelaySecs = delaySecs

app.SyncQueues.Lock()
fifoSeqNumber := ""
if app.SyncQueues.Queues[queueName].IsFIFO {
fifoSeqNumber = app.SyncQueues.Queues[queueName].NextSequenceNumber(messageGroupID)
}

if !app.SyncQueues.Queues[queueName].IsDuplicate(messageDeduplicationID) {
app.SyncQueues.Queues[queueName].Messages = append(app.SyncQueues.Queues[queueName].Messages, msg)
} else {
log.Debugf("Message with deduplicationId [%s] in queue [%s] is duplicate ", messageDeduplicationID, queueName)
}

app.SyncQueues.Queues[queueName].InitDuplicatation(messageDeduplicationID)
app.SyncQueues.Unlock()
log.Infof("%s: Queue: %s, Message: %s\n", time.Now().Format("2006-01-02 15:04:05"), queueName, msg.MessageBody)

respStruct := app.SendMessageResponse{
Xmlns: "http://queue.amazonaws.com/doc/2012-11-05/",
Result: app.SendMessageResult{
MD5OfMessageAttributes: msg.MD5OfMessageAttributes,
MD5OfMessageBody: msg.MD5OfMessageBody,
MessageId: msg.Uuid,
SequenceNumber: fifoSeqNumber,
},
Metadata: app.ResponseMetadata{
RequestId: "00000000-0000-0000-0000-000000000000",
},
}

enc := xml.NewEncoder(w)
enc.Indent(" ", " ")
if err := enc.Encode(respStruct); err != nil {
log.Printf("error: %v\n", err)
}
}

type SendEntry struct {
Id string
MessageBody string
Expand Down
Loading

0 comments on commit f3062f3

Please sign in to comment.